buildbot success in on flink-docs-release-0.10
The Buildbot has detected a restored build on builder flink-docs-release-0.10 while building . Full details are available at: https://ci.apache.org/builders/flink-docs-release-0.10/builds/318 Buildbot URL: https://ci.apache.org/ Buildslave for this Build: bb_slave1_ubuntu Build Reason: The Nightly scheduler named 'flink-nightly-docs-release-0.10' triggered this build Build Source Stamp: [branch release-0.10] HEAD Blamelist: Build succeeded! Sincerely, -The Buildbot
buildbot success in on flink-docs-release-1.1
The Buildbot has detected a restored build on builder flink-docs-release-1.1 while building . Full details are available at: https://ci.apache.org/builders/flink-docs-release-1.1/builds/34 Buildbot URL: https://ci.apache.org/ Buildslave for this Build: bb_slave1_ubuntu Build Reason: The Nightly scheduler named 'flink-nightly-docs-release-1.1' triggered this build Build Source Stamp: [branch release-1.1] HEAD Blamelist: Build succeeded! Sincerely, -The Buildbot
buildbot failure in on flink-docs-master
The Buildbot has detected a new failure on builder flink-docs-master while building . Full details are available at: https://ci.apache.org/builders/flink-docs-master/builds/443 Buildbot URL: https://ci.apache.org/ Buildslave for this Build: bb_slave2_ubuntu Build Reason: The Nightly scheduler named 'flink-nightly-docs-master' triggered this build Build Source Stamp: [branch master] HEAD Blamelist: BUILD FAILED: failed compile Sincerely, -The Buildbot
flink git commit: [hotfix] [tests] Fix CassandraConnectorITCase on Java 7 profiles
Repository: flink Updated Branches: refs/heads/master cb89ba19b -> e1443d702 [hotfix] [tests] Fix CassandraConnectorITCase on Java 7 profiles Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e1443d70 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e1443d70 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e1443d70 Branch: refs/heads/master Commit: e1443d7025709ac5a9a13bc73e6d7dcec1ba729a Parents: cb89ba1 Author: Stephan EwenAuthored: Thu Sep 1 11:54:18 2016 +0200 Committer: Stephan Ewen Committed: Thu Sep 1 11:54:18 2016 +0200 -- .../connectors/cassandra/CassandraConnectorITCase.java | 5 - 1 file changed, 4 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/e1443d70/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java -- diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java index 4822f91..8bb440c 100644 --- a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java +++ b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java @@ -211,7 +211,10 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase
flink git commit: [FLINK-4528] [rpc] Marks main thread execution methods in RpcEndpoint as protected
Repository: flink Updated Branches: refs/heads/flip-6 11a92d1da -> 074f214e0 [FLINK-4528] [rpc] Marks main thread execution methods in RpcEndpoint as protected Give main thread execution context into the TaskExecutorToResourceManagerConnection Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/074f214e Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/074f214e Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/074f214e Branch: refs/heads/flip-6 Commit: 074f214e0cf67b86d0d879a93a18f6cc86acfa9f Parents: 11a92d1 Author: Till RohrmannAuthored: Mon Aug 29 15:49:59 2016 +0200 Committer: Till Rohrmann Committed: Thu Sep 1 12:24:05 2016 +0200 -- .../apache/flink/runtime/rpc/RpcEndpoint.java | 8 +- .../runtime/taskexecutor/TaskExecutor.java | 7 +- ...TaskExecutorToResourceManagerConnection.java | 26 ++- .../flink/runtime/rpc/AsyncCallsTest.java | 216 ++ .../flink/runtime/rpc/akka/AsyncCallsTest.java | 219 --- 5 files changed, 242 insertions(+), 234 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/074f214e/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java index 7b3f8a1..e9e2b2c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java @@ -161,7 +161,7 @@ public abstract class RpcEndpoint { * * @return Main thread execution context */ - public ExecutionContext getMainThreadExecutionContext() { + protected ExecutionContext getMainThreadExecutionContext() { return mainThreadExecutionContext; } @@ -184,7 +184,7 @@ public abstract class RpcEndpoint { * * @param runnable Runnable to be executed in the main thread of the underlying RPC endpoint */ - public void runAsync(Runnable runnable) { + protected void runAsync(Runnable runnable) { ((MainThreadExecutor) self).runAsync(runnable); } @@ -195,7 +195,7 @@ public abstract class RpcEndpoint { * @param runnable Runnable to be executed * @param delayThe delay after which the runnable will be executed */ - public void scheduleRunAsync(Runnable runnable, long delay, TimeUnit unit) { + protected void scheduleRunAsync(Runnable runnable, long delay, TimeUnit unit) { ((MainThreadExecutor) self).scheduleRunAsync(runnable, unit.toMillis(delay)); } @@ -209,7 +209,7 @@ public abstract class RpcEndpoint { * @param Return type of the callable * @return Future for the result of the callable. */ - public Future callAsync(Callable callable, Timeout timeout) { + protected Future callAsync(Callable callable, Timeout timeout) { return ((MainThreadExecutor) self).callAsync(callable, timeout); } http://git-wip-us.apache.org/repos/asf/flink/blob/074f214e/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java index 4871b96..735730b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java @@ -176,7 +176,12 @@ public class TaskExecutor extends RpcEndpoint { if (newLeaderAddress != null) { log.info("Attempting to register at ResourceManager {}", newLeaderAddress); resourceManagerConnection = - new TaskExecutorToResourceManagerConnection(log, this, newLeaderAddress, newLeaderId); + new TaskExecutorToResourceManagerConnection( + log, + this, + newLeaderAddress, + newLeaderId, + getMainThreadExecutionContext()); resourceManagerConnection.start(); } }
[6/6] flink git commit: [FLINK-4529] [flip-6] Move TaskExecutor, JobMaster and ResourceManager out of the rpc package
[FLINK-4529] [flip-6] Move TaskExecutor, JobMaster and ResourceManager out of the rpc package The TaskExecutor, the JobMaster and the ResourceManager were still contained in the rpc package. With this commit, they will be moved out of this package. Now they are contained in dedicated packages on the o.a.f.runtime level. This closes #2438. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/11a92d1d Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/11a92d1d Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/11a92d1d Branch: refs/heads/flip-6 Commit: 11a92d1da958cfd2c079a550bcac62c0e6eeb6b3 Parents: 454bf51 Author: Till RohrmannAuthored: Mon Aug 29 16:35:29 2016 +0200 Committer: Till Rohrmann Committed: Thu Sep 1 12:13:26 2016 +0200 -- .../runtime/clusterframework/SlotManager.java | 525 .../flink/runtime/jobmaster/JobMaster.java | 244 ++ .../runtime/jobmaster/JobMasterGateway.java | 45 + .../registration/RegistrationResponse.java | 84 ++ .../registration/RetryingRegistration.java | 296 +++ .../resourcemanager/JobMasterRegistration.java | 35 + .../resourcemanager/RegistrationResponse.java | 43 + .../resourcemanager/ResourceManager.java| 214 + .../resourcemanager/ResourceManagerGateway.java | 77 ++ .../runtime/resourcemanager/SlotAssignment.java | 25 + .../runtime/resourcemanager/SlotManager.java| 523 .../runtime/resourcemanager/SlotRequest.java| 74 ++ .../flink/runtime/rpc/jobmaster/JobMaster.java | 244 -- .../runtime/rpc/jobmaster/JobMasterGateway.java | 45 - .../rpc/registration/RegistrationResponse.java | 84 -- .../rpc/registration/RetryingRegistration.java | 296 --- .../resourcemanager/JobMasterRegistration.java | 35 - .../resourcemanager/RegistrationResponse.java | 43 - .../rpc/resourcemanager/ResourceManager.java| 214 - .../resourcemanager/ResourceManagerGateway.java | 77 -- .../rpc/resourcemanager/SlotAssignment.java | 25 - .../rpc/resourcemanager/SlotRequest.java| 74 -- .../runtime/rpc/taskexecutor/SlotReport.java| 56 -- .../runtime/rpc/taskexecutor/SlotStatus.java| 129 --- .../runtime/rpc/taskexecutor/TaskExecutor.java | 827 --- .../taskexecutor/TaskExecutorConfiguration.java | 151 .../rpc/taskexecutor/TaskExecutorGateway.java | 35 - .../TaskExecutorRegistrationSuccess.java| 75 -- ...TaskExecutorToResourceManagerConnection.java | 198 - .../flink/runtime/taskexecutor/SlotReport.java | 56 ++ .../flink/runtime/taskexecutor/SlotStatus.java | 129 +++ .../runtime/taskexecutor/TaskExecutor.java | 827 +++ .../taskexecutor/TaskExecutorConfiguration.java | 151 .../taskexecutor/TaskExecutorGateway.java | 35 + .../TaskExecutorRegistrationSuccess.java| 75 ++ ...TaskExecutorToResourceManagerConnection.java | 198 + .../clusterframework/ClusterShutdownITCase.java | 156 .../clusterframework/ResourceManagerITCase.java | 157 .../clusterframework/ResourceManagerTest.java | 338 .../clusterframework/SlotManagerTest.java | 540 .../registration/RetryingRegistrationTest.java | 336 .../registration/TestRegistrationGateway.java | 85 ++ .../resourcemanager/ClusterShutdownITCase.java | 156 .../resourcemanager/ResourceManagerHATest.java | 76 ++ .../resourcemanager/ResourceManagerITCase.java | 157 .../resourcemanager/ResourceManagerTest.java| 338 .../resourcemanager/SlotManagerTest.java| 538 .../runtime/rpc/akka/AkkaRpcServiceTest.java| 14 - .../registration/RetryingRegistrationTest.java | 336 .../registration/TestRegistrationGateway.java | 85 -- .../resourcemanager/ResourceManagerHATest.java | 76 -- .../rpc/taskexecutor/TaskExecutorTest.java | 117 --- .../runtime/taskexecutor/TaskExecutorTest.java | 117 +++ 53 files changed, 4934 insertions(+), 4952 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/11a92d1d/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/SlotManager.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/SlotManager.java deleted file mode 100644 index cc140a1..000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/SlotManager.java +++ /dev/null @@ -1,525 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file
[2/6] flink git commit: [FLINK-4529] [flip-6] Move TaskExecutor, JobMaster and ResourceManager out of the rpc package
http://git-wip-us.apache.org/repos/asf/flink/blob/11a92d1d/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/SlotManagerTest.java -- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/SlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/SlotManagerTest.java deleted file mode 100644 index 2ee280f..000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/SlotManagerTest.java +++ /dev/null @@ -1,540 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.clusterframework; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.runtime.clusterframework.types.AllocationID; -import org.apache.flink.runtime.clusterframework.types.ResourceID; -import org.apache.flink.runtime.clusterframework.types.ResourceProfile; -import org.apache.flink.runtime.clusterframework.types.ResourceSlot; -import org.apache.flink.runtime.clusterframework.types.SlotID; -import org.apache.flink.runtime.rpc.resourcemanager.ResourceManagerGateway; -import org.apache.flink.runtime.rpc.resourcemanager.SlotRequest; -import org.apache.flink.runtime.rpc.taskexecutor.SlotStatus; -import org.junit.Before; -import org.junit.Test; - -import java.util.LinkedList; -import java.util.List; -import java.util.Map; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.mock; - -public class SlotManagerTest { - - private static final double DEFAULT_TESTING_CPU_CORES = 1.0; - - private static final long DEFAULT_TESTING_MEMORY = 512; - - private static final ResourceProfile DEFAULT_TESTING_PROFILE = - new ResourceProfile(DEFAULT_TESTING_CPU_CORES, DEFAULT_TESTING_MEMORY); - - private static final ResourceProfile DEFAULT_TESTING_BIG_PROFILE = - new ResourceProfile(DEFAULT_TESTING_CPU_CORES * 2, DEFAULT_TESTING_MEMORY * 2); - - private ResourceManagerGateway resourceManagerGateway; - - @Before - public void setUp() { - resourceManagerGateway = mock(ResourceManagerGateway.class); - } - - /** -* Tests that there are no free slots when we request, need to allocate from cluster manager master -*/ - @Test - public void testRequestSlotWithoutFreeSlot() { - TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway); - slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE)); - - assertEquals(0, slotManager.getAllocatedSlotCount()); - assertEquals(0, slotManager.getFreeSlotCount()); - assertEquals(1, slotManager.getPendingRequestCount()); - assertEquals(1, slotManager.getAllocatedContainers().size()); - assertEquals(DEFAULT_TESTING_PROFILE, slotManager.getAllocatedContainers().get(0)); - } - - /** -* Tests that there are some free slots when we request, and the request is fulfilled immediately -*/ - @Test - public void testRequestSlotWithFreeSlot() { - TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway); - - directlyProvideFreeSlots(slotManager, DEFAULT_TESTING_PROFILE, 1); - assertEquals(1, slotManager.getFreeSlotCount()); - - slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE)); - assertEquals(1, slotManager.getAllocatedSlotCount()); - assertEquals(0, slotManager.getFreeSlotCount()); - assertEquals(0, slotManager.getPendingRequestCount()); - assertEquals(0, slotManager.getAllocatedContainers().size()); - } - - /** -* Tests that there are some free slots when we request, but none of them are suitable -*/ - @Test - public void testRequestSlotWithoutSuitableSlot() { - TestingSlotManager
[4/6] flink git commit: [FLINK-4529] [flip-6] Move TaskExecutor, JobMaster and ResourceManager out of the rpc package
http://git-wip-us.apache.org/repos/asf/flink/blob/11a92d1d/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java deleted file mode 100644 index 36d6310..000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java +++ /dev/null @@ -1,827 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.rpc.taskexecutor; - -import akka.actor.ActorSystem; -import akka.dispatch.ExecutionContexts$; -import akka.util.Timeout; -import com.typesafe.config.Config; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.configuration.ConfigConstants; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.IllegalConfigurationException; -import org.apache.flink.core.memory.HeapMemorySegment; -import org.apache.flink.core.memory.HybridMemorySegment; -import org.apache.flink.core.memory.MemorySegmentFactory; -import org.apache.flink.core.memory.MemoryType; -import org.apache.flink.runtime.akka.AkkaUtils; -import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.runtime.clusterframework.types.ResourceID; -import org.apache.flink.runtime.highavailability.HighAvailabilityServices; -import org.apache.flink.runtime.instance.InstanceConnectionInfo; -import org.apache.flink.runtime.io.disk.iomanager.IOManager; -import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode; -import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; -import org.apache.flink.runtime.io.network.NetworkEnvironment; -import org.apache.flink.runtime.io.network.netty.NettyConfig; -import org.apache.flink.runtime.leaderelection.LeaderElectionService; -import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener; -import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; -import org.apache.flink.runtime.memory.MemoryManager; -import org.apache.flink.runtime.rpc.RpcEndpoint; -import org.apache.flink.runtime.rpc.RpcMethod; -import org.apache.flink.runtime.rpc.RpcService; -import org.apache.flink.runtime.rpc.akka.AkkaRpcService; -import org.apache.flink.runtime.taskmanager.MemoryLogger; -import org.apache.flink.runtime.util.EnvironmentInformation; -import org.apache.flink.runtime.util.LeaderRetrievalUtils; -import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration; -import org.apache.flink.util.MathUtils; -import org.apache.flink.util.NetUtils; - -import scala.Tuple2; -import scala.Option; -import scala.Some; -import scala.concurrent.ExecutionContext; -import scala.concurrent.duration.Duration; -import scala.concurrent.duration.FiniteDuration; - -import java.io.File; -import java.io.IOException; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.util.UUID; -import java.util.concurrent.ForkJoinPool; -import java.util.concurrent.TimeUnit; - -import static org.apache.flink.util.Preconditions.checkNotNull; - -/** - * TaskExecutor implementation. The task executor is responsible for the execution of multiple - * {@link org.apache.flink.runtime.taskmanager.Task}. - */ -public class TaskExecutor extends RpcEndpoint { - - private static final Logger LOG = LoggerFactory.getLogger(TaskExecutor.class); - - /** The unique resource ID of this TaskExecutor */ - private final ResourceID resourceID; - - /** The access to the leader election and metadata storage services */ - private final HighAvailabilityServices haServices; - - /** The task manager configuration */ - private final TaskExecutorConfiguration taskExecutorConfig; - - /** The I/O manager component in the task manager */ - private final IOManager ioManager; - - /** The memory manager component in the task manager */ - private final MemoryManager memoryManager; - - /** The network component in the task
[1/6] flink git commit: [FLINK-4529] [flip-6] Move TaskExecutor, JobMaster and ResourceManager out of the rpc package
Repository: flink Updated Branches: refs/heads/flip-6 454bf51b0 -> 11a92d1da http://git-wip-us.apache.org/repos/asf/flink/blob/11a92d1d/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/SlotManagerTest.java -- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/SlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/SlotManagerTest.java new file mode 100644 index 000..52d9d06 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/SlotManagerTest.java @@ -0,0 +1,538 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.resourcemanager; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.clusterframework.types.ResourceSlot; +import org.apache.flink.runtime.clusterframework.types.SlotID; +import org.apache.flink.runtime.taskexecutor.SlotStatus; +import org.junit.Before; +import org.junit.Test; + +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; + +public class SlotManagerTest { + + private static final double DEFAULT_TESTING_CPU_CORES = 1.0; + + private static final long DEFAULT_TESTING_MEMORY = 512; + + private static final ResourceProfile DEFAULT_TESTING_PROFILE = + new ResourceProfile(DEFAULT_TESTING_CPU_CORES, DEFAULT_TESTING_MEMORY); + + private static final ResourceProfile DEFAULT_TESTING_BIG_PROFILE = + new ResourceProfile(DEFAULT_TESTING_CPU_CORES * 2, DEFAULT_TESTING_MEMORY * 2); + + private ResourceManagerGateway resourceManagerGateway; + + @Before + public void setUp() { + resourceManagerGateway = mock(ResourceManagerGateway.class); + } + + /** +* Tests that there are no free slots when we request, need to allocate from cluster manager master +*/ + @Test + public void testRequestSlotWithoutFreeSlot() { + TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway); + slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE)); + + assertEquals(0, slotManager.getAllocatedSlotCount()); + assertEquals(0, slotManager.getFreeSlotCount()); + assertEquals(1, slotManager.getPendingRequestCount()); + assertEquals(1, slotManager.getAllocatedContainers().size()); + assertEquals(DEFAULT_TESTING_PROFILE, slotManager.getAllocatedContainers().get(0)); + } + + /** +* Tests that there are some free slots when we request, and the request is fulfilled immediately +*/ + @Test + public void testRequestSlotWithFreeSlot() { + TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway); + + directlyProvideFreeSlots(slotManager, DEFAULT_TESTING_PROFILE, 1); + assertEquals(1, slotManager.getFreeSlotCount()); + + slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE)); + assertEquals(1, slotManager.getAllocatedSlotCount()); + assertEquals(0, slotManager.getFreeSlotCount()); + assertEquals(0, slotManager.getPendingRequestCount()); + assertEquals(0, slotManager.getAllocatedContainers().size()); + } + + /** +* Tests that there are some free slots when we request, but none of them are suitable +*/ + @Test + public void testRequestSlotWithoutSuitableSlot() { + TestingSlotManager slotManager = new TestingSlotManager(resourceManagerGateway); + +
flink git commit: [hotfix] [test-stability] Properly fail if ZooKeeperTestEnvironment cannot delete ZNodes
Repository: flink Updated Branches: refs/heads/master 4a100fa4c -> cb89ba19b [hotfix] [test-stability] Properly fail if ZooKeeperTestEnvironment cannot delete ZNodes Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cb89ba19 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cb89ba19 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cb89ba19 Branch: refs/heads/master Commit: cb89ba19b5ce09903498efd0ede59cea42f2eb48 Parents: 4a100fa Author: Till RohrmannAuthored: Thu Sep 1 11:41:10 2016 +0200 Committer: Till Rohrmann Committed: Thu Sep 1 11:41:10 2016 +0200 -- .../flink/runtime/zookeeper/ZooKeeperTestEnvironment.java| 8 ++-- 1 file changed, 6 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/cb89ba19/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperTestEnvironment.java -- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperTestEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperTestEnvironment.java index 467706f..bd58515 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperTestEnvironment.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperTestEnvironment.java @@ -144,18 +144,22 @@ public class ZooKeeperTestEnvironment { for (int i = 0; i < maxAttempts; i++) { try { ZKPaths.deleteChildren(client.getZookeeperClient().getZooKeeper(), path, false); - break; + return; } catch (org.apache.zookeeper.KeeperException.NoNodeException e) { // that seems all right. if one of the children we want to delete is // actually already deleted, that's fine. - break; + return; } catch (KeeperException.ConnectionLossException e) { // Keep retrying Thread.sleep(100); } } + + throw new Exception("Could not clear the ZNodes under " + path + ". ZooKeeper is not in " + + "a clean state."); + } }