buildbot success in on flink-docs-release-0.10
The Buildbot has detected a restored build on builder flink-docs-release-0.10 while building . Full details are available at: https://ci.apache.org/builders/flink-docs-release-0.10/builds/363 Buildbot URL: https://ci.apache.org/ Buildslave for this Build: bb_slave3_ubuntu Build Reason: The Nightly scheduler named 'flink-nightly-docs-release-0.10' triggered this build Build Source Stamp: [branch release-0.10] HEAD Blamelist: Build succeeded! Sincerely, -The Buildbot
buildbot success in on flink-docs-master
The Buildbot has detected a restored build on builder flink-docs-master while building . Full details are available at: https://ci.apache.org/builders/flink-docs-master/builds/494 Buildbot URL: https://ci.apache.org/ Buildslave for this Build: bb_slave3_ubuntu Build Reason: The Nightly scheduler named 'flink-nightly-docs-master' triggered this build Build Source Stamp: [branch master] HEAD Blamelist: Build succeeded! Sincerely, -The Buildbot
[4/5] flink git commit: [FLINK-4689] [cluster management] Implement a simple slot provider for the new job manager
[FLINK-4689] [cluster management] Implement a simple slot provider for the new job manager Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e91b82d3 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e91b82d3 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e91b82d3 Branch: refs/heads/flip-6 Commit: e91b82d3c868e18611064f905b345906f1414f84 Parents: 655722a Author: Kurt Young Authored: Sun Oct 16 22:20:38 2016 +0800 Committer: Stephan Ewen Committed: Sun Oct 16 22:14:41 2016 +0200 -- .../apache/flink/runtime/instance/SlotPool.java | 1 - .../jobmanager/slots/PooledSlotProvider.java| 73 .../flink/runtime/jobmaster/JobMaster.java | 24 --- 3 files changed, 89 insertions(+), 9 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/e91b82d3/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java index e7857c1..de952c3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java @@ -135,7 +135,6 @@ public class SlotPool implements SlotOwner { internalAllocateSlot(jobID, allocationID, resourceProfile, future); - final SlotOwner owner = this; return future.thenApplyAsync( new ApplyFunction() { @Override http://git-wip-us.apache.org/repos/asf/flink/blob/e91b82d3/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/PooledSlotProvider.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/PooledSlotProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/PooledSlotProvider.java new file mode 100644 index 000..5655fc2 --- /dev/null +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/PooledSlotProvider.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmanager.slots; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; +import org.apache.flink.runtime.instance.SimpleSlot; +import org.apache.flink.runtime.instance.SlotPool; +import org.apache.flink.runtime.instance.SlotProvider; +import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; +import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A simple pool based slot provider with {@link SlotPool} as the underlying storage. + */ +public class PooledSlotProvider implements SlotProvider { + + /** The pool which holds all the slots. 
*/ + private final SlotPool slotPool; + + /** The timeout for allocation. */ + private final Time timeout; + + public PooledSlotProvider(final SlotPool slotPool, final Time timeout) { + this.slotPool = slotPool; + this.timeout = timeout; + } + + @Override + public Future allocateSlot(ScheduledUnit task, + boolean allowQueued) throws NoResourceAvailableException + { + checkNotNull(task); + + final JobID jobID = task.getTaskToExecute().getVertex().getJobId(); + final Future future = slotPool.allocateSimpleSlot(jobID, Resou
[5/5] flink git commit: [FLINK-4351] [cluster management] JobManager handle TaskManager's registration
[FLINK-4351] [cluster management] JobManager handle TaskManager's registration Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a19cae3b Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a19cae3b Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a19cae3b Branch: refs/heads/flip-6 Commit: a19cae3b07963776c07c0aae7bee806004f59429 Parents: e91b82d Author: Kurt Young Authored: Sun Oct 16 23:00:57 2016 +0800 Committer: Stephan Ewen Committed: Sun Oct 16 22:14:41 2016 +0200 -- .../flink/runtime/jobmaster/JobMaster.java | 75 +--- .../runtime/jobmaster/JobMasterGateway.java | 15 ++-- .../runtime/taskexecutor/JobLeaderService.java | 55 ++ .../runtime/taskexecutor/TaskExecutor.java | 2 +- .../taskexecutor/TaskManagerServices.java | 2 +- .../runtime/taskexecutor/TaskExecutorTest.java | 10 ++- 6 files changed, 102 insertions(+), 57 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/a19cae3b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index 05c20d3..8cb9946 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.jobmaster; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.io.InputSplit; import org.apache.flink.core.io.InputSplitAssigner; @@ -36,7 +37,9 @@ import org.apache.flink.runtime.client.SerializedJobExecutionResult; import 
org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.concurrent.ApplyFunction; +import org.apache.flink.runtime.concurrent.BiFunction; import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager; import org.apache.flink.runtime.executiongraph.Execution; @@ -81,7 +84,9 @@ import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.rpc.StartStoppable; import org.apache.flink.runtime.state.CheckpointStateHandles; import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; import org.apache.flink.runtime.taskmanager.TaskExecutionState; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.util.SerializedThrowable; import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.SerializedValue; @@ -89,8 +94,10 @@ import org.slf4j.Logger; import javax.annotation.Nullable; import java.io.IOException; +import java.util.HashMap; import java.util.Map; import java.util.UUID; +import java.util.concurrent.Callable; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; @@ -122,6 +129,8 @@ public class JobMaster extends RpcEndpoint { /** Configuration of the JobManager */ private final Configuration configuration; + private final Time rpcTimeout; + /** Service to contend for and retrieve the leadership of JM and RM */ private final HighAvailabilityServices highAvailabilityServices; @@ -152,7 +161,7 @@ public class JobMaster extends RpcEndpoint { private volatile UUID leaderSessionID; - // - resource manager + // - ResourceManager /** Leader retriever service used to locate 
ResourceManager's address */ private LeaderRetrievalService resourceManagerLeaderRetriever; @@ -160,6 +169,9 @@ public class JobMaster extends RpcEndpoint { /** Connection with ResourceManager, null if not located address yet or we close it initiative */ private ResourceManagerConnection resourceManagerConnection; + // - TaskManagers + + private final Map> registeredTaskManagers; // @@ -181,6 +193,7 @@ public class JobMaster extends RpcEndpoint { this.jobGraph = checkNo
[3/5] flink git commit: [FLINK-4836] [cluster management] Add flink mini cluster (part 1)
[FLINK-4836] [cluster management] Add flink mini cluster (part 1) This implements - mini cluster configuration - startup / shutdown of common services (rpc, ha) - startup / shutdown of JobManager and Dispatcher Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/655722a2 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/655722a2 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/655722a2 Branch: refs/heads/flip-6 Commit: 655722a29edcca261653f54fa07fb7b4c94602a6 Parents: 7947662 Author: Stephan Ewen Authored: Sat Oct 15 00:25:41 2016 +0200 Committer: Stephan Ewen Committed: Sun Oct 16 22:14:36 2016 +0200 -- .../org/apache/flink/util/ExceptionUtils.java | 62 ++- .../HighAvailabilityServicesUtils.java | 17 + .../highavailability/ZookeeperHaServices.java | 2 +- .../runtime/jobmaster/JobManagerRunner.java | 1 - .../jobmaster/MiniClusterJobDispatcher.java | 394 - .../flink/runtime/minicluster/MiniCluster.java | 406 ++ .../minicluster/MiniClusterConfiguration.java | 147 +++ .../minicluster/MiniClusterJobDispatcher.java | 418 +++ .../resourcemanager/ResourceManager.java| 2 +- .../runtime/taskexecutor/JobLeaderService.java | 3 +- .../TestingHighAvailabilityServices.java| 2 +- .../runtime/minicluster/MiniClusterITCase.java | 79 12 files changed, 1126 insertions(+), 407 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/655722a2/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java -- diff --git a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java index 0f6f24f..d34b236 100644 --- a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java +++ b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java @@ -26,10 +26,13 @@ package org.apache.flink.util; import org.apache.flink.annotation.Internal; +import javax.annotation.Nullable; import java.io.IOException; import 
java.io.PrintWriter; import java.io.StringWriter; +import static org.apache.flink.util.Preconditions.checkNotNull; + @Internal public final class ExceptionUtils { @@ -58,7 +61,54 @@ public final class ExceptionUtils { return e.getClass().getName() + " (error while printing stack trace)"; } } - + + /** +* Adds a new exception as a {@link Throwable#addSuppressed(Throwable) suppressed exception} +* to a prior exception, or returns the new exception, if no prior exception exists. +* +* {@code +* +* public void closeAllThings() throws Exception { +* Exception ex = null; +* try { +* component.shutdown(); +* } catch (Exception e) { +* ex = firstOrSuppressed(e, ex); +* } +* try { +* anotherComponent.stop(); +* } catch (Exception e) { +* ex = firstOrSuppressed(e, ex); +* } +* try { +* lastComponent.shutdown(); +* } catch (Exception e) { +* ex = firstOrSuppressed(e, ex); +* } +* +* if (ex != null) { +* throw ex; +* } +* } +* } +* +* @param newException The newly occurred exception +* @param previous The previously occurred exception, possibly null. +* +* @return The new exception, if no previous exception exists, or the previous exception with the +* new exception in the list of suppressed exceptions. +*/ + public static T firstOrSuppressed(T newException, @Nullable T previous) { + checkNotNull(newException, "newException"); + + if (previous == null) { + return newException; + } else { + previous.addSuppressed(newException); + return previous; + } + } + /** * Throws the given {@code Throwable} in scenarios where the signatures do not allow you to * throw an arbitrary Throwable. Errors and RuntimeExceptions are thrown directly, other exceptions @@ -161,10 +211,8 @@ public final class ExceptionUtils { } } - /** -* Private constructor to prevent instantiation. -*/ - private ExceptionUtils() { - throw new RuntimeException(); - } + // + +
[2/5] flink git commit: [FLINK-4835] [cluster management] Add embedded version of the high-availability services
[FLINK-4835] [cluster management] Add embedded version of the high-availability services This includes the addition of the EmbeddedLeaderService and a clean shutdown hook for all high availability services. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/79476624 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/79476624 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/79476624 Branch: refs/heads/flip-6 Commit: 7947662467d102edc675fde19948b7638a044343 Parents: da16b0a Author: Stephan Ewen Authored: Fri Oct 14 23:57:11 2016 +0200 Committer: Stephan Ewen Committed: Sun Oct 16 22:09:39 2016 +0200 -- .../StandaloneCheckpointRecoveryFactory.java| 4 +- .../highavailability/EmbeddedNonHaServices.java | 62 +++ .../HighAvailabilityServices.java | 7 +- .../runtime/highavailability/NonHaServices.java | 62 +-- .../highavailability/ZookeeperHaServices.java | 12 +- .../nonha/AbstractNonHaServices.java| 175 +++ .../nonha/EmbeddedLeaderService.java| 466 +++ .../TestingHighAvailabilityServices.java| 9 + 8 files changed, 736 insertions(+), 61 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/79476624/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java index a9624fb..57785ce 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java @@ -40,8 +40,8 @@ public class StandaloneCheckpointRecoveryFactory implements CheckpointRecoveryFa public CompletedCheckpointStore createCheckpointStore(JobID jobId, ClassLoader userClassLoader) throws Exception { 
- return new StandaloneCompletedCheckpointStore(CheckpointRecoveryFactory - .NUMBER_OF_SUCCESSFUL_CHECKPOINTS_TO_RETAIN); + return new StandaloneCompletedCheckpointStore( + CheckpointRecoveryFactory.NUMBER_OF_SUCCESSFUL_CHECKPOINTS_TO_RETAIN); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/79476624/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java new file mode 100644 index 000..58da287 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/EmbeddedNonHaServices.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.highavailability; + +import org.apache.flink.runtime.highavailability.nonha.AbstractNonHaServices; +import org.apache.flink.runtime.highavailability.nonha.EmbeddedLeaderService; +import org.apache.flink.runtime.leaderelection.LeaderElectionService; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; + +/** + * An implementation of the {@link HighAvailabilityServices} for the non-high-availability case + * where all participants (ResourceManager, JobManagers, TaskManagers) run in the same process. + * + * This implementation has no dependencies on any external services. It returns a fix + * pre-configured ResourceManager, and stores checkpoints and metadata simply on the heap or + * on a local file system and therefore in a storage without guarantees. + */ +public class Embedded
[1/5] flink git commit: [hotfix] [tests] Migrate some test tasks to Java
Repository: flink Updated Branches: refs/heads/flip-6 2486d3787 -> a19cae3b0 [hotfix] [tests] Migrate some test tasks to Java Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/da16b0a7 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/da16b0a7 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/da16b0a7 Branch: refs/heads/flip-6 Commit: da16b0a7b1bf5d771d07428ae485048ce540bbf7 Parents: 2486d37 Author: Stephan Ewen Authored: Fri Oct 14 23:54:29 2016 +0200 Committer: Stephan Ewen Committed: Sun Oct 16 22:01:32 2016 +0200 -- .../StackTraceSampleCoordinatorITCase.java | 8 ++-- .../checkpoint/CoordinatorShutdownTest.java | 8 ++-- .../ExecutionGraphMetricsTest.java | 4 +- .../ExecutionGraphRestartTest.java | 23 +-- .../runtime/jobmanager/JobManagerTest.java | 9 +++-- .../flink/runtime/jobmanager/JobSubmitTest.java | 9 ++--- .../runtime/taskmanager/TaskManagerTest.java| 41 ++-- .../testtasks/BlockingNoOpInvokable.java| 39 +++ .../flink/runtime/testtasks/NoOpInvokable.java | 30 ++ .../runtime/testtasks/WaitingNoOpInvokable.java | 34 .../TaskManagerLossFailsTasksTest.scala | 3 +- .../runtime/jobmanager/JobManagerITCase.scala | 1 + .../apache/flink/runtime/jobmanager/Tasks.scala | 20 -- .../JobSubmissionFailsITCase.java | 6 +-- .../JobManagerHACheckpointRecoveryITCase.java | 4 +- .../JobManagerHAJobGraphRecoveryITCase.java | 4 +- .../jobmanager/JobManagerFailsITCase.scala | 2 +- .../taskmanager/TaskManagerFailsITCase.scala| 3 +- 18 files changed, 170 insertions(+), 78 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/da16b0a7/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java -- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java index 9b1f608..d74af08 
100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java @@ -20,7 +20,7 @@ package org.apache.flink.runtime.webmonitor; import akka.actor.ActorSystem; import akka.testkit.JavaTestKit; -import org.apache.flink.api.common.ExecutionConfig; + import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; @@ -31,14 +31,16 @@ import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.instance.AkkaActorGateway; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; -import org.apache.flink.runtime.jobmanager.Tasks; import org.apache.flink.runtime.messages.JobManagerMessages; import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages; import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable; import org.apache.flink.util.TestLogger; + import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; + import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; @@ -81,7 +83,7 @@ public class StackTraceSampleCoordinatorITCase extends TestLogger { final int parallelism = 1; final JobVertex task = new JobVertex("Task"); - task.setInvokableClass(Tasks.BlockingNoOpInvokable.class); + task.setInvokableClass(BlockingNoOpInvokable.class); task.setParallelism(parallelism); jobGraph.addVertex(task); http://git-wip-us.apache.org/repos/asf/flink/blob/da16b0a7/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java -- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java index ea4d322..a346a80 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java @@ -28,11 +28,13 @@ import org.apache.flink.runtime.jobgraph.Job
flink git commit: [FLINK-4709] [core] Fix resource leak in InputStreamFSInputWrapper
Repository: flink Updated Branches: refs/heads/release-1.1 9f7269808 -> fe464b424 [FLINK-4709] [core] Fix resource leak in InputStreamFSInputWrapper This closes #2581 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fe464b42 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fe464b42 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fe464b42 Branch: refs/heads/release-1.1 Commit: fe464b42483399a5dee5116699f600c551c37db9 Parents: 9f72698 Author: Holger Frydrych Authored: Mon Oct 3 14:34:19 2016 +0200 Committer: Stephan Ewen Committed: Sun Oct 16 19:11:11 2016 +0200 -- .../common/io/InputStreamFSInputWrapper.java| 5 +++ .../io/InputStreamFSInputWrapperTest.java | 38 2 files changed, 43 insertions(+) -- http://git-wip-us.apache.org/repos/asf/flink/blob/fe464b42/flink-core/src/main/java/org/apache/flink/api/common/io/InputStreamFSInputWrapper.java -- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/InputStreamFSInputWrapper.java b/flink-core/src/main/java/org/apache/flink/api/common/io/InputStreamFSInputWrapper.java index cfd94bc..f7db680 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/io/InputStreamFSInputWrapper.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/InputStreamFSInputWrapper.java @@ -42,6 +42,11 @@ public class InputStreamFSInputWrapper extends FSDataInputStream { } @Override + public void close() throws IOException { + this.inStream.close(); + } + + @Override public void seek(long desired) throws IOException { if (desired < this.pos) { throw new IllegalArgumentException("Wrapped InputStream: cannot search backwards."); http://git-wip-us.apache.org/repos/asf/flink/blob/fe464b42/flink-core/src/test/java/org/apache/flink/api/common/io/InputStreamFSInputWrapperTest.java -- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/InputStreamFSInputWrapperTest.java 
b/flink-core/src/test/java/org/apache/flink/api/common/io/InputStreamFSInputWrapperTest.java new file mode 100644 index 000..8fcd231 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/common/io/InputStreamFSInputWrapperTest.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.io; + +import java.io.InputStream; + +import org.junit.Test; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +public class InputStreamFSInputWrapperTest { + + @Test + public void testClose() throws Exception { + InputStream mockedInputStream = mock(InputStream.class); + InputStreamFSInputWrapper wrapper = new InputStreamFSInputWrapper(mockedInputStream); + wrapper.close(); + verify(mockedInputStream).close(); + } + +}
[1/2] flink git commit: [FLINK-4360] [tm] Implement TM -> JM registration logic [Forced Update!]
Repository: flink Updated Branches: refs/heads/flip-6 928800569 -> 2486d3787 (forced update) http://git-wip-us.apache.org/repos/asf/flink/blob/2486d378/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/SlotAllocationException.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/SlotAllocationException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/SlotAllocationException.java new file mode 100644 index 000..66d8102 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/SlotAllocationException.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.taskexecutor.exceptions; + +/** + * Exception indicating that the slot allocation on the task manager failed. 
+ */ +public class SlotAllocationException extends TaskManagerException { + + private static final long serialVersionUID = -4764932098204266773L; + + public SlotAllocationException(String message) { + super(message); + } + + public SlotAllocationException(String message, Throwable cause) { + super(message, cause); + } + + public SlotAllocationException(Throwable cause) { + super(cause); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/2486d378/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java index 42cb919..88123b4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java @@ -70,10 +70,7 @@ public class TaskSlotTable implements TimeoutListener { /** Interface for slot actions, such as freeing them or timing them out */ private SlotActions slotActions; - - /** The timeout for allocated slots */ - private Time slotTimeout; - + /** Whether the table has been started */ private boolean started; @@ -104,7 +101,6 @@ public class TaskSlotTable implements TimeoutListener { slotsPerJob = new HashMap<>(4); slotActions = null; - slotTimeout = null; started = false; } @@ -112,11 +108,9 @@ public class TaskSlotTable implements TimeoutListener { * Start the task slot table with the given slot actions and slot timeout value. 
* * @param initialSlotActions to use for slot actions -* @param initialSlotTimeout to use for slot timeouts */ - public void start(SlotActions initialSlotActions, Time initialSlotTimeout) { + public void start(SlotActions initialSlotActions) { this.slotActions = Preconditions.checkNotNull(initialSlotActions); - this.slotTimeout = Preconditions.checkNotNull(initialSlotTimeout); timerService.start(this); @@ -129,7 +123,6 @@ public class TaskSlotTable implements TimeoutListener { public void stop() { started = false; timerService.stop(); - slotTimeout = null; slotActions = null; } @@ -144,9 +137,10 @@ public class TaskSlotTable implements TimeoutListener { * @param index of the task slot to allocate * @param jobId to allocate the task slot for * @param allocationId identifying the allocation +* @param slotTimeout until the slot times out * @return True if the task slot could be allocated; otherwise false */ - public boolean allocateSlot(int index, JobID jobId, AllocationID allocationId) { + public boolean allocateSlot(int index, JobID jobId, AllocationID allocationId, Time slotTimeout) {
[2/2] flink git commit: [FLINK-4360] [tm] Implement TM -> JM registration logic
[FLINK-4360] [tm] Implement TM -> JM registration logic Upon requesting a slot for a new job, the TaskManager registers this job at the JobLeaderService. The job leader service is responsible to monitor job leader changes for all registered jobs. In case of a new job leader, the service will try to establish a connection to the new job leader. Upon establishing the connection the task manager is informed about it. The task manager will then offer all allocated but not yet active slots to the new job leader. Implement JobLeaderService The JobLeaderService is responsible for establishing a connection to the JM leader of a given job. Disable TaskExecutorTest#testRejectAllocationRequestsForOutOfSyncSlots Add simple task submission test Add job leader detection test case Add task slot acceptance test Fix RpcCompletenessTest Add comments This closes #2640. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2486d378 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2486d378 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2486d378 Branch: refs/heads/flip-6 Commit: 2486d3787b0d45a6220d3dcc96ca665986c47837 Parents: 9da76dc Author: Till Rohrmann Authored: Wed Oct 5 17:02:06 2016 +0200 Committer: Till Rohrmann Committed: Sun Oct 16 10:47:40 2016 +0200 -- .../org/apache/flink/util/ReflectionUtil.java | 110 .../deployment/TaskDeploymentDescriptor.java| 11 +- .../runtime/executiongraph/ExecutionVertex.java | 2 + .../HighAvailabilityServices.java | 3 +- .../runtime/highavailability/NonHaServices.java | 4 +- .../highavailability/ZookeeperHaServices.java | 2 +- .../jobmaster/JMTMRegistrationSuccess.java | 45 ++ .../flink/runtime/jobmaster/JobMaster.java | 19 + .../runtime/jobmaster/JobMasterGateway.java | 36 ++ .../registration/RegisteredRpcConnection.java | 2 +- .../resourcemanager/ResourceManager.java| 2 +- .../slotmanager/SlotManager.java| 8 +- .../runtime/taskexecutor/JobLeaderListener.java | 60 
+++ .../runtime/taskexecutor/JobLeaderService.java | 390 ++ .../taskexecutor/JobManagerConnection.java | 23 +- .../runtime/taskexecutor/JobManagerTable.java | 59 +++ .../runtime/taskexecutor/TaskExecutor.java | 522 ++- .../taskexecutor/TaskExecutorGateway.java | 25 +- ...TaskExecutorToResourceManagerConnection.java | 5 + .../runtime/taskexecutor/TaskManagerRunner.java | 2 + .../taskexecutor/TaskManagerServices.java | 24 +- .../exceptions/SlotAllocationException.java | 39 ++ .../taskexecutor/slot/TaskSlotTable.java| 39 +- .../apache/flink/runtime/taskmanager/Task.java | 2 +- .../TaskDeploymentDescriptorTest.java | 6 +- .../TestingHighAvailabilityServices.java| 2 +- .../metrics/groups/TaskManagerGroupTest.java| 10 +- .../slotmanager/SlotManagerTest.java| 2 +- .../slotmanager/SlotProtocolTest.java | 8 +- .../flink/runtime/rpc/RpcCompletenessTest.java | 29 +- .../runtime/taskexecutor/TaskExecutorTest.java | 431 ++- .../runtime/taskmanager/TaskAsyncCallTest.java | 3 +- .../runtime/taskmanager/TaskManagerTest.java| 75 +-- .../flink/runtime/taskmanager/TaskStopTest.java | 2 +- .../flink/runtime/taskmanager/TaskTest.java | 23 +- .../tasks/InterruptSensitiveRestoreTest.java| 38 +- .../streaming/runtime/tasks/StreamTaskTest.java | 25 +- 37 files changed, 1805 insertions(+), 283 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/2486d378/flink-core/src/main/java/org/apache/flink/util/ReflectionUtil.java -- diff --git a/flink-core/src/main/java/org/apache/flink/util/ReflectionUtil.java b/flink-core/src/main/java/org/apache/flink/util/ReflectionUtil.java index b851eba..2883570 100644 --- a/flink-core/src/main/java/org/apache/flink/util/ReflectionUtil.java +++ b/flink-core/src/main/java/org/apache/flink/util/ReflectionUtil.java @@ -23,6 +23,9 @@ import org.apache.flink.annotation.Internal; import java.lang.reflect.ParameterizedType; import java.lang.reflect.Type; +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; @Internal public 
final class ReflectionUtil { @@ -151,6 +154,113 @@ public final class ReflectionUtil { } /** +* Extract the full template type information from the given type's template parameter at the +* given position. +* +* @param type type to extract the full template parameter information from +* @param templatePosition describing at which position the template type parameter is +* @return Full type information de
[1/2] flink git commit: [FLINK-4343] [tm] Implement TM -> JM registration logic
Repository: flink Updated Branches: refs/heads/flip-6 9da76dcfd -> 928800569 http://git-wip-us.apache.org/repos/asf/flink/blob/92880056/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/SlotAllocationException.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/SlotAllocationException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/SlotAllocationException.java new file mode 100644 index 000..66d8102 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/SlotAllocationException.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.taskexecutor.exceptions; + +/** + * Exception indicating that the slot allocation on the task manager failed. 
+ */ +public class SlotAllocationException extends TaskManagerException { + + private static final long serialVersionUID = -4764932098204266773L; + + public SlotAllocationException(String message) { + super(message); + } + + public SlotAllocationException(String message, Throwable cause) { + super(message, cause); + } + + public SlotAllocationException(Throwable cause) { + super(cause); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/92880056/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java index 42cb919..88123b4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java @@ -70,10 +70,7 @@ public class TaskSlotTable implements TimeoutListener { /** Interface for slot actions, such as freeing them or timing them out */ private SlotActions slotActions; - - /** The timeout for allocated slots */ - private Time slotTimeout; - + /** Whether the table has been started */ private boolean started; @@ -104,7 +101,6 @@ public class TaskSlotTable implements TimeoutListener { slotsPerJob = new HashMap<>(4); slotActions = null; - slotTimeout = null; started = false; } @@ -112,11 +108,9 @@ public class TaskSlotTable implements TimeoutListener { * Start the task slot table with the given slot actions and slot timeout value. 
* * @param initialSlotActions to use for slot actions -* @param initialSlotTimeout to use for slot timeouts */ - public void start(SlotActions initialSlotActions, Time initialSlotTimeout) { + public void start(SlotActions initialSlotActions) { this.slotActions = Preconditions.checkNotNull(initialSlotActions); - this.slotTimeout = Preconditions.checkNotNull(initialSlotTimeout); timerService.start(this); @@ -129,7 +123,6 @@ public class TaskSlotTable implements TimeoutListener { public void stop() { started = false; timerService.stop(); - slotTimeout = null; slotActions = null; } @@ -144,9 +137,10 @@ public class TaskSlotTable implements TimeoutListener { * @param index of the task slot to allocate * @param jobId to allocate the task slot for * @param allocationId identifying the allocation +* @param slotTimeout until the slot times out * @return True if the task slot could be allocated; otherwise false */ - public boolean allocateSlot(int index, JobID jobId, AllocationID allocationId) { + public boolean allocateSlot(int index, JobID jobId, AllocationID allocationId, Time slotTimeout) { checkInit(
[2/2] flink git commit: [FLINK-4343] [tm] Implement TM -> JM registration logic
[FLINK-4343] [tm] Implement TM -> JM registration logic Upon requesting a slot for a new job, the TaskManager registers this job at the JobLeaderService. The job leader service is responsible for monitoring job leader changes for all registered jobs. In case of a new job leader, the service will try to establish a connection to the new job leader. Upon establishing the connection, the task manager is informed about it. The task manager will then offer all allocated but not yet active slots to the new job leader. Implement JobLeaderService The JobLeaderService is responsible for establishing a connection to the JM leader of a given job. Disable TaskExecutorTest#testRejectAllocationRequestsForOutOfSyncSlots Add simple task submission test Add job leader detection test case Add task slot acceptance test Fix RpcCompletenessTest Add comments This closes #2640. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/92880056 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/92880056 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/92880056 Branch: refs/heads/flip-6 Commit: 928800569234156876b9744de064cc359f121664 Parents: 9da76dc Author: Till Rohrmann Authored: Wed Oct 5 17:02:06 2016 +0200 Committer: Till Rohrmann Committed: Sun Oct 16 10:46:09 2016 +0200 -- .../org/apache/flink/util/ReflectionUtil.java | 110 .../deployment/TaskDeploymentDescriptor.java| 11 +- .../runtime/executiongraph/ExecutionVertex.java | 2 + .../HighAvailabilityServices.java | 3 +- .../runtime/highavailability/NonHaServices.java | 4 +- .../highavailability/ZookeeperHaServices.java | 2 +- .../jobmaster/JMTMRegistrationSuccess.java | 45 ++ .../flink/runtime/jobmaster/JobMaster.java | 19 + .../runtime/jobmaster/JobMasterGateway.java | 36 ++ .../registration/RegisteredRpcConnection.java | 2 +- .../resourcemanager/ResourceManager.java| 2 +- .../slotmanager/SlotManager.java| 8 +- .../runtime/taskexecutor/JobLeaderListener.java | 60
+++ .../runtime/taskexecutor/JobLeaderService.java | 390 ++ .../taskexecutor/JobManagerConnection.java | 23 +- .../runtime/taskexecutor/JobManagerTable.java | 59 +++ .../runtime/taskexecutor/TaskExecutor.java | 522 ++- .../taskexecutor/TaskExecutorGateway.java | 25 +- ...TaskExecutorToResourceManagerConnection.java | 5 + .../runtime/taskexecutor/TaskManagerRunner.java | 2 + .../taskexecutor/TaskManagerServices.java | 24 +- .../exceptions/SlotAllocationException.java | 39 ++ .../taskexecutor/slot/TaskSlotTable.java| 39 +- .../apache/flink/runtime/taskmanager/Task.java | 2 +- .../TaskDeploymentDescriptorTest.java | 6 +- .../TestingHighAvailabilityServices.java| 2 +- .../metrics/groups/TaskManagerGroupTest.java| 10 +- .../slotmanager/SlotManagerTest.java| 2 +- .../slotmanager/SlotProtocolTest.java | 8 +- .../flink/runtime/rpc/RpcCompletenessTest.java | 29 +- .../runtime/taskexecutor/TaskExecutorTest.java | 431 ++- .../runtime/taskmanager/TaskAsyncCallTest.java | 3 +- .../runtime/taskmanager/TaskManagerTest.java| 75 +-- .../flink/runtime/taskmanager/TaskStopTest.java | 2 +- .../flink/runtime/taskmanager/TaskTest.java | 23 +- .../tasks/InterruptSensitiveRestoreTest.java| 38 +- .../streaming/runtime/tasks/StreamTaskTest.java | 25 +- 37 files changed, 1805 insertions(+), 283 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/92880056/flink-core/src/main/java/org/apache/flink/util/ReflectionUtil.java -- diff --git a/flink-core/src/main/java/org/apache/flink/util/ReflectionUtil.java b/flink-core/src/main/java/org/apache/flink/util/ReflectionUtil.java index b851eba..2883570 100644 --- a/flink-core/src/main/java/org/apache/flink/util/ReflectionUtil.java +++ b/flink-core/src/main/java/org/apache/flink/util/ReflectionUtil.java @@ -23,6 +23,9 @@ import org.apache.flink.annotation.Internal; import java.lang.reflect.ParameterizedType; import java.lang.reflect.Type; +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; @Internal public 
final class ReflectionUtil { @@ -151,6 +154,113 @@ public final class ReflectionUtil { } /** +* Extract the full template type information from the given type's template parameter at the +* given position. +* +* @param type type to extract the full template parameter information from +* @param templatePosition describing at which position the template type parameter is +* @return Full type information de