http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/types/InternalExecutionVertexProfilingData.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/types/InternalExecutionVertexProfilingData.java b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/types/InternalExecutionVertexProfilingData.java index e3aa2f0..98ccfcf 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/types/InternalExecutionVertexProfilingData.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/types/InternalExecutionVertexProfilingData.java @@ -23,7 +23,7 @@ import java.io.IOException; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; -import org.apache.flink.runtime.jobgraph.JobID; +import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.jobgraph.JobVertexID; public abstract class InternalExecutionVertexProfilingData implements InternalProfilingData {
http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/types/InternalExecutionVertexThreadProfilingData.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/types/InternalExecutionVertexThreadProfilingData.java b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/types/InternalExecutionVertexThreadProfilingData.java index 8819fc4..92ce916 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/types/InternalExecutionVertexThreadProfilingData.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/impl/types/InternalExecutionVertexThreadProfilingData.java @@ -23,7 +23,7 @@ import java.io.IOException; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; -import org.apache.flink.runtime.jobgraph.JobID; +import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.jobgraph.JobVertexID; public class InternalExecutionVertexThreadProfilingData extends InternalExecutionVertexProfilingData { http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/InstanceProfilingEvent.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/InstanceProfilingEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/InstanceProfilingEvent.java index 5449869..7de9252 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/InstanceProfilingEvent.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/InstanceProfilingEvent.java @@ -22,7 +22,7 @@ import java.io.IOException; import 
org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; -import org.apache.flink.runtime.jobgraph.JobID; +import org.apache.flink.api.common.JobID; /** * Instance profiling events are a special subclass of profiling events. They contain profiling information about the http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/InstanceSummaryProfilingEvent.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/InstanceSummaryProfilingEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/InstanceSummaryProfilingEvent.java index ec3ab30..4bb052d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/InstanceSummaryProfilingEvent.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/InstanceSummaryProfilingEvent.java @@ -18,7 +18,7 @@ package org.apache.flink.runtime.profiling.types; -import org.apache.flink.runtime.jobgraph.JobID; +import org.apache.flink.api.common.JobID; /** * Instance summary profiling events summarize the profiling events of all instances involved in computing a job. 
http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/ProfilingEvent.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/ProfilingEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/ProfilingEvent.java index a4d0634..13a9a99 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/ProfilingEvent.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/ProfilingEvent.java @@ -23,7 +23,7 @@ import java.io.IOException; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.runtime.event.job.AbstractEvent; -import org.apache.flink.runtime.jobgraph.JobID; +import org.apache.flink.api.common.JobID; import com.google.common.base.Preconditions; http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/SingleInstanceProfilingEvent.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/SingleInstanceProfilingEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/SingleInstanceProfilingEvent.java index 4e0faf7..e07f144 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/SingleInstanceProfilingEvent.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/SingleInstanceProfilingEvent.java @@ -22,7 +22,7 @@ import java.io.IOException; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; -import org.apache.flink.runtime.jobgraph.JobID; +import org.apache.flink.api.common.JobID; import org.apache.flink.types.StringValue; import 
com.google.common.base.Preconditions; http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/ThreadProfilingEvent.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/ThreadProfilingEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/ThreadProfilingEvent.java index d878906..08b932a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/ThreadProfilingEvent.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/ThreadProfilingEvent.java @@ -23,7 +23,7 @@ import java.io.IOException; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; -import org.apache.flink.runtime.jobgraph.JobID; +import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.jobgraph.JobVertexID; /** http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/VertexProfilingEvent.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/VertexProfilingEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/VertexProfilingEvent.java index 148d16d..fc18758 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/VertexProfilingEvent.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/profiling/types/VertexProfilingEvent.java @@ -24,7 +24,7 @@ import java.io.IOException; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; -import org.apache.flink.runtime.jobgraph.JobID; 
+import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.jobgraph.JobVertexID; /** http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java index fa2413b..3d1419a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java @@ -26,7 +26,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; import org.apache.flink.runtime.io.network.partition.ResultPartition; import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; -import org.apache.flink.runtime.jobgraph.JobID; +import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.memorymanager.MemoryManager; import org.apache.flink.runtime.messages.ExecutionGraphMessages; http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java index bda7c8b..a56a7e5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java @@ -25,7 +25,7 @@ import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; 
import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; -import org.apache.flink.runtime.jobgraph.JobID; +import org.apache.flink.api.common.JobID; import org.apache.flink.util.InstantiationUtil; /** http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java index 16ca090..6e446de 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java @@ -24,7 +24,7 @@ import akka.pattern.Patterns; import akka.util.Timeout; import org.apache.flink.core.io.InputSplit; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; -import org.apache.flink.runtime.jobgraph.JobID; +import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; import org.apache.flink.runtime.messages.JobManagerMessages; http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnClient.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnClient.java index e8b730d..039e926 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnClient.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnClient.java @@ -21,35 +21,115 @@ import org.apache.hadoop.fs.Path; import java.io.File; import java.util.List; +/** + * Abstract interface for an implementation of a Flink on YARN client to deploy. + * + * The Client describes the properties of the YARN application to create. + */ public abstract class AbstractFlinkYarnClient { // ---- Setter for YARN Cluster properties ----- // + + /** + * @param memoryMB The amount of memory for the JobManager (in MB) + */ public abstract void setJobManagerMemory(int memoryMB); + + /** + * @param memoryMB The memory per TaskManager (in MB) + */ public abstract void setTaskManagerMemory(int memoryMB); + /** + * Flink configuration + */ public abstract void setFlinkConfigurationObject(org.apache.flink.configuration.Configuration conf); + /** + * + * @param slots The number of TaskManager slots per TaskManager. + */ public abstract void setTaskManagerSlots(int slots); + + /** + * @return the number of TaskManager processing slots per TaskManager. + */ public abstract int getTaskManagerSlots(); + + /** + * @param queue Name of the YARN queue + */ public abstract void setQueue(String queue); + + /** + * + * @param localJarPath Local Path to the Flink uberjar + */ public abstract void setLocalJarPath(Path localJarPath); + + /** + * + * @param confPath local path to the Flink configuration file + */ public abstract void setConfigurationFilePath(Path confPath); + + /** + * + * @param logConfPath local path to the flink logging configuration + */ public abstract void setFlinkLoggingConfigurationPath(Path logConfPath); public abstract Path getFlinkLoggingConfigurationPath(); + + /** + * + * @param tmCount number of TaskManagers to start + */ public abstract void setTaskManagerCount(int tmCount); public abstract int getTaskManagerCount(); + + /** + * @param confDirPath Path to config directory. 
+ */ public abstract void setConfigurationDirectory(String confDirPath); - // List of files to transfer to the YARN containers. + + /** + * List of files to transfer to the YARN containers. + */ public abstract void setShipFiles(List<File> shipFiles); + + /** + * + * @param dynamicPropertiesEncoded Encoded String of the dynamic properties (-D configuration values of the Flink configuration) + */ public abstract void setDynamicPropertiesEncoded(String dynamicPropertiesEncoded); public abstract String getDynamicPropertiesEncoded(); - // ---- Operations on the YARN cluster ----- // + // --------------------------------------- Operations on the YARN cluster ----- // + + /** + * Returns a String containing details about the cluster (NodeManagers, available memory, ...) + * + */ public abstract String getClusterDescription() throws Exception; + /** + * Trigger the deployment to YARN. + * + * @param clusterName Name to be shown in the YARN resource manager overview. + */ public abstract AbstractFlinkYarnCluster deploy(String clusterName) throws Exception; + /** + * @param detachedMode If true, the Flink YARN client is non-blocking. That means it returns + * once Flink has been started successfully on YARN. + */ public abstract void setDetachedMode(boolean detachedMode); + public abstract boolean isDetached(); + + /** + * @return The string representation of the Path to the YARN session files. This is a temporary + * directory in HDFS that contains the jar files and configuration which is shipped to all the containers. 
+ */ public abstract String getSessionFilesDir(); } http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnCluster.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnCluster.java index 4b4bd2d..398709e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnCluster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnCluster.java @@ -18,6 +18,9 @@ package org.apache.flink.runtime.yarn; +import org.apache.flink.api.common.JobID; + +import java.io.IOException; import java.net.InetSocketAddress; import java.util.List; @@ -43,4 +46,30 @@ public abstract class AbstractFlinkYarnCluster { public abstract List<String> getNewMessages(); public abstract String getApplicationId(); + + public abstract boolean isDetached(); + + /** + * Connect the FlinkYarnCluster to the ApplicationMaster. + * + * Detached YARN sessions don't need to connect to the ApplicationMaster. + * Detached per job YARN sessions need to connect until the required number of TaskManagers have been started. + * + * @throws IOException + */ + public abstract void connectToCluster() throws IOException; + + /** + * Disconnect from the ApplicationMaster without stopping the session + * (to stop the session, use the {@link #shutdown()} method). + */ + public abstract void disconnect(); + + /** + * Tells the ApplicationMaster to monitor the status of JobId and stop itself once the specified + * job has finished. 
+ * + * @param jobID + */ + public abstract void stopAfterJob(JobID jobID); } http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala index f1c6450..d153dcc 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala @@ -88,8 +88,8 @@ ActorLogging { case Success(_) => - case JobResultSuccess(_, duration, accumulatorResults) => - jobSubmitter ! new JobExecutionResult(duration, accumulatorResults) + case JobResultSuccess(jobId, duration, accumulatorResults) => + jobSubmitter ! new JobExecutionResult(jobId, duration, accumulatorResults) self ! PoisonPill case msg => http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 9aa476d..4b0a55b 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -22,7 +22,7 @@ import java.io.{IOException, File} import java.net.InetSocketAddress import akka.actor.Status.{Success, Failure} -import org.apache.flink.api.common.ExecutionConfig +import org.apache.flink.api.common.{JobID, ExecutionConfig} import org.apache.flink.configuration.{ConfigConstants, GlobalConfiguration, Configuration} import org.apache.flink.core.io.InputSplitAssigner import 
org.apache.flink.runtime.blob.BlobServer @@ -41,7 +41,7 @@ import org.apache.flink.runtime.ActorLogMessages import org.apache.flink.runtime.akka.AkkaUtils import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager import org.apache.flink.runtime.instance.InstanceManager -import org.apache.flink.runtime.jobgraph.{JobGraph, JobStatus, JobID} +import org.apache.flink.runtime.jobgraph.{JobGraph, JobStatus} import org.apache.flink.runtime.jobmanager.accumulators.AccumulatorManager import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkScheduler} import org.apache.flink.runtime.messages.JobManagerMessages._ @@ -86,7 +86,7 @@ import scala.collection.JavaConverters._ * - [[JobStatusChanged]] indicates that the status of job (RUNNING, CANCELING, FINISHED, etc.) has * changed. This message is sent by the ExecutionGraph. */ -class JobManager(val configuration: Configuration, +class JobManager(val flinkConfiguration: Configuration, val instanceManager: InstanceManager, val scheduler: FlinkScheduler, val libraryCacheManager: BlobLibraryCacheManager, http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala index 0c3384c..cb7bfec 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala @@ -19,9 +19,9 @@ package org.apache.flink.runtime.jobmanager import akka.actor.{ActorLogging, Actor} +import org.apache.flink.api.common.JobID import org.apache.flink.runtime.ActorLogMessages import org.apache.flink.runtime.executiongraph.ExecutionGraph -import 
org.apache.flink.runtime.jobgraph.JobID import org.apache.flink.runtime.messages.ArchiveMessages._ import org.apache.flink.runtime.messages.JobManagerMessages._ http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamCheckpointCoordinator.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamCheckpointCoordinator.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamCheckpointCoordinator.scala index b6b2cc1..d9a3421 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamCheckpointCoordinator.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamCheckpointCoordinator.scala @@ -21,11 +21,12 @@ package org.apache.flink.runtime.jobmanager import java.lang.Long import akka.actor._ +import org.apache.flink.api.common.JobID import org.apache.flink.runtime.ActorLogMessages import org.apache.flink.runtime.execution.ExecutionState.RUNNING import org.apache.flink.runtime.executiongraph.{ExecutionAttemptID, ExecutionGraph, ExecutionVertex} import org.apache.flink.runtime.jobgraph.JobStatus._ -import org.apache.flink.runtime.jobgraph.{JobID, JobVertexID} +import org.apache.flink.runtime.jobgraph.JobVertexID import org.apache.flink.runtime.state.StateHandle import scala.collection.JavaConversions._ http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ArchiveMessages.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ArchiveMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ArchiveMessages.scala index 704bf86..b4ed2cc 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ArchiveMessages.scala +++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ArchiveMessages.scala @@ -18,8 +18,8 @@ package org.apache.flink.runtime.messages +import org.apache.flink.api.common.JobID import org.apache.flink.runtime.executiongraph.ExecutionGraph -import org.apache.flink.runtime.jobgraph.JobID /** * This object contains the archive specific messages. http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ExecutionGraphMessages.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ExecutionGraphMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ExecutionGraphMessages.scala index d413d13..6785c31 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ExecutionGraphMessages.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ExecutionGraphMessages.scala @@ -21,9 +21,10 @@ package org.apache.flink.runtime.messages import java.text.SimpleDateFormat import java.util.Date +import org.apache.flink.api.common.JobID import org.apache.flink.runtime.execution.ExecutionState import org.apache.flink.runtime.executiongraph.ExecutionAttemptID -import org.apache.flink.runtime.jobgraph.{JobStatus, JobVertexID, JobID} +import org.apache.flink.runtime.jobgraph.{JobStatus, JobVertexID} /** * This object contains the execution graph specific messages. 
http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala index 17e9138..dab4671 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala @@ -18,12 +18,13 @@ package org.apache.flink.runtime.messages +import org.apache.flink.api.common.JobID import org.apache.flink.runtime.accumulators.AccumulatorEvent import org.apache.flink.runtime.client.JobStatusMessage import org.apache.flink.runtime.executiongraph.{ExecutionAttemptID, ExecutionGraph} import org.apache.flink.runtime.instance.{InstanceID, Instance} import org.apache.flink.runtime.io.network.partition.ResultPartitionID -import org.apache.flink.runtime.jobgraph.{JobGraph, JobID, JobStatus, JobVertexID} +import org.apache.flink.runtime.jobgraph.{JobGraph, JobStatus, JobVertexID} import org.apache.flink.runtime.taskmanager.TaskExecutionState import scala.collection.JavaConverters._ http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala index 0c9bf9b..cb79fbc 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala @@ -193,6 
+193,10 @@ class LocalFlinkMiniCluster(userConfiguration: Configuration, singleActorSystem: } } + def getConfiguration: Configuration = { + this.userConfiguration + } + def getDefaultConfig: Configuration = { val config: Configuration = new Configuration() http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java index 2254d7c..db74bbd 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java @@ -31,7 +31,7 @@ import java.net.InetSocketAddress; import java.security.MessageDigest; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.jobgraph.JobID; +import org.apache.flink.api.common.JobID; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java index adb3bfc..8613b4e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java @@ -19,7 +19,7 @@ package org.apache.flink.runtime.blob; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.jobgraph.JobID; +import org.apache.flink.api.common.JobID; import org.junit.Test; import 
java.io.File; http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java index 1f8d29b..c98e575 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java @@ -19,7 +19,7 @@ package org.apache.flink.runtime.blob; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.jobgraph.JobID; +import org.apache.flink.api.common.JobID; import org.junit.Test; import java.io.ByteArrayInputStream; http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java index 5742fea..50b154e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java @@ -28,7 +28,7 @@ import java.util.List; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; -import org.apache.flink.runtime.jobgraph.JobID; +import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import 
org.apache.flink.runtime.operators.RegularPactTask; http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java index 239e356..d024135 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java @@ -26,7 +26,7 @@ import org.apache.flink.runtime.blob.BlobClient; import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.blob.BlobServer; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; -import org.apache.flink.runtime.jobgraph.JobID; +import org.apache.flink.api.common.JobID; import org.junit.Test; import static org.junit.Assert.*; http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java index e0852c6..894a7a9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java @@ -49,7 +49,7 @@ import org.apache.flink.runtime.jobgraph.DistributionPattern; import 
org.apache.flink.runtime.jobgraph.IntermediateDataSet; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.JobGraph; -import org.apache.flink.runtime.jobgraph.JobID; +import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint; import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java index c979c42..8c4879f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java @@ -46,7 +46,7 @@ import org.apache.flink.runtime.instance.Instance; import org.apache.flink.runtime.instance.SimpleSlot; import org.apache.flink.runtime.jobgraph.AbstractJobVertex; import org.apache.flink.runtime.jobgraph.DistributionPattern; -import org.apache.flink.runtime.jobgraph.JobID; +import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; import org.apache.flink.runtime.operators.RegularPactTask; http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java index d136d6f..5dd10c0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java @@ -39,7 +39,7 @@ import org.apache.flink.runtime.instance.InstanceConnectionInfo; import org.apache.flink.runtime.instance.InstanceID; import org.apache.flink.runtime.instance.SimpleSlot; import org.apache.flink.runtime.jobgraph.AbstractJobVertex; -import org.apache.flink.runtime.jobgraph.JobID; +import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java index 2d1d9d5..1e78ac5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java @@ -32,7 +32,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.instance.SimpleSlot; import org.apache.flink.runtime.jobgraph.AbstractJobVertex; -import org.apache.flink.runtime.jobgraph.JobID; +import org.apache.flink.api.common.JobID; import 
org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java index 168ea90..e199353 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java @@ -39,7 +39,7 @@ import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.instance.SimpleSlot; import org.apache.flink.runtime.instance.Instance; -import org.apache.flink.runtime.jobgraph.JobID; +import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; import org.apache.flink.runtime.messages.TaskManagerMessages; http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java index f5089c2..5713c10 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java +++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java @@ -31,7 +31,7 @@ import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.instance.Instance; import org.apache.flink.runtime.instance.SimpleSlot; -import org.apache.flink.runtime.jobgraph.JobID; +import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.messages.TaskManagerMessages.TaskOperationResult; import org.apache.flink.runtime.testingUtils.TestingUtils; http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java index 66ef4ca..e24a2b4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java @@ -32,7 +32,7 @@ import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.instance.Instance; import org.apache.flink.runtime.instance.SimpleSlot; -import org.apache.flink.runtime.jobgraph.JobID; +import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit; http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java 
---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java index ddd7282..f72d105 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PointwisePatternTest.java @@ -33,7 +33,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.JobException; import org.apache.flink.runtime.jobgraph.AbstractJobVertex; import org.apache.flink.runtime.jobgraph.DistributionPattern; -import org.apache.flink.runtime.jobgraph.JobID; +import org.apache.flink.api.common.JobID; public class PointwisePatternTest { http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java index 4f82dea..17d78d2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexSlotSharingTest.java @@ -28,7 +28,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.jobgraph.AbstractJobVertex; import org.apache.flink.runtime.jobgraph.DistributionPattern; -import org.apache.flink.runtime.jobgraph.JobID; +import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; import org.junit.Test; 
http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheDeleteValidationTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheDeleteValidationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheDeleteValidationTest.java index df28f27..8623f75 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheDeleteValidationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/filecache/FileCacheDeleteValidationTest.java @@ -26,7 +26,7 @@ import org.junit.Assert; import org.apache.flink.api.common.cache.DistributedCache.DistributedCacheEntry; import org.apache.flink.core.fs.local.LocalFileSystem; -import org.apache.flink.runtime.jobgraph.JobID; +import org.apache.flink.api.common.JobID; import org.junit.After; import org.junit.Before; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java index 47f7a2b..c0ed629 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java @@ -24,7 +24,7 @@ import java.lang.reflect.Method; import java.net.InetAddress; import akka.actor.ActorRef; -import org.apache.flink.runtime.jobgraph.JobID; +import org.apache.flink.api.common.JobID; import org.junit.Test; /** http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java 
---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java index b16bf4b..f6c0c9f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java @@ -25,7 +25,7 @@ import java.net.InetAddress; import akka.actor.ActorRef; import org.apache.flink.runtime.executiongraph.Execution; -import org.apache.flink.runtime.jobgraph.JobID; +import org.apache.flink.api.common.JobID; import org.junit.Test; import org.mockito.Matchers; http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java index cc90c44..32fe1a7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.io.network.partition.consumer; import com.google.common.collect.Lists; +import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.network.TaskEventDispatcher; import org.apache.flink.runtime.io.network.buffer.BufferPool; @@ -34,7 +35,6 @@ import org.apache.flink.runtime.io.network.util.TestPartitionProducer; import org.apache.flink.runtime.io.network.util.TestProducerSource; import 
org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; -import org.apache.flink.runtime.jobgraph.JobID; import org.junit.Test; import java.util.Collections; http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobIdTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobIdTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobIdTest.java index 0405f50..083a1a9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobIdTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobIdTest.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.jobgraph; +import org.apache.flink.api.common.JobID; import org.junit.Test; import java.nio.ByteBuffer; http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java index 006bd27..776184c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java @@ -35,7 +35,7 @@ import org.apache.flink.runtime.instance.HardwareDescription; import org.apache.flink.runtime.instance.Instance; import org.apache.flink.runtime.instance.InstanceConnectionInfo; import org.apache.flink.runtime.instance.InstanceID; -import org.apache.flink.runtime.jobgraph.JobID; +import org.apache.flink.api.common.JobID; import 
org.apache.flink.runtime.jobgraph.JobVertexID; http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SharedSlotsTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SharedSlotsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SharedSlotsTest.java index fcb638f..42a3702 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SharedSlotsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SharedSlotsTest.java @@ -29,7 +29,7 @@ import org.junit.Test; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import org.apache.flink.runtime.instance.Instance; -import org.apache.flink.runtime.jobgraph.JobID; +import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.jobgraph.JobVertexID; public class SharedSlotsTest { http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFutureTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFutureTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFutureTest.java index 23a5c94..d678531 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFutureTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFutureTest.java @@ -24,7 +24,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import org.apache.flink.runtime.instance.SimpleSlot; -import org.apache.flink.runtime.jobgraph.JobID; +import 
org.apache.flink.api.common.JobID; import org.junit.Test; public class SlotAllocationFutureTest { http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java index d5c9fbb..7cbb59b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java @@ -35,7 +35,7 @@ import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.buffer.BufferProvider; import org.apache.flink.runtime.io.network.buffer.BufferRecycler; import org.apache.flink.runtime.io.network.partition.consumer.InputGate; -import org.apache.flink.runtime.jobgraph.JobID; +import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; import org.apache.flink.runtime.memorymanager.DefaultMemoryManager; http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/test/java/org/apache/flink/runtime/profiling/types/ProfilingTypesTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/profiling/types/ProfilingTypesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/profiling/types/ProfilingTypesTest.java index d7bdda3..c3c75ce 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/profiling/types/ProfilingTypesTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/profiling/types/ProfilingTypesTest.java @@ -23,11 +23,8 @@ import 
static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; -import org.apache.flink.runtime.jobgraph.JobID; +import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.runtime.profiling.types.InstanceSummaryProfilingEvent; -import org.apache.flink.runtime.profiling.types.SingleInstanceProfilingEvent; -import org.apache.flink.runtime.profiling.types.ThreadProfilingEvent; import org.apache.flink.runtime.testutils.CommonTestUtils; import org.apache.flink.runtime.testutils.ManagementTestUtils; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskExecutionStateTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskExecutionStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskExecutionStateTest.java index e9e17ca..6939527 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskExecutionStateTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskExecutionStateTest.java @@ -24,7 +24,7 @@ import java.io.IOException; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; -import org.apache.flink.runtime.jobgraph.JobID; +import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.testutils.CommonTestUtils; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java index 8784c14..640ccc3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java @@ -41,7 +41,7 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; -import org.apache.flink.runtime.jobgraph.JobID; +import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.jobmanager.Tasks; http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java index 68b0e6f..689b22d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java @@ -30,7 +30,7 @@ import org.apache.flink.runtime.execution.RuntimeEnvironment; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.network.MockNetworkEnvironment; -import org.apache.flink.runtime.jobgraph.JobID; +import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; 
http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala index 775c0cb..a36ded9 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.scala @@ -20,9 +20,10 @@ package org.apache.flink.runtime.executiongraph import akka.actor.{Props, ActorSystem} import akka.testkit.TestKit +import org.apache.flink.api.common.JobID import org.apache.flink.configuration.Configuration import org.apache.flink.runtime.akka.AkkaUtils -import org.apache.flink.runtime.jobgraph.{JobStatus, JobID, JobGraph, AbstractJobVertex} +import org.apache.flink.runtime.jobgraph.{JobStatus, JobGraph, AbstractJobVertex} import org.apache.flink.runtime.jobmanager.Tasks import org.apache.flink.runtime.jobmanager.scheduler.Scheduler import org.apache.flink.runtime.testingUtils.TestingUtils http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala index 69b0588..1a36112 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala +++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala @@ -20,11 +20,12 @@ package org.apache.flink.runtime.executiongraph import akka.actor.{Props, ActorSystem} import akka.testkit.TestKit +import org.apache.flink.api.common.JobID import org.apache.flink.configuration.Configuration import org.apache.flink.runtime.akka.AkkaUtils import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils .SimpleAcknowledgingTaskManager -import org.apache.flink.runtime.jobgraph.{JobStatus, JobID, JobGraph, AbstractJobVertex} +import org.apache.flink.runtime.jobgraph.{JobStatus, JobGraph, AbstractJobVertex} import org.apache.flink.runtime.jobmanager.Tasks import org.apache.flink.runtime.jobmanager.scheduler.Scheduler import org.apache.flink.runtime.testingUtils.TestingUtils http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala index 89cbfe6..89e1d72 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala @@ -20,9 +20,10 @@ package org.apache.flink.runtime.testingUtils import akka.actor.{Cancellable, Terminated, ActorRef, Props} import akka.pattern.{ask, pipe} +import org.apache.flink.api.common.JobID import org.apache.flink.runtime.ActorLogMessages import org.apache.flink.runtime.execution.ExecutionState -import org.apache.flink.runtime.jobgraph.{JobStatus, JobID} +import org.apache.flink.runtime.jobgraph.JobStatus import org.apache.flink.runtime.jobmanager.{JobManager, MemoryArchivist} import 
org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged import org.apache.flink.runtime.messages.Messages.Disconnect http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala index a5a577b..f810749 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala @@ -19,8 +19,9 @@ package org.apache.flink.runtime.testingUtils import akka.actor.ActorRef +import org.apache.flink.api.common.JobID import org.apache.flink.runtime.executiongraph.ExecutionGraph -import org.apache.flink.runtime.jobgraph.{JobStatus, JobID} +import org.apache.flink.runtime.jobgraph.JobStatus object TestingJobManagerMessages { http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala index 06062f4..a47e4e7 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala @@ -19,9 +19,9 @@ package org.apache.flink.runtime.testingUtils import akka.actor.{Terminated, ActorRef} +import org.apache.flink.api.common.JobID import 
org.apache.flink.runtime.executiongraph.ExecutionAttemptID import org.apache.flink.runtime.instance.InstanceConnectionInfo -import org.apache.flink.runtime.jobgraph.JobID import org.apache.flink.runtime.messages.Messages.Disconnect import org.apache.flink.runtime.messages.TaskManagerMessages.UnregisterTask import org.apache.flink.runtime.taskmanager.{NetworkEnvironmentConfiguration, TaskManagerConfiguration, TaskManager} http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala index ebe4555..2051ef5 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala @@ -19,8 +19,8 @@ package org.apache.flink.runtime.testingUtils import akka.actor.ActorRef +import org.apache.flink.api.common.JobID import org.apache.flink.runtime.executiongraph.ExecutionAttemptID -import org.apache.flink.runtime.jobgraph.JobID import org.apache.flink.runtime.taskmanager.Task /** http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java b/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java index 0b394e3..a955682 100644 --- a/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java +++ 
b/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java @@ -52,7 +52,7 @@ public class AvroExternalJarProgramITCase { PackagedProgram program = new PackagedProgram(new File(jarFile), new String[] { testData }); Client c = new Client(new InetSocketAddress("localhost", testMiniCluster.getJobManagerRPCPort()), - new Configuration(), program.getUserCodeClassLoader()); + new Configuration(), program.getUserCodeClassLoader(), -1); c.run(program, 4, true); } catch (Throwable t) { http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-staging/flink-spargel/pom.xml ---------------------------------------------------------------------- diff --git a/flink-staging/flink-spargel/pom.xml b/flink-staging/flink-spargel/pom.xml index a0f0f73..bd03c1a 100644 --- a/flink-staging/flink-spargel/pom.xml +++ b/flink-staging/flink-spargel/pom.xml @@ -56,5 +56,12 @@ under the License. <version>${project.version}</version> <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-optimizer</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> </dependencies> </project> http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-staging/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelCompilerTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelCompilerTest.java b/flink-staging/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelCompilerTest.java index 38c26f0..d0b0164 100644 --- a/flink-staging/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelCompilerTest.java +++ b/flink-staging/flink-spargel/src/test/java/org/apache/flink/spargel/java/SpargelCompilerTest.java @@ -26,6 +26,7 @@ import static org.junit.Assert.fail; import org.apache.flink.api.common.Plan; import 
org.apache.flink.api.common.operators.util.FieldList; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.optimizer.util.CompilerTestBase; import org.junit.Test; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; @@ -39,7 +40,6 @@ import org.apache.flink.runtime.operators.util.LocalStrategy; import org.apache.flink.spargel.java.examples.SpargelConnectedComponents.CCMessager; import org.apache.flink.spargel.java.examples.SpargelConnectedComponents.CCUpdater; import org.apache.flink.spargel.java.examples.SpargelConnectedComponents.IdAssigner; -import org.apache.flink.test.compiler.util.CompilerTestBase; public class SpargelCompilerTest extends CompilerTestBase { http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java index 4faa329..3308ab7 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java @@ -24,6 +24,7 @@ import java.util.ArrayList; import java.util.List; import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.JobSubmissionResult; import org.apache.flink.client.program.Client; import org.apache.flink.client.program.JobWithJars; import org.apache.flink.client.program.ProgramInvocationException; @@ -111,10 +112,16 @@ public class 
RemoteStreamEnvironment extends StreamExecutionEnvironment { Configuration configuration = jobGraph.getJobConfiguration(); Client client = new Client(new InetSocketAddress(host, port), configuration, - JobWithJars.buildUserCodeClassLoader(jarFiles, JobWithJars.class.getClassLoader())); + JobWithJars.buildUserCodeClassLoader(jarFiles, JobWithJars.class.getClassLoader()), -1); try { - return client.run(jobGraph, true); + JobSubmissionResult result = client.run(jobGraph, true); + if(result instanceof JobExecutionResult) { + return (JobExecutionResult) result; + } else { + LOG.warn("The Client didn't return a JobExecutionResult"); + return new JobExecutionResult(result.getJobID(), -1, null); + } } catch (ProgramInvocationException e) { throw new RuntimeException("Cannot execute job due to ProgramInvocationException", e); } http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java index b03ab0e..84ad710 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java @@ -21,15 +21,19 @@ import java.io.File; import java.util.List; import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.JobSubmissionResult; import org.apache.flink.client.program.Client; import org.apache.flink.configuration.ConfigConstants; import 
org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.jobgraph.JobGraph; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class StreamContextEnvironment extends StreamExecutionEnvironment { - protected static ClassLoader userClassLoader; + private static final Logger LOG = LoggerFactory.getLogger(StreamContextEnvironment.class); + protected List<File> jars; protected Client client; @@ -70,13 +74,13 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment { jobGraph.addJar(new Path(file.getAbsolutePath())); } - try { - return client.run(jobGraph, true); - - } catch (Exception e) { - throw e; + JobSubmissionResult result = client.run(jobGraph, true); + if(result instanceof JobExecutionResult) { + return (JobExecutionResult) result; + } else { + LOG.warn("The Client didn't return a JobExecutionResult"); + return new JobExecutionResult(result.getJobID(), -1, null); } - } } http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-test-utils/src/main/java/org/apache/flink/test/compiler/util/CompilerTestBase.java ---------------------------------------------------------------------- diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/compiler/util/CompilerTestBase.java b/flink-test-utils/src/main/java/org/apache/flink/test/compiler/util/CompilerTestBase.java deleted file mode 100644 index 435713b..0000000 --- a/flink-test-utils/src/main/java/org/apache/flink/test/compiler/util/CompilerTestBase.java +++ /dev/null @@ -1,201 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.test.compiler.util; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; - -import org.apache.flink.api.common.Plan; -import org.apache.flink.api.common.functions.RichFunction; -import org.apache.flink.api.common.io.FileInputFormat.FileBaseStatistics; -import org.apache.flink.api.common.operators.GenericDataSourceBase; -import org.apache.flink.api.common.operators.Operator; -import org.apache.flink.optimizer.DataStatistics; -import org.apache.flink.optimizer.Optimizer; -import org.apache.flink.optimizer.costs.DefaultCostEstimator; -import org.apache.flink.optimizer.plan.OptimizedPlan; -import org.apache.flink.optimizer.plan.PlanNode; -import org.apache.flink.optimizer.plan.SingleInputPlanNode; -import org.apache.flink.util.OperatingSystem; -import org.junit.Before; - -/** - * - */ -public abstract class CompilerTestBase { - - protected static final String IN_FILE = OperatingSystem.isWindows() ? "file:/c:/" : "file:///dev/random"; - - protected static final String OUT_FILE = OperatingSystem.isWindows() ? 
"file:/c:/" : "file:///dev/null"; - - protected static final int DEFAULT_PARALLELISM = 8; - - protected static final String DEFAULT_PARALLELISM_STRING = String.valueOf(DEFAULT_PARALLELISM); - - private static final String CACHE_KEY = "cachekey"; - - // ------------------------------------------------------------------------ - - protected DataStatistics dataStats; - - protected Optimizer withStatsCompiler; - - protected Optimizer noStatsCompiler; - - private int statCounter; - - // ------------------------------------------------------------------------ - - @Before - public void setup() { - this.dataStats = new DataStatistics(); - this.withStatsCompiler = new Optimizer(this.dataStats, new DefaultCostEstimator()); - this.withStatsCompiler.setDefaultParallelism(DEFAULT_PARALLELISM); - - this.noStatsCompiler = new Optimizer(null, new DefaultCostEstimator()); - this.noStatsCompiler.setDefaultParallelism(DEFAULT_PARALLELISM); - } - - // ------------------------------------------------------------------------ - - public OptimizedPlan compileWithStats(Plan p) { - return this.withStatsCompiler.compile(p); - } - - public OptimizedPlan compileNoStats(Plan p) { - return this.noStatsCompiler.compile(p); - } - - public void setSourceStatistics(GenericDataSourceBase<?, ?> source, long size, float recordWidth) { - setSourceStatistics(source, new FileBaseStatistics(Long.MAX_VALUE, size, recordWidth)); - } - - public void setSourceStatistics(GenericDataSourceBase<?, ?> source, FileBaseStatistics stats) { - final String key = CACHE_KEY + this.statCounter++; - this.dataStats.cacheBaseStatistics(stats, key); - source.setStatisticsKey(key); - } - - public static OperatorResolver getContractResolver(Plan plan) { - return new OperatorResolver(plan); - } - - public static OptimizerPlanNodeResolver getOptimizerPlanNodeResolver(OptimizedPlan plan) { - return new OptimizerPlanNodeResolver(plan); - } - - // ------------------------------------------------------------------------ - - public 
static final class OptimizerPlanNodeResolver { - - private final Map<String, ArrayList<PlanNode>> map; - - OptimizerPlanNodeResolver(OptimizedPlan p) { - HashMap<String, ArrayList<PlanNode>> map = new HashMap<String, ArrayList<PlanNode>>(); - - for (PlanNode n : p.getAllNodes()) { - Operator<?> c = n.getOriginalOptimizerNode().getOperator(); - String name = c.getName(); - - ArrayList<PlanNode> list = map.get(name); - if (list == null) { - list = new ArrayList<PlanNode>(2); - map.put(name, list); - } - - // check whether this node is a child of a node with the same contract (aka combiner) - boolean shouldAdd = true; - for (Iterator<PlanNode> iter = list.iterator(); iter.hasNext();) { - PlanNode in = iter.next(); - if (in.getOriginalOptimizerNode().getOperator() == c) { - // is this the child or is our node the child - if (in instanceof SingleInputPlanNode && n instanceof SingleInputPlanNode) { - SingleInputPlanNode thisNode = (SingleInputPlanNode) n; - SingleInputPlanNode otherNode = (SingleInputPlanNode) in; - - if (thisNode.getPredecessor() == otherNode) { - // other node is child, remove it - iter.remove(); - } else if (otherNode.getPredecessor() == thisNode) { - shouldAdd = false; - } - } else { - throw new RuntimeException("Unrecodnized case in test."); - } - } - } - - if (shouldAdd) { - list.add(n); - } - } - - this.map = map; - } - - - @SuppressWarnings("unchecked") - public <T extends PlanNode> T getNode(String name) { - List<PlanNode> nodes = this.map.get(name); - if (nodes == null || nodes.isEmpty()) { - throw new RuntimeException("No node found with the given name."); - } else if (nodes.size() != 1) { - throw new RuntimeException("Multiple nodes found with the given name."); - } else { - return (T) nodes.get(0); - } - } - - @SuppressWarnings("unchecked") - public <T extends PlanNode> T getNode(String name, Class<? 
extends RichFunction> stubClass) { - List<PlanNode> nodes = this.map.get(name); - if (nodes == null || nodes.isEmpty()) { - throw new RuntimeException("No node found with the given name and stub class."); - } else { - PlanNode found = null; - for (PlanNode node : nodes) { - if (node.getClass() == stubClass) { - if (found == null) { - found = node; - } else { - throw new RuntimeException("Multiple nodes found with the given name and stub class."); - } - } - } - if (found == null) { - throw new RuntimeException("No node found with the given name and stub class."); - } else { - return (T) found; - } - } - } - - public List<PlanNode> getNodes(String name) { - List<PlanNode> nodes = this.map.get(name); - if (nodes == null || nodes.isEmpty()) { - throw new RuntimeException("No node found with the given name."); - } else { - return new ArrayList<PlanNode>(nodes); - } - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-test-utils/src/main/java/org/apache/flink/test/compiler/util/OperatorResolver.java ---------------------------------------------------------------------- diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/compiler/util/OperatorResolver.java b/flink-test-utils/src/main/java/org/apache/flink/test/compiler/util/OperatorResolver.java deleted file mode 100644 index 08cded2..0000000 --- a/flink-test-utils/src/main/java/org/apache/flink/test/compiler/util/OperatorResolver.java +++ /dev/null @@ -1,127 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.test.compiler.util; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import org.apache.flink.api.common.Plan; -import org.apache.flink.api.common.functions.RichFunction; -import org.apache.flink.api.common.operators.Operator; -import org.apache.flink.api.java.record.operators.BulkIteration; -import org.apache.flink.api.java.record.operators.DeltaIteration; -import org.apache.flink.util.Visitor; - -/** - * Utility to get operator instances from plans via name. - */ -@SuppressWarnings("deprecation") -public class OperatorResolver implements Visitor<Operator<?>> { - - private final Map<String, List<Operator<?>>> map; - private Set<Operator<?>> seen; - - public OperatorResolver(Plan p) { - this.map = new HashMap<String, List<Operator<?>>>(); - this.seen = new HashSet<Operator<?>>(); - - p.accept(this); - this.seen = null; - } - - - @SuppressWarnings("unchecked") - public <T extends Operator<?>> T getNode(String name) { - List<Operator<?>> nodes = this.map.get(name); - if (nodes == null || nodes.isEmpty()) { - throw new RuntimeException("No nodes found with the given name."); - } else if (nodes.size() != 1) { - throw new RuntimeException("Multiple nodes found with the given name."); - } else { - return (T) nodes.get(0); - } - } - - @SuppressWarnings("unchecked") - public <T extends Operator<?>> T getNode(String name, Class<? 
extends RichFunction> stubClass) { - List<Operator<?>> nodes = this.map.get(name); - if (nodes == null || nodes.isEmpty()) { - throw new RuntimeException("No node found with the given name and stub class."); - } else { - Operator<?> found = null; - for (Operator<?> node : nodes) { - if (node.getClass() == stubClass) { - if (found == null) { - found = node; - } else { - throw new RuntimeException("Multiple nodes found with the given name and stub class."); - } - } - } - if (found == null) { - throw new RuntimeException("No node found with the given name and stub class."); - } else { - return (T) found; - } - } - } - - public List<Operator<?>> getNodes(String name) { - List<Operator<?>> nodes = this.map.get(name); - if (nodes == null || nodes.isEmpty()) { - throw new RuntimeException("No node found with the given name."); - } else { - return new ArrayList<Operator<?>>(nodes); - } - } - - @Override - public boolean preVisit(Operator<?> visitable) { - if (this.seen.add(visitable)) { - // add to the map - final String name = visitable.getName(); - List<Operator<?>> list = this.map.get(name); - if (list == null) { - list = new ArrayList<Operator<?>>(2); - this.map.put(name, list); - } - list.add(visitable); - - // recurse into bulk iterations - if (visitable instanceof BulkIteration) { - ((BulkIteration) visitable).getNextPartialSolution().accept(this); - } else if (visitable instanceof DeltaIteration) { - ((DeltaIteration) visitable).getSolutionSetDelta().accept(this); - ((DeltaIteration) visitable).getNextWorkset().accept(this); - } - - return true; - } else { - return false; - } - } - - @Override - public void postVisit(Operator<?> visitable) {} -} http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-test-utils/src/main/java/org/apache/flink/test/util/RecordAPITestBase.java ---------------------------------------------------------------------- diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/RecordAPITestBase.java 
b/flink-test-utils/src/main/java/org/apache/flink/test/util/RecordAPITestBase.java index eafe9ad..5d4cf4d 100644 --- a/flink-test-utils/src/main/java/org/apache/flink/test/util/RecordAPITestBase.java +++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/RecordAPITestBase.java @@ -72,7 +72,7 @@ public abstract class RecordAPITestBase extends AbstractTestBase { Assert.fail("Error: Cannot obtain Pact plan. Did the thest forget to override either 'getPactPlan()' or 'getJobGraph()' ?"); } - Optimizer pc = new Optimizer(new DataStatistics()); + Optimizer pc = new Optimizer(new DataStatistics(), this.config); OptimizedPlan op = pc.compile(p); if (printPlan) { http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java index 02c1434..b3423cb 100644 --- a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java +++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java @@ -80,7 +80,7 @@ public class TestEnvironment extends ExecutionEnvironment { private OptimizedPlan compileProgram(String jobName) { Plan p = createProgramPlan(jobName); - Optimizer pc = new Optimizer(new DataStatistics()); + Optimizer pc = new Optimizer(new DataStatistics(), this.executor.getConfiguration()); return pc.compile(p); } http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-tests/pom.xml ---------------------------------------------------------------------- diff --git a/flink-tests/pom.xml b/flink-tests/pom.xml index 9675150..977c205 100644 --- a/flink-tests/pom.xml +++ b/flink-tests/pom.xml @@ -126,6 +126,14 @@ under the License. 
<dependency> <groupId>org.apache.flink</groupId> + <artifactId>flink-optimizer</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> <artifactId>flink-runtime</artifactId> <version>${project.version}</version> <type>test-jar</type> http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java index 400ed3a..345bffd 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java @@ -140,7 +140,7 @@ public abstract class CancellingTestBase { } private JobGraph getJobGraph(final Plan plan) throws Exception { - final Optimizer pc = new Optimizer(new DataStatistics()); + final Optimizer pc = new Optimizer(new DataStatistics(), this.executor.getConfiguration()); final OptimizedPlan op = pc.compile(plan); final JobGraphGenerator jgg = new JobGraphGenerator(); return jgg.compileJobGraph(op); http://git-wip-us.apache.org/repos/asf/flink/blob/6b0d4076/flink-tests/src/test/java/org/apache/flink/test/compiler/examples/KMeansSingleStepTest.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/compiler/examples/KMeansSingleStepTest.java b/flink-tests/src/test/java/org/apache/flink/test/compiler/examples/KMeansSingleStepTest.java index 4fd4617..1724920 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/compiler/examples/KMeansSingleStepTest.java +++ 
b/flink-tests/src/test/java/org/apache/flink/test/compiler/examples/KMeansSingleStepTest.java @@ -31,8 +31,8 @@ import org.apache.flink.optimizer.plan.SinkPlanNode; import org.apache.flink.runtime.operators.DriverStrategy; import org.apache.flink.runtime.operators.shipping.ShipStrategyType; import org.apache.flink.runtime.operators.util.LocalStrategy; -import org.apache.flink.test.compiler.util.CompilerTestBase; -import org.apache.flink.test.compiler.util.OperatorResolver; +import org.apache.flink.optimizer.util.CompilerTestBase; +import org.apache.flink.optimizer.util.OperatorResolver; import org.apache.flink.test.recordJobs.kmeans.KMeansSingleStep; import org.junit.Assert; import org.junit.Test;