[FLINK-4373] [cluster management] Introduce SlotID, AllocationID, ResourceProfile
[FLINK-4373] [cluster management] address comments This closes #2370. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9159ad6e Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9159ad6e Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9159ad6e Branch: refs/heads/flip-6 Commit: 9159ad6eafbba3d6f5804e62a9f4209d803da123 Parents: 433a1fd Author: Kurt Young <ykt...@gmail.com> Authored: Fri Aug 12 11:05:48 2016 +0800 Committer: Till Rohrmann <trohrm...@apache.org> Committed: Wed Sep 21 11:39:14 2016 +0200 ---------------------------------------------------------------------- .../clusterframework/types/AllocationID.java | 32 ++++++++ .../clusterframework/types/ResourceProfile.java | 68 ++++++++++++++++ .../runtime/clusterframework/types/SlotID.java | 83 ++++++++++++++++++++ .../types/ResourceProfileTest.java | 49 ++++++++++++ 4 files changed, 232 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/9159ad6e/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/AllocationID.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/AllocationID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/AllocationID.java new file mode 100644 index 0000000..f7ae6ee --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/AllocationID.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.clusterframework.types; + +import org.apache.flink.util.AbstractID; + +/** + * Unique identifier for the attempt to allocate a slot, normally created by JobManager when requesting a slot, + * constant across re-tries. This can also be used to identify responses by the ResourceManager and to identify + * deployment calls towards the TaskManager that was allocated from. + */ +public class AllocationID extends AbstractID { + + private static final long serialVersionUID = 1L; + +} http://git-wip-us.apache.org/repos/asf/flink/blob/9159ad6e/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java new file mode 100644 index 0000000..cbe709f --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.clusterframework.types; + +import java.io.Serializable; + +/** + * Describe the resource profile of the slot, either when requiring or offering it. The profile can be + * checked whether it can match another profile's requirement, and furthermore we may calculate a matching + * score to decide which profile we should choose when we have lots of candidate slots. + */ +public class ResourceProfile implements Serializable { + + private static final long serialVersionUID = -784900073893060124L; + + /** How many cpu cores are needed, use double so we can specify cpu like 0.1 */ + private final double cpuCores; + + /** How many memory in mb are needed */ + private final long memoryInMB; + + public ResourceProfile(double cpuCores, long memoryInMB) { + this.cpuCores = cpuCores; + this.memoryInMB = memoryInMB; + } + + /** + * Get the cpu cores needed + * @return The cpu cores, 1.0 means a full cpu thread + */ + public double getCpuCores() { + return cpuCores; + } + + /** + * Get the memory needed in MB + * @return The memory in MB + */ + public long getMemoryInMB() { + return memoryInMB; + } + + /** + * Check whether required resource profile can be matched + * + * @param required the required resource profile + * @return true if the requirement is matched, otherwise false + */ + public boolean isMatching(ResourceProfile required) { + return Double.compare(cpuCores, required.getCpuCores()) >= 0 && memoryInMB >= required.getMemoryInMB(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/9159ad6e/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotID.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotID.java new file mode 100644 index 0000000..d1b072d --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotID.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.clusterframework.types; + +import java.io.Serializable; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Unique identifier for a slot which located in TaskManager. + */ +public class SlotID implements ResourceIDRetrievable, Serializable { + + private static final long serialVersionUID = -6399206032549807771L; + + /** The resource id which this slot located */ + private final ResourceID resourceId; + + /** The numeric id for single slot */ + private final int slotId; + + public SlotID(ResourceID resourceId, int slotId) { + this.resourceId = checkNotNull(resourceId, "ResourceID must not be null"); + this.slotId = slotId; + } + + // ------------------------------------------------------------------------ + + @Override + public ResourceID getResourceID() { + return resourceId; + } + + // ------------------------------------------------------------------------ + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + SlotID slotID = (SlotID) o; + + if (slotId != slotID.slotId) { + return false; + } + return resourceId.equals(slotID.resourceId); + } + + @Override + public int hashCode() { + int result = resourceId.hashCode(); + result = 31 * result + slotId; + return result; + } + + @Override + public String toString() { + return "SlotID{" + + "resourceId=" + resourceId + + ", slotId=" + slotId + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/9159ad6e/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java new file mode 100644 index 0000000..bc5ddaa --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.clusterframework.types; + +import org.junit.Test; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class ResourceProfileTest { + + @Test + public void testMatchRequirement() throws Exception { + ResourceProfile rp1 = new ResourceProfile(1.0, 100); + ResourceProfile rp2 = new ResourceProfile(1.0, 200); + ResourceProfile rp3 = new ResourceProfile(2.0, 100); + ResourceProfile rp4 = new ResourceProfile(2.0, 200); + + assertFalse(rp1.isMatching(rp2)); + assertTrue(rp2.isMatching(rp1)); + + assertFalse(rp1.isMatching(rp3)); + assertTrue(rp3.isMatching(rp1)); + + assertFalse(rp2.isMatching(rp3)); + assertFalse(rp3.isMatching(rp2)); + + assertTrue(rp4.isMatching(rp1)); + assertTrue(rp4.isMatching(rp2)); + assertTrue(rp4.isMatching(rp3)); + assertTrue(rp4.isMatching(rp4)); + } +}