http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/types/InternalExecutionVertexProfilingData.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/types/InternalExecutionVertexProfilingData.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/types/InternalExecutionVertexProfilingData.java
index e3aa2f0..98ccfcf 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/types/InternalExecutionVertexProfilingData.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/types/InternalExecutionVertexProfilingData.java
@@ -23,7 +23,7 @@ import java.io.IOException;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
 public abstract class InternalExecutionVertexProfilingData implements 
InternalProfilingData {

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/types/InternalExecutionVertexThreadProfilingData.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/types/InternalExecutionVertexThreadProfilingData.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/types/InternalExecutionVertexThreadProfilingData.java
index 8819fc4..92ce916 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/types/InternalExecutionVertexThreadProfilingData.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/types/InternalExecutionVertexThreadProfilingData.java
@@ -23,7 +23,7 @@ import java.io.IOException;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
 public class InternalExecutionVertexThreadProfilingData extends 
InternalExecutionVertexProfilingData {

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/InstanceProfilingEvent.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/InstanceProfilingEvent.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/InstanceProfilingEvent.java
index 5449869..7de9252 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/InstanceProfilingEvent.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/InstanceProfilingEvent.java
@@ -22,7 +22,7 @@ import java.io.IOException;
 
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.api.common.JobID;
 
 /**
  * Instance profiling events are a special subclass of profiling events. They 
contain profiling information about the

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/InstanceSummaryProfilingEvent.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/InstanceSummaryProfilingEvent.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/InstanceSummaryProfilingEvent.java
index ec3ab30..4bb052d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/InstanceSummaryProfilingEvent.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/InstanceSummaryProfilingEvent.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.profiling.types;
 
-import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.api.common.JobID;
 
 /**
  * Instance summary profiling events summarize the profiling events of all 
instances involved in computing a job.

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/ProfilingEvent.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/ProfilingEvent.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/ProfilingEvent.java
index a4d0634..13a9a99 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/ProfilingEvent.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/ProfilingEvent.java
@@ -23,7 +23,7 @@ import java.io.IOException;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.runtime.event.job.AbstractEvent;
-import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.api.common.JobID;
 
 import com.google.common.base.Preconditions;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/SingleInstanceProfilingEvent.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/SingleInstanceProfilingEvent.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/SingleInstanceProfilingEvent.java
index 4e0faf7..e07f144 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/SingleInstanceProfilingEvent.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/SingleInstanceProfilingEvent.java
@@ -22,7 +22,7 @@ import java.io.IOException;
 
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.types.StringValue;
 
 import com.google.common.base.Preconditions;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/ThreadProfilingEvent.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/ThreadProfilingEvent.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/ThreadProfilingEvent.java
index d878906..08b932a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/ThreadProfilingEvent.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/ThreadProfilingEvent.java
@@ -23,7 +23,7 @@ import java.io.IOException;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
 /**

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/VertexProfilingEvent.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/VertexProfilingEvent.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/VertexProfilingEvent.java
index 148d16d..fc18758 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/VertexProfilingEvent.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/VertexProfilingEvent.java
@@ -24,7 +24,7 @@ import java.io.IOException;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
 /**

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index fa2413b..3d1419a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -26,7 +26,7 @@ import 
org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
-import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.memorymanager.MemoryManager;
 import org.apache.flink.runtime.messages.ExecutionGraphMessages;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java
index bda7c8b..a56a7e5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java
@@ -25,7 +25,7 @@ import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.util.InstantiationUtil;
 
 /**

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java
index 16ca090..6e446de 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java
@@ -24,7 +24,7 @@ import akka.pattern.Patterns;
 import akka.util.Timeout;
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.messages.JobManagerMessages;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnClient.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnClient.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnClient.java
index e8b730d..039e926 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnClient.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnClient.java
@@ -21,35 +21,115 @@ import org.apache.hadoop.fs.Path;
 import java.io.File;
 import java.util.List;
 
+/**
+ * Abstract interface for an implementation of a Flink on YARN client to 
deploy.
+ *
+ * The Client describes the properties of the YARN application to create.
+ */
 public abstract class AbstractFlinkYarnClient {
 
        // ---- Setter for YARN Cluster properties ----- //
+
+       /**
+        * @param memoryMB The amount of memory for the JobManager (in MB)
+        */
        public abstract void setJobManagerMemory(int memoryMB);
+
+       /**
+        * @param memoryMB The memory per TaskManager (in MB)
+        */
        public abstract void setTaskManagerMemory(int memoryMB);
 
+       /**
+        * Flink configuration
+        */
        public abstract void 
setFlinkConfigurationObject(org.apache.flink.configuration.Configuration conf);
 
+       /**
+        *
+        * @param slots The number of TaskManager slots per TaskManager.
+        */
        public abstract void setTaskManagerSlots(int slots);
+
+       /**
+        * @return the number of TaskManager processing slots per TaskManager.
+        */
        public abstract int getTaskManagerSlots();
+
+       /**
+        * @param queue Name of the YARN queue
+        */
        public abstract void setQueue(String queue);
+
+       /**
+        *
+        * @param localJarPath Local Path to the Flink uberjar
+        */
        public abstract void setLocalJarPath(Path localJarPath);
+
+       /**
+        *
+        * @param confPath local path to the Flink configuration file
+        */
        public abstract void setConfigurationFilePath(Path confPath);
+
+       /**
+        *
+        * @param logConfPath local path to the flink logging configuration
+        */
        public abstract void setFlinkLoggingConfigurationPath(Path logConfPath);
        public abstract Path getFlinkLoggingConfigurationPath();
+
+       /**
+        *
+        * @param tmCount number of TaskManagers to start
+        */
        public abstract void setTaskManagerCount(int tmCount);
        public abstract int getTaskManagerCount();
+
+       /**
+        * @param confDirPath Path to config directory.
+        */
        public abstract void setConfigurationDirectory(String confDirPath);
-       // List of files to transfer to the YARN containers.
+
+       /**
+        * List of files to transfer to the YARN containers.
+        */
        public abstract void setShipFiles(List<File> shipFiles);
+
+       /**
+        *
+        * @param dynamicPropertiesEncoded Encoded String of the dynamic 
properties (-D configuration values of the Flink configuration)
+        */
        public abstract void setDynamicPropertiesEncoded(String 
dynamicPropertiesEncoded);
        public abstract String getDynamicPropertiesEncoded();
 
-       // ---- Operations on the YARN cluster ----- //
+       // --------------------------------------- Operations on the YARN 
cluster ----- //
+
+       /**
+        * Returns a String containing details about the cluster (NodeManagers, 
available memory, ...)
+        *
+        */
        public abstract String getClusterDescription() throws Exception;
 
+       /**
+        * Trigger the deployment to YARN.
+        *
+        * @param clusterName Name to be shown in the YARN resource manager 
overview.
+        */
        public abstract AbstractFlinkYarnCluster deploy(String clusterName) 
throws Exception;
 
+       /**
+        * @param detachedMode If true, the Flink YARN client is non-blocking. 
That means it returns
+        *                        once Flink has been started successfully on 
YARN.
+        */
        public abstract void setDetachedMode(boolean detachedMode);
 
+       public abstract boolean isDetached();
+
+       /**
+        * @return The string representation of the Path to the YARN session 
files. This is a temporary
+        * directory in HDFS that contains the jar files and configuration 
which is shipped to all the containers.
+        */
        public abstract String getSessionFilesDir();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnCluster.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnCluster.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnCluster.java
index 4b4bd2d..398709e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnCluster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnCluster.java
@@ -18,6 +18,9 @@
 
 package org.apache.flink.runtime.yarn;
 
+import org.apache.flink.api.common.JobID;
+
+import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.List;
 
@@ -43,4 +46,30 @@ public abstract class AbstractFlinkYarnCluster {
        public abstract List<String> getNewMessages();
 
        public abstract String getApplicationId();
+
+       public abstract boolean isDetached();
+
+       /**
+        * Connect the FlinkYarnCluster to the ApplicationMaster.
+        *
+        * Detached YARN sessions don't need to connect to the 
ApplicationMaster.
+        * Detached per job YARN sessions need to connect until the required 
number of TaskManagers have been started.
+        *
+        * @throws IOException
+        */
+       public abstract void connectToCluster() throws IOException;
+
+       /**
+        * Disconnect from the ApplicationMaster without stopping the session
+        * (therefore, use the {@see shutdown()} method.
+        */
+       public abstract void disconnect();
+
+       /**
+        * Tells the ApplicationMaster to monitor the status of JobId and stop 
itself once the specified
+        * job has finished.
+        *
+        * @param jobID
+        */
+       public abstract void stopAfterJob(JobID jobID);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala
index f1c6450..d153dcc 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala
@@ -88,8 +88,8 @@ ActorLogging {
 
     case Success(_) =>
 
-    case JobResultSuccess(_, duration, accumulatorResults) =>
-      jobSubmitter ! new JobExecutionResult(duration, accumulatorResults)
+    case JobResultSuccess(jobId, duration, accumulatorResults) =>
+      jobSubmitter ! new JobExecutionResult(jobId, duration, 
accumulatorResults)
       self ! PoisonPill
 
     case msg =>

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 9aa476d..4b0a55b 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -22,7 +22,7 @@ import java.io.{IOException, File}
 import java.net.InetSocketAddress
 
 import akka.actor.Status.{Success, Failure}
-import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.{JobID, ExecutionConfig}
 import org.apache.flink.configuration.{ConfigConstants, GlobalConfiguration, 
Configuration}
 import org.apache.flink.core.io.InputSplitAssigner
 import org.apache.flink.runtime.blob.BlobServer
@@ -41,7 +41,7 @@ import org.apache.flink.runtime.ActorLogMessages
 import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
 import org.apache.flink.runtime.instance.InstanceManager
-import org.apache.flink.runtime.jobgraph.{JobGraph, JobStatus, JobID}
+import org.apache.flink.runtime.jobgraph.{JobGraph, JobStatus}
 import org.apache.flink.runtime.jobmanager.accumulators.AccumulatorManager
 import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => 
FlinkScheduler}
 import org.apache.flink.runtime.messages.JobManagerMessages._
@@ -86,7 +86,7 @@ import scala.collection.JavaConverters._
  * - [[JobStatusChanged]] indicates that the status of job (RUNNING, 
CANCELING, FINISHED, etc.) has
  * changed. This message is sent by the ExecutionGraph.
  */
-class JobManager(val configuration: Configuration,
+class JobManager(val flinkConfiguration: Configuration,
                  val instanceManager: InstanceManager,
                  val scheduler: FlinkScheduler,
                  val libraryCacheManager: BlobLibraryCacheManager,

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
index 0c3384c..cb7bfec 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
@@ -19,9 +19,9 @@
 package org.apache.flink.runtime.jobmanager
 
 import akka.actor.{ActorLogging, Actor}
+import org.apache.flink.api.common.JobID
 import org.apache.flink.runtime.ActorLogMessages
 import org.apache.flink.runtime.executiongraph.ExecutionGraph
-import org.apache.flink.runtime.jobgraph.JobID
 import org.apache.flink.runtime.messages.ArchiveMessages._
 import org.apache.flink.runtime.messages.JobManagerMessages._
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamCheckpointCoordinator.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamCheckpointCoordinator.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamCheckpointCoordinator.scala
index b6b2cc1..d9a3421 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamCheckpointCoordinator.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamCheckpointCoordinator.scala
@@ -21,11 +21,12 @@ package org.apache.flink.runtime.jobmanager
 import java.lang.Long
 
 import akka.actor._
+import org.apache.flink.api.common.JobID
 import org.apache.flink.runtime.ActorLogMessages
 import org.apache.flink.runtime.execution.ExecutionState.RUNNING
 import org.apache.flink.runtime.executiongraph.{ExecutionAttemptID, 
ExecutionGraph, ExecutionVertex}
 import org.apache.flink.runtime.jobgraph.JobStatus._
-import org.apache.flink.runtime.jobgraph.{JobID, JobVertexID}
+import org.apache.flink.runtime.jobgraph.JobVertexID
 import org.apache.flink.runtime.state.StateHandle
 
 import scala.collection.JavaConversions._

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ArchiveMessages.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ArchiveMessages.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ArchiveMessages.scala
index 704bf86..b4ed2cc 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ArchiveMessages.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ArchiveMessages.scala
@@ -18,8 +18,8 @@
 
 package org.apache.flink.runtime.messages
 
+import org.apache.flink.api.common.JobID
 import org.apache.flink.runtime.executiongraph.ExecutionGraph
-import org.apache.flink.runtime.jobgraph.JobID
 
 /**
  * This object contains the archive specific messages.

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ExecutionGraphMessages.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ExecutionGraphMessages.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ExecutionGraphMessages.scala
index d413d13..6785c31 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ExecutionGraphMessages.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ExecutionGraphMessages.scala
@@ -21,9 +21,10 @@ package org.apache.flink.runtime.messages
 import java.text.SimpleDateFormat
 import java.util.Date
 
+import org.apache.flink.api.common.JobID
 import org.apache.flink.runtime.execution.ExecutionState
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
-import org.apache.flink.runtime.jobgraph.{JobStatus, JobVertexID, JobID}
+import org.apache.flink.runtime.jobgraph.{JobStatus, JobVertexID}
 
 /**
  * This object contains the execution graph specific messages.

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
index 17e9138..dab4671 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
@@ -18,12 +18,13 @@
 
 package org.apache.flink.runtime.messages
 
+import org.apache.flink.api.common.JobID
 import org.apache.flink.runtime.accumulators.AccumulatorEvent
 import org.apache.flink.runtime.client.JobStatusMessage
 import org.apache.flink.runtime.executiongraph.{ExecutionAttemptID, 
ExecutionGraph}
 import org.apache.flink.runtime.instance.{InstanceID, Instance}
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID
-import org.apache.flink.runtime.jobgraph.{JobGraph, JobID, JobStatus, 
JobVertexID}
+import org.apache.flink.runtime.jobgraph.{JobGraph, JobStatus, JobVertexID}
 import org.apache.flink.runtime.taskmanager.TaskExecutionState
 
 import scala.collection.JavaConverters._

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
index 0c9bf9b..cb79fbc 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
@@ -193,6 +193,10 @@ class LocalFlinkMiniCluster(userConfiguration: 
Configuration, singleActorSystem:
     }
   }
 
+  def getConfiguration: Configuration = {
+    this.userConfiguration
+  }
+
   def getDefaultConfig: Configuration = {
     val config: Configuration = new Configuration()
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
index 2254d7c..db74bbd 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java
@@ -31,7 +31,7 @@ import java.net.InetSocketAddress;
 import java.security.MessageDigest;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.api.common.JobID;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
index adb3bfc..8613b4e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.blob;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.api.common.JobID;
 import org.junit.Test;
 
 import java.io.File;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
index 1f8d29b..c98e575 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.blob;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.api.common.JobID;
 import org.junit.Test;
 
 import java.io.ByteArrayInputStream;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
index 5742fea..50b154e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
@@ -28,7 +28,7 @@ import java.util.List;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.operators.RegularPactTask;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
index 239e356..d024135 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
@@ -26,7 +26,7 @@ import org.apache.flink.runtime.blob.BlobClient;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.api.common.JobID;
 import org.junit.Test;
 
 import static org.junit.Assert.*;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
index e0852c6..894a7a9 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
@@ -49,7 +49,7 @@ import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index c979c42..8c4879f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -46,7 +46,7 @@ import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
-import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.operators.RegularPactTask;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index d136d6f..5dd10c0 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -39,7 +39,7 @@ import 
org.apache.flink.runtime.instance.InstanceConnectionInfo;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
-import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
index 2d1d9d5..1e78ac5 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java
@@ -32,7 +32,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
-import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
index 168ea90..e199353 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
@@ -39,7 +39,7 @@ import 
org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.messages.TaskManagerMessages;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
index f5089c2..5713c10 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
@@ -31,7 +31,7 @@ import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.SimpleSlot;
-import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import 
org.apache.flink.runtime.messages.TaskManagerMessages.TaskOperationResult;
 import org.apache.flink.runtime.testingUtils.TestingUtils;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
index 66ef4ca..e24a2b4 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
@@ -32,7 +32,7 @@ import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.SimpleSlot;
-import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
index ddd7282..f72d105 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java
@@ -33,7 +33,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
-import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.api.common.JobID;
 
 
 public class PointwisePatternTest {

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
index 4f82dea..17d78d2 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java
@@ -28,7 +28,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
-import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheDeleteValidationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheDeleteValidationTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheDeleteValidationTest.java
index df28f27..8623f75 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheDeleteValidationTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheDeleteValidationTest.java
@@ -26,7 +26,7 @@ import org.junit.Assert;
 
 import 
org.apache.flink.api.common.cache.DistributedCache.DistributedCacheEntry;
 import org.apache.flink.core.fs.local.LocalFileSystem;
-import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.api.common.JobID;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java
index 47f7a2b..c0ed629 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java
@@ -24,7 +24,7 @@ import java.lang.reflect.Method;
 import java.net.InetAddress;
 
 import akka.actor.ActorRef;
-import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.api.common.JobID;
 import org.junit.Test;
 
 /**

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java
index b16bf4b..f6c0c9f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java
@@ -25,7 +25,7 @@ import java.net.InetAddress;
 
 import akka.actor.ActorRef;
 import org.apache.flink.runtime.executiongraph.Execution;
-import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.api.common.JobID;
 import org.junit.Test;
 import org.mockito.Matchers;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index cc90c44..32fe1a7 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.io.network.partition.consumer;
 
 import com.google.common.collect.Lists;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
@@ -34,7 +35,6 @@ import 
org.apache.flink.runtime.io.network.util.TestPartitionProducer;
 import org.apache.flink.runtime.io.network.util.TestProducerSource;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
-import org.apache.flink.runtime.jobgraph.JobID;
 import org.junit.Test;
 
 import java.util.Collections;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobIdTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobIdTest.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobIdTest.java
index 0405f50..083a1a9 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobIdTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobIdTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.jobgraph;
 
+import org.apache.flink.api.common.JobID;
 import org.junit.Test;
 
 import java.nio.ByteBuffer;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
index 006bd27..776184c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
@@ -35,7 +35,7 @@ import org.apache.flink.runtime.instance.HardwareDescription;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.InstanceConnectionInfo;
 import org.apache.flink.runtime.instance.InstanceID;
-import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SharedSlotsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SharedSlotsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SharedSlotsTest.java
index fcb638f..42a3702 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SharedSlotsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SharedSlotsTest.java
@@ -29,7 +29,7 @@ import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
 public class SharedSlotsTest {

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFutureTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFutureTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFutureTest.java
index 23a5c94..d678531 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFutureTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFutureTest.java
@@ -24,7 +24,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.flink.runtime.instance.SimpleSlot;
-import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.api.common.JobID;
 import org.junit.Test;
 
 public class SlotAllocationFutureTest {

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index d5c9fbb..7cbb59b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -35,7 +35,7 @@ import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/test/java/org/apache/flink/runtime/profiling/types/ProfilingTypesTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/profiling/types/ProfilingTypesTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/profiling/types/ProfilingTypesTest.java
index d7bdda3..c3c75ce 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/profiling/types/ProfilingTypesTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/profiling/types/ProfilingTypesTest.java
@@ -23,11 +23,8 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.profiling.types.InstanceSummaryProfilingEvent;
-import org.apache.flink.runtime.profiling.types.SingleInstanceProfilingEvent;
-import org.apache.flink.runtime.profiling.types.ThreadProfilingEvent;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.runtime.testutils.ManagementTestUtils;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskExecutionStateTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskExecutionStateTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskExecutionStateTest.java
index e9e17ca..6939527 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskExecutionStateTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskExecutionStateTest.java
@@ -24,7 +24,7 @@ import java.io.IOException;
 
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index 8784c14..640ccc3 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -41,7 +41,7 @@ import 
org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
-import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobmanager.Tasks;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index 68b0e6f..689b22d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -30,7 +30,7 @@ import org.apache.flink.runtime.execution.RuntimeEnvironment;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.MockNetworkEnvironment;
-import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala
index 775c0cb..a36ded9 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala
@@ -20,9 +20,10 @@ package org.apache.flink.runtime.executiongraph
 
 import akka.actor.{Props, ActorSystem}
 import akka.testkit.TestKit
+import org.apache.flink.api.common.JobID
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.runtime.akka.AkkaUtils
-import org.apache.flink.runtime.jobgraph.{JobStatus, JobID, JobGraph, 
AbstractJobVertex}
+import org.apache.flink.runtime.jobgraph.{JobStatus, JobGraph, 
AbstractJobVertex}
 import org.apache.flink.runtime.jobmanager.Tasks
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler
 import org.apache.flink.runtime.testingUtils.TestingUtils

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
index 69b0588..1a36112 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
@@ -20,11 +20,12 @@ package org.apache.flink.runtime.executiongraph
 
 import akka.actor.{Props, ActorSystem}
 import akka.testkit.TestKit
+import org.apache.flink.api.common.JobID
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils
 .SimpleAcknowledgingTaskManager
-import org.apache.flink.runtime.jobgraph.{JobStatus, JobID, JobGraph, 
AbstractJobVertex}
+import org.apache.flink.runtime.jobgraph.{JobStatus, JobGraph, 
AbstractJobVertex}
 import org.apache.flink.runtime.jobmanager.Tasks
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler
 import org.apache.flink.runtime.testingUtils.TestingUtils

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
index 89cbfe6..89e1d72 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
@@ -20,9 +20,10 @@ package org.apache.flink.runtime.testingUtils
 
 import akka.actor.{Cancellable, Terminated, ActorRef, Props}
 import akka.pattern.{ask, pipe}
+import org.apache.flink.api.common.JobID
 import org.apache.flink.runtime.ActorLogMessages
 import org.apache.flink.runtime.execution.ExecutionState
-import org.apache.flink.runtime.jobgraph.{JobStatus, JobID}
+import org.apache.flink.runtime.jobgraph.JobStatus
 import org.apache.flink.runtime.jobmanager.{JobManager, MemoryArchivist}
 import 
org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged
 import org.apache.flink.runtime.messages.Messages.Disconnect

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
index a5a577b..f810749 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
@@ -19,8 +19,9 @@
 package org.apache.flink.runtime.testingUtils
 
 import akka.actor.ActorRef
+import org.apache.flink.api.common.JobID
 import org.apache.flink.runtime.executiongraph.ExecutionGraph
-import org.apache.flink.runtime.jobgraph.{JobStatus, JobID}
+import org.apache.flink.runtime.jobgraph.JobStatus
 
 object TestingJobManagerMessages {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
index 06062f4..a47e4e7 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
@@ -19,9 +19,9 @@
 package org.apache.flink.runtime.testingUtils
 
 import akka.actor.{Terminated, ActorRef}
+import org.apache.flink.api.common.JobID
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
 import org.apache.flink.runtime.instance.InstanceConnectionInfo
-import org.apache.flink.runtime.jobgraph.JobID
 import org.apache.flink.runtime.messages.Messages.Disconnect
 import org.apache.flink.runtime.messages.TaskManagerMessages.UnregisterTask
 import org.apache.flink.runtime.taskmanager.{NetworkEnvironmentConfiguration, 
TaskManagerConfiguration, TaskManager}

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala
index ebe4555..2051ef5 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala
@@ -19,8 +19,8 @@
 package org.apache.flink.runtime.testingUtils
 
 import akka.actor.ActorRef
+import org.apache.flink.api.common.JobID
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
-import org.apache.flink.runtime.jobgraph.JobID
 import org.apache.flink.runtime.taskmanager.Task
 
 /**

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
 
b/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
index 0b394e3..a955682 100644
--- 
a/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
+++ 
b/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
@@ -52,7 +52,7 @@ public class AvroExternalJarProgramITCase {
                        PackagedProgram program = new PackagedProgram(new 
File(jarFile), new String[] { testData });
                                                
                        Client c = new Client(new 
InetSocketAddress("localhost", testMiniCluster.getJobManagerRPCPort()),
-                                       new Configuration(), 
program.getUserCodeClassLoader());
+                                       new Configuration(), 
program.getUserCodeClassLoader(), -1);
                        c.run(program, 4, true);
                }
                catch (Throwable t) {

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-staging/flink-spargel/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-spargel/pom.xml 
b/flink-staging/flink-spargel/pom.xml
index a0f0f73..bd03c1a 100644
--- a/flink-staging/flink-spargel/pom.xml
+++ b/flink-staging/flink-spargel/pom.xml
@@ -56,5 +56,12 @@ under the License.
                        <version>${project.version}</version>
                        <scope>test</scope>
                </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-optimizer</artifactId>
+                       <version>${project.version}</version>
+                       <type>test-jar</type>
+                       <scope>test</scope>
+               </dependency>
        </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-staging/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelCompilerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelCompilerTest.java
 
b/flink-staging/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelCompilerTest.java
index 38c26f0..d0b0164 100644
--- 
a/flink-staging/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelCompilerTest.java
+++ 
b/flink-staging/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelCompilerTest.java
@@ -26,6 +26,7 @@ import static org.junit.Assert.fail;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.operators.util.FieldList;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.junit.Test;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
@@ -39,7 +40,6 @@ import org.apache.flink.runtime.operators.util.LocalStrategy;
 import 
org.apache.flink.spargel.java.examples.SpargelConnectedComponents.CCMessager;
 import 
org.apache.flink.spargel.java.examples.SpargelConnectedComponents.CCUpdater;
 import 
org.apache.flink.spargel.java.examples.SpargelConnectedComponents.IdAssigner;
-import org.apache.flink.test.compiler.util.CompilerTestBase;
 
 
 public class SpargelCompilerTest extends CompilerTestBase {

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
index 4faa329..3308ab7 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
@@ -24,6 +24,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.client.program.Client;
 import org.apache.flink.client.program.JobWithJars;
 import org.apache.flink.client.program.ProgramInvocationException;
@@ -111,10 +112,16 @@ public class RemoteStreamEnvironment extends 
StreamExecutionEnvironment {
 
                Configuration configuration = jobGraph.getJobConfiguration();
                Client client = new Client(new InetSocketAddress(host, port), 
configuration,
-                               JobWithJars.buildUserCodeClassLoader(jarFiles, 
JobWithJars.class.getClassLoader()));
+                               JobWithJars.buildUserCodeClassLoader(jarFiles, 
JobWithJars.class.getClassLoader()), -1);
 
                try {
-                       return client.run(jobGraph, true);
+                       JobSubmissionResult result = client.run(jobGraph, true);
+                       if(result instanceof JobExecutionResult) {
+                               return (JobExecutionResult) result;
+                       } else {
+                               LOG.warn("The Client didn't return a 
JobExecutionResult");
+                               return new 
JobExecutionResult(result.getJobID(), -1, null);
+                       }
                } catch (ProgramInvocationException e) {
                        throw new RuntimeException("Cannot execute job due to 
ProgramInvocationException", e);
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
index b03ab0e..84ad710 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
@@ -21,15 +21,19 @@ import java.io.File;
 import java.util.List;
 
 import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.client.program.Client;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class StreamContextEnvironment extends StreamExecutionEnvironment {
 
-       protected static ClassLoader userClassLoader;
+       private static final Logger LOG = 
LoggerFactory.getLogger(StreamContextEnvironment.class);
+
        protected List<File> jars;
        protected Client client;
 
@@ -70,13 +74,13 @@ public class StreamContextEnvironment extends 
StreamExecutionEnvironment {
                        jobGraph.addJar(new Path(file.getAbsolutePath()));
                }
 
-               try {
-                       return client.run(jobGraph, true);
-
-               } catch (Exception e) {
-                       throw e;
+               JobSubmissionResult result = client.run(jobGraph, true);
+               if(result instanceof JobExecutionResult) {
+                       return (JobExecutionResult) result;
+               } else {
+                       LOG.warn("The Client didn't return a 
JobExecutionResult");
+                       return new JobExecutionResult(result.getJobID(), -1, 
null);
                }
-
        }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-test-utils/src/main/java/org/apache/flink/test/compiler/util/CompilerTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-test-utils/src/main/java/org/apache/flink/test/compiler/util/CompilerTestBase.java
 
b/flink-test-utils/src/main/java/org/apache/flink/test/compiler/util/CompilerTestBase.java
deleted file mode 100644
index 435713b..0000000
--- 
a/flink-test-utils/src/main/java/org/apache/flink/test/compiler/util/CompilerTestBase.java
+++ /dev/null
@@ -1,201 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.compiler.util;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.functions.RichFunction;
-import org.apache.flink.api.common.io.FileInputFormat.FileBaseStatistics;
-import org.apache.flink.api.common.operators.GenericDataSourceBase;
-import org.apache.flink.api.common.operators.Operator;
-import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.Optimizer;
-import org.apache.flink.optimizer.costs.DefaultCostEstimator;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plan.PlanNode;
-import org.apache.flink.optimizer.plan.SingleInputPlanNode;
-import org.apache.flink.util.OperatingSystem;
-import org.junit.Before;
-
-/**
- *
- */
-public abstract class CompilerTestBase {
-
-       protected static final String IN_FILE = OperatingSystem.isWindows() ? 
"file:/c:/" : "file:///dev/random";
-       
-       protected static final String OUT_FILE = OperatingSystem.isWindows() ? 
"file:/c:/" : "file:///dev/null";
-       
-       protected static final int DEFAULT_PARALLELISM = 8;
-       
-       protected static final String DEFAULT_PARALLELISM_STRING = 
String.valueOf(DEFAULT_PARALLELISM);
-       
-       private static final String CACHE_KEY = "cachekey";
-       
-       // 
------------------------------------------------------------------------
-       
-       protected DataStatistics dataStats;
-       
-       protected Optimizer withStatsCompiler;
-       
-       protected Optimizer noStatsCompiler;
-       
-       private int statCounter;
-       
-       // 
------------------------------------------------------------------------     
-       
-       @Before
-       public void setup() {
-               this.dataStats = new DataStatistics();
-               this.withStatsCompiler = new Optimizer(this.dataStats, new 
DefaultCostEstimator());
-               
this.withStatsCompiler.setDefaultParallelism(DEFAULT_PARALLELISM);
-               
-               this.noStatsCompiler = new Optimizer(null, new 
DefaultCostEstimator());
-               this.noStatsCompiler.setDefaultParallelism(DEFAULT_PARALLELISM);
-       }
-       
-       // 
------------------------------------------------------------------------
-       
-       public OptimizedPlan compileWithStats(Plan p) {
-               return this.withStatsCompiler.compile(p);
-       }
-       
-       public OptimizedPlan compileNoStats(Plan p) {
-               return this.noStatsCompiler.compile(p);
-       }
-       
-       public void setSourceStatistics(GenericDataSourceBase<?, ?> source, 
long size, float recordWidth) {
-               setSourceStatistics(source, new 
FileBaseStatistics(Long.MAX_VALUE, size, recordWidth));
-       }
-       
-       public void setSourceStatistics(GenericDataSourceBase<?, ?> source, 
FileBaseStatistics stats) {
-               final String key = CACHE_KEY + this.statCounter++;
-               this.dataStats.cacheBaseStatistics(stats, key);
-               source.setStatisticsKey(key);
-       }
-
-       public static OperatorResolver getContractResolver(Plan plan) {
-               return new OperatorResolver(plan);
-       }
-       
-       public static OptimizerPlanNodeResolver 
getOptimizerPlanNodeResolver(OptimizedPlan plan) {
-               return new OptimizerPlanNodeResolver(plan);
-       }
-       
-       // 
------------------------------------------------------------------------
-       
-       public static final class OptimizerPlanNodeResolver {
-               
-               private final Map<String, ArrayList<PlanNode>> map;
-               
-               OptimizerPlanNodeResolver(OptimizedPlan p) {
-                       HashMap<String, ArrayList<PlanNode>> map = new 
HashMap<String, ArrayList<PlanNode>>();
-                       
-                       for (PlanNode n : p.getAllNodes()) {
-                               Operator<?> c = 
n.getOriginalOptimizerNode().getOperator();
-                               String name = c.getName();
-                               
-                               ArrayList<PlanNode> list = map.get(name);
-                               if (list == null) {
-                                       list = new ArrayList<PlanNode>(2);
-                                       map.put(name, list);
-                               }
-                               
-                               // check whether this node is a child of a node 
with the same contract (aka combiner)
-                               boolean shouldAdd = true;
-                               for (Iterator<PlanNode> iter = list.iterator(); 
iter.hasNext();) {
-                                       PlanNode in = iter.next();
-                                       if 
(in.getOriginalOptimizerNode().getOperator() == c) {
-                                               // is this the child or is our 
node the child
-                                               if (in instanceof 
SingleInputPlanNode && n instanceof SingleInputPlanNode) {
-                                                       SingleInputPlanNode 
thisNode = (SingleInputPlanNode) n;
-                                                       SingleInputPlanNode 
otherNode = (SingleInputPlanNode) in;
-                                                       
-                                                       if 
(thisNode.getPredecessor() == otherNode) {
-                                                               // other node 
is child, remove it
-                                                               iter.remove();
-                                                       } else if 
(otherNode.getPredecessor() == thisNode) {
-                                                               shouldAdd = 
false;
-                                                       }
-                                               } else {
-                                                       throw new 
RuntimeException("Unrecodnized case in test.");
-                                               }
-                                       }
-                               }
-                               
-                               if (shouldAdd) {
-                                       list.add(n);
-                               }
-                       }
-                       
-                       this.map = map;
-               }
-               
-               
-               @SuppressWarnings("unchecked")
-               public <T extends PlanNode> T getNode(String name) {
-                       List<PlanNode> nodes = this.map.get(name);
-                       if (nodes == null || nodes.isEmpty()) {
-                               throw new RuntimeException("No node found with 
the given name.");
-                       } else if (nodes.size() != 1) {
-                               throw new RuntimeException("Multiple nodes 
found with the given name.");
-                       } else {
-                               return (T) nodes.get(0);
-                       }
-               }
-               
-               @SuppressWarnings("unchecked")
-               public <T extends PlanNode> T getNode(String name, Class<? 
extends RichFunction> stubClass) {
-                       List<PlanNode> nodes = this.map.get(name);
-                       if (nodes == null || nodes.isEmpty()) {
-                               throw new RuntimeException("No node found with 
the given name and stub class.");
-                       } else {
-                               PlanNode found = null;
-                               for (PlanNode node : nodes) {
-                                       if (node.getClass() == stubClass) {
-                                               if (found == null) {
-                                                       found = node;
-                                               } else {
-                                                       throw new 
RuntimeException("Multiple nodes found with the given name and stub class.");
-                                               }
-                                       }
-                               }
-                               if (found == null) {
-                                       throw new RuntimeException("No node 
found with the given name and stub class.");
-                               } else {
-                                       return (T) found;
-                               }
-                       }
-               }
-               
-               public List<PlanNode> getNodes(String name) {
-                       List<PlanNode> nodes = this.map.get(name);
-                       if (nodes == null || nodes.isEmpty()) {
-                               throw new RuntimeException("No node found with 
the given name.");
-                       } else {
-                               return new ArrayList<PlanNode>(nodes);
-                       }
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-test-utils/src/main/java/org/apache/flink/test/compiler/util/OperatorResolver.java
----------------------------------------------------------------------
diff --git 
a/flink-test-utils/src/main/java/org/apache/flink/test/compiler/util/OperatorResolver.java
 
b/flink-test-utils/src/main/java/org/apache/flink/test/compiler/util/OperatorResolver.java
deleted file mode 100644
index 08cded2..0000000
--- 
a/flink-test-utils/src/main/java/org/apache/flink/test/compiler/util/OperatorResolver.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.compiler.util;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.functions.RichFunction;
-import org.apache.flink.api.common.operators.Operator;
-import org.apache.flink.api.java.record.operators.BulkIteration;
-import org.apache.flink.api.java.record.operators.DeltaIteration;
-import org.apache.flink.util.Visitor;
-
-/**
- * Utility to get operator instances from plans via name.
- */
-@SuppressWarnings("deprecation")
-public class OperatorResolver implements Visitor<Operator<?>> {
-       
-       private final Map<String, List<Operator<?>>> map;
-       private Set<Operator<?>> seen;
-       
-       public OperatorResolver(Plan p) {
-               this.map = new HashMap<String, List<Operator<?>>>();
-               this.seen = new HashSet<Operator<?>>();
-               
-               p.accept(this);
-               this.seen = null;
-       }
-       
-       
-       @SuppressWarnings("unchecked")
-       public <T extends Operator<?>> T getNode(String name) {
-               List<Operator<?>> nodes = this.map.get(name);
-               if (nodes == null || nodes.isEmpty()) {
-                       throw new RuntimeException("No nodes found with the 
given name.");
-               } else if (nodes.size() != 1) {
-                       throw new RuntimeException("Multiple nodes found with 
the given name.");
-               } else {
-                       return (T) nodes.get(0);
-               }
-       }
-       
-       @SuppressWarnings("unchecked")
-       public <T extends Operator<?>> T getNode(String name, Class<? extends 
RichFunction> stubClass) {
-               List<Operator<?>> nodes = this.map.get(name);
-               if (nodes == null || nodes.isEmpty()) {
-                       throw new RuntimeException("No node found with the 
given name and stub class.");
-               } else {
-                       Operator<?> found = null;
-                       for (Operator<?> node : nodes) {
-                               if (node.getClass() == stubClass) {
-                                       if (found == null) {
-                                               found = node;
-                                       } else {
-                                               throw new 
RuntimeException("Multiple nodes found with the given name and stub class.");
-                                       }
-                               }
-                       }
-                       if (found == null) {
-                               throw new RuntimeException("No node found with 
the given name and stub class.");
-                       } else {
-                               return (T) found;
-                       }
-               }
-       }
-       
-       public List<Operator<?>> getNodes(String name) {
-               List<Operator<?>> nodes = this.map.get(name);
-               if (nodes == null || nodes.isEmpty()) {
-                       throw new RuntimeException("No node found with the 
given name.");
-               } else {
-                       return new ArrayList<Operator<?>>(nodes);
-               }
-       }
-
-       @Override
-       public boolean preVisit(Operator<?> visitable) {
-               if (this.seen.add(visitable)) {
-                       // add to  the map
-                       final String name = visitable.getName();
-                       List<Operator<?>> list = this.map.get(name);
-                       if (list == null) {
-                               list = new ArrayList<Operator<?>>(2);
-                               this.map.put(name, list);
-                       }
-                       list.add(visitable);
-                       
-                       // recurse into bulk iterations
-                       if (visitable instanceof BulkIteration) {
-                               ((BulkIteration) 
visitable).getNextPartialSolution().accept(this);
-                       } else if (visitable instanceof DeltaIteration) {
-                               ((DeltaIteration) 
visitable).getSolutionSetDelta().accept(this);
-                               ((DeltaIteration) 
visitable).getNextWorkset().accept(this);
-                       }
-                       
-                       return true;
-               } else {
-                       return false;
-               }
-       }
-
-       @Override
-       public void postVisit(Operator<?> visitable) {}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-test-utils/src/main/java/org/apache/flink/test/util/RecordAPITestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-test-utils/src/main/java/org/apache/flink/test/util/RecordAPITestBase.java
 
b/flink-test-utils/src/main/java/org/apache/flink/test/util/RecordAPITestBase.java
index eafe9ad..5d4cf4d 100644
--- 
a/flink-test-utils/src/main/java/org/apache/flink/test/util/RecordAPITestBase.java
+++ 
b/flink-test-utils/src/main/java/org/apache/flink/test/util/RecordAPITestBase.java
@@ -72,7 +72,7 @@ public abstract class RecordAPITestBase extends 
AbstractTestBase {
                        Assert.fail("Error: Cannot obtain Pact plan. Did the 
thest forget to override either 'getPactPlan()' or 'getJobGraph()' ?");
                }
                
-               Optimizer pc = new Optimizer(new DataStatistics());
+               Optimizer pc = new Optimizer(new DataStatistics(), this.config);
                OptimizedPlan op = pc.compile(p);
                
                if (printPlan) {

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
 
b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
index 02c1434..b3423cb 100644
--- 
a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
+++ 
b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
@@ -80,7 +80,7 @@ public class TestEnvironment extends ExecutionEnvironment {
        private OptimizedPlan compileProgram(String jobName) {
                Plan p = createProgramPlan(jobName);
 
-               Optimizer pc = new Optimizer(new DataStatistics());
+               Optimizer pc = new Optimizer(new DataStatistics(), 
this.executor.getConfiguration());
                return pc.compile(p);
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-tests/pom.xml b/flink-tests/pom.xml
index 9675150..977c205 100644
--- a/flink-tests/pom.xml
+++ b/flink-tests/pom.xml
@@ -126,6 +126,14 @@ under the License.
 
                <dependency>
                        <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-optimizer</artifactId>
+                       <version>${project.version}</version>
+                       <type>test-jar</type>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
                        <artifactId>flink-runtime</artifactId>
                        <version>${project.version}</version>
                        <type>test-jar</type>

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java
index 400ed3a..345bffd 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java
@@ -140,7 +140,7 @@ public abstract class CancellingTestBase {
        }
 
        private JobGraph getJobGraph(final Plan plan) throws Exception {
-               final Optimizer pc = new Optimizer(new DataStatistics());
+               final Optimizer pc = new Optimizer(new DataStatistics(), 
this.executor.getConfiguration());
                final OptimizedPlan op = pc.compile(plan);
                final JobGraphGenerator jgg = new JobGraphGenerator();
                return jgg.compileJobGraph(op);

http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-tests/src/test/java/org/apache/flink/test/compiler/examples/KMeansSingleStepTest.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/compiler/examples/KMeansSingleStepTest.java
 
b/flink-tests/src/test/java/org/apache/flink/test/compiler/examples/KMeansSingleStepTest.java
index 4fd4617..1724920 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/compiler/examples/KMeansSingleStepTest.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/compiler/examples/KMeansSingleStepTest.java
@@ -31,8 +31,8 @@ import org.apache.flink.optimizer.plan.SinkPlanNode;
 import org.apache.flink.runtime.operators.DriverStrategy;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.apache.flink.runtime.operators.util.LocalStrategy;
-import org.apache.flink.test.compiler.util.CompilerTestBase;
-import org.apache.flink.test.compiler.util.OperatorResolver;
+import org.apache.flink.optimizer.util.CompilerTestBase;
+import org.apache.flink.optimizer.util.OperatorResolver;
 import org.apache.flink.test.recordJobs.kmeans.KMeansSingleStep;
 import org.junit.Assert;
 import org.junit.Test;

Reply via email to