This is an automated email from the ASF dual-hosted git repository. mmuzaf pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite-extensions.git
The following commit(s) were added to refs/heads/master by this push: new 622d38f IGNITE-17101 Move ignite-mesos to the Ignite Extensions project (#153) 622d38f is described below commit 622d38fafdfd21a3fc1bbe49df4421893fadec45 Author: Maxim Muzafarov <maxmu...@gmail.com> AuthorDate: Mon Jun 6 12:59:04 2022 +0300 IGNITE-17101 Move ignite-mesos to the Ignite Extensions project (#153) --- modules/mesos-ext/README.txt | 28 + modules/mesos-ext/licenses/apache-2.0.txt | 202 +++++++ modules/mesos-ext/pom.xml | 106 ++++ .../org/apache/ignite/mesos/ClusterProperties.java | 624 +++++++++++++++++++++ .../org/apache/ignite/mesos/IgniteFramework.java | 183 ++++++ .../org/apache/ignite/mesos/IgniteScheduler.java | 386 +++++++++++++ .../java/org/apache/ignite/mesos/IgniteTask.java | 86 +++ .../java/org/apache/ignite/mesos/package-info.java | 23 + .../ignite/mesos/resource/IgniteProvider.java | 273 +++++++++ .../apache/ignite/mesos/resource/JettyServer.java | 69 +++ .../ignite/mesos/resource/ResourceHandler.java | 146 +++++ .../ignite/mesos/resource/ResourceProvider.java | 164 ++++++ .../apache/ignite/mesos/resource/package-info.java | 23 + .../src/main/resources/ignite-default-config.xml | 35 ++ .../org/apache/ignite/IgniteMesosTestSuite.java | 32 ++ .../ignite/mesos/IgniteSchedulerSelfTest.java | 552 ++++++++++++++++++ pom.xml | 1 + 17 files changed, 2933 insertions(+) diff --git a/modules/mesos-ext/README.txt b/modules/mesos-ext/README.txt new file mode 100644 index 0000000..96143cf --- /dev/null +++ b/modules/mesos-ext/README.txt @@ -0,0 +1,28 @@ +Apache Ignite Mesos Module +------------------------ + +Apache Ignite Mesos module provides integration of Apache Ignite with Apache Mesos. 
+ +Importing Apache Ignite Mesos Module In Maven Project +------------------------------------- + +If you are using Maven to manage dependencies of your project, you can add Mesos module +dependency like this (replace '${ignite.version}' with actual Ignite version you are +interested in): + +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 + http://maven.apache.org/xsd/maven-4.0.0.xsd"> + ... + <dependencies> + ... + <dependency> + <groupId>org.apache.ignite</groupId> + <artifactId>ignite-mesos-ext</artifactId> + <version>1.0.0</version> + </dependency> + ... + </dependencies> + ... +</project> diff --git a/modules/mesos-ext/licenses/apache-2.0.txt b/modules/mesos-ext/licenses/apache-2.0.txt new file mode 100644 index 0000000..d645695 --- /dev/null +++ b/modules/mesos-ext/licenses/apache-2.0.txt @@ -0,0 +1,202 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. 
+ + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. 
For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. 
If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. 
You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. 
Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. 
+ + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/modules/mesos-ext/pom.xml b/modules/mesos-ext/pom.xml new file mode 100644 index 0000000..5b99697 --- /dev/null +++ b/modules/mesos-ext/pom.xml @@ -0,0 +1,106 @@ +<?xml version="1.0" encoding="UTF-8"?> + +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +<!-- + POM file. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.ignite</groupId> + <artifactId>ignite-parent-ext-internal</artifactId> + <version>1</version> + <relativePath>../../parent-internal/pom.xml</relativePath> + </parent> + + <artifactId>ignite-mesos-ext</artifactId> + <version>1.0.0-SNAPSHOT</version> + + <url>https://ignite.apache.org</url> + + <properties> + <mesos.version>1.11.0</mesos.version> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.mesos</groupId> + <artifactId>mesos</artifactId> + <version>${mesos.version}</version> + </dependency> + + <dependency> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-server</artifactId> + <version>${jetty.version}</version> + </dependency> + + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + <version>${jackson.version}</version> + </dependency> + + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>4.12</version> + <scope>test</scope> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <artifactId>maven-assembly-plugin</artifactId> + <version>2.4.1</version> + <configuration> + <descriptorRefs> + <descriptorRef>jar-with-dependencies</descriptorRef> + </descriptorRefs> + <archive> + <manifest> + <mainClass>org.apache.ignite.mesos.IgniteFramework</mainClass> + </manifest> + </archive> + <appendAssemblyId>false</appendAssemblyId> + </configuration> + <executions> + <execution> + <id>make-assembly</id> + <phase>package</phase> + <goals> + <goal>single</goal> + </goals> + </execution> + </executions> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-deploy-plugin</artifactId> + <version>2.8.2</version> + <configuration> 
+ <skip>false</skip> + </configuration> + </plugin> + </plugins> + </build> +</project> diff --git a/modules/mesos-ext/src/main/java/org/apache/ignite/mesos/ClusterProperties.java b/modules/mesos-ext/src/main/java/org/apache/ignite/mesos/ClusterProperties.java new file mode 100644 index 0000000..2f158b9 --- /dev/null +++ b/modules/mesos-ext/src/main/java/org/apache/ignite/mesos/ClusterProperties.java @@ -0,0 +1,624 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.mesos; + +import java.io.FileInputStream; +import java.io.IOException; +import java.net.Inet4Address; +import java.net.InetAddress; +import java.net.NetworkInterface; +import java.net.SocketException; +import java.util.Enumeration; +import java.util.Properties; +import java.util.logging.Level; +import java.util.logging.Logger; +import java.util.regex.Pattern; +import java.util.regex.PatternSyntaxException; + +/** + * The class defines cluster configuration. This configuration created from properties file + * that passed on startup or environment variables. 
 + * <p> + * If the Mesos cluster is working in an intranet or behind NAT, then access to local resources can be set + * by {@link #ignitePackagePath()} or {@link #ignitePackageUrl()} which should be available from nodes. + */ +public class ClusterProperties { + /** */ + private static final Logger log = Logger.getLogger(ClusterProperties.class.getSimpleName()); + + /** Unlimited. */ + public static final double UNLIMITED = Double.MAX_VALUE; + + /** */ + public static final String MESOS_MASTER_URL = "MESOS_MASTER_URL"; + + /** */ + public static final String DEFAULT_MESOS_MASTER_URL = "zk://localhost:2181/mesos"; + + /** Mesos master url. */ + private String mesosUrl = DEFAULT_MESOS_MASTER_URL; + + /** */ + public static final String IGNITE_JVM_OPTS = "IGNITE_JVM_OPTS"; + + /** JVM options. */ + private String jvmOpts = ""; + + /** */ + public static final String IGNITE_CLUSTER_NAME = "IGNITE_CLUSTER_NAME"; + + /** */ + public static final String DEFAULT_CLUSTER_NAME = "ignite-cluster"; + + /** Cluster name. */ + private String clusterName = DEFAULT_CLUSTER_NAME; + + /** */ + public static final String IGNITE_HTTP_SERVER_HOST = "IGNITE_HTTP_SERVER_HOST"; + + /** Http server host. */ + private String httpSrvHost = null; + + /** */ + public static final String IGNITE_HTTP_SERVER_PORT = "IGNITE_HTTP_SERVER_PORT"; + + /** */ + public static final String DEFAULT_HTTP_SERVER_PORT = "48610"; + + /** Http server port. */ + private int httpSrvPort = Integer.valueOf(DEFAULT_HTTP_SERVER_PORT); + + /** */ + public static final String IGNITE_TOTAL_CPU = "IGNITE_TOTAL_CPU"; + + /** CPU limit. */ + private double cpu = UNLIMITED; + + /** */ + public static final String IGNITE_RUN_CPU_PER_NODE = "IGNITE_RUN_CPU_PER_NODE"; + + /** CPU limit per node. */ + private double cpuPerNode = UNLIMITED; + + /** */ + public static final String IGNITE_TOTAL_MEMORY = "IGNITE_TOTAL_MEMORY"; + + /** Memory limit. 
*/ + private double mem = UNLIMITED; + + /** */ + public static final String IGNITE_MEMORY_PER_NODE = "IGNITE_MEMORY_PER_NODE"; + + /** Memory limit per node. */ + private double memPerNode = UNLIMITED; + + /** */ + public static final String IGNITE_TOTAL_DISK_SPACE = "IGNITE_TOTAL_DISK_SPACE"; + + /** Disk space limit. */ + private double disk = UNLIMITED; + + /** */ + public static final String IGNITE_DISK_SPACE_PER_NODE = "IGNITE_DISK_SPACE_PER_NODE"; + + /** Disk space limit per node. */ + private double diskPerNode = UNLIMITED; + + /** */ + public static final String IGNITE_NODE_COUNT = "IGNITE_NODE_COUNT"; + + /** Node count limit. */ + private double nodeCnt = UNLIMITED; + + /** */ + public static final String IGNITE_MIN_CPU_PER_NODE = "IGNITE_MIN_CPU_PER_NODE"; + + /** */ + public static final double DEFAULT_RESOURCE_MIN_CPU = 1; + + /** Min CPU per node. */ + private double minCpu = DEFAULT_RESOURCE_MIN_CPU; + + /** */ + public static final String IGNITE_MIN_MEMORY_PER_NODE = "IGNITE_MIN_MEMORY_PER_NODE"; + + /** */ + public static final double DEFAULT_RESOURCE_MIN_MEM = 256; + + /** Min memory per node. */ + private double minMemory = DEFAULT_RESOURCE_MIN_MEM; + + /** */ + public static final String IGNITE_VERSION = "IGNITE_VERSION"; + + /** */ + public static final String DEFAULT_IGNITE_VERSION = "latest"; + + /** Ignite version. */ + private String igniteVer = DEFAULT_IGNITE_VERSION; + + /** */ + public static final String IGNITE_PACKAGE_URL = "IGNITE_PACKAGE_URL"; + + /** Ignite package url. */ + private String ignitePkgUrl; + + /** */ + public static final String IGNITE_PACKAGE_PATH = "IGNITE_PACKAGE_PATH"; + + /** Ignite package path. */ + private String ignitePkgPath; + + /** */ + public static final String IGNITE_WORK_DIR = "IGNITE_WORK_DIR"; + + /** */ + public static final String DEFAULT_IGNITE_WORK_DIR = "ignite-releases/"; + + /** Ignite work directory. 
*/ + private String igniteWorkDir = DEFAULT_IGNITE_WORK_DIR; + + /** */ + public static final String IGNITE_USERS_LIBS = "IGNITE_USERS_LIBS"; + + /** Path to users libs. */ + private String userLibs = null; + + /** */ + public static final String IGNITE_USERS_LIBS_URL = "IGNITE_USERS_LIBS_URL"; + + /** URL to users libs. */ + private String userLibsUrl = null; + + /** */ + public static final String LICENCE_URL = "LICENCE_URL"; + + /** Licence url. */ + private String licenceUrl = null; + + /** */ + public static final String IGNITE_CONFIG_XML = "IGNITE_XML_CONFIG"; + + /** Ignite config. */ + private String igniteCfg = null; + + /** */ + public static final String IGNITE_CONFIG_XML_URL = "IGNITE_CONFIG_XML_URL"; + + /** */ + public static final String IGNITE_HTTP_SERVER_IDLE_TIMEOUT = "IGNITE_HTTP_SERVER_IDLE_TIMEOUT"; + + /** */ + public static final long IGNITE_HTTP_SERVER_IDLE_TIMEOUT_DEFAULT = 30000L; + + /** Jetty idle timeout. */ + private long idleTimeout = IGNITE_HTTP_SERVER_IDLE_TIMEOUT_DEFAULT; + + /** Url to ignite config. */ + private String igniteCfgUrl = null; + + /** */ + public static final String IGNITE_HOSTNAME_CONSTRAINT = "IGNITE_HOSTNAME_CONSTRAINT"; + + /** Hostname constraint pattern. */ + private Pattern hostnameConstraint = null; + + /** */ + public ClusterProperties() { + // No-op. + } + + /** + * @return Cluster name. + */ + public String clusterName() { + return clusterName; + } + + /** + * @return CPU count limit. + */ + public double cpus() { + return cpu; + } + + /** + * Sets CPU count limit. + * + * @param cpu CPU count limit. + */ + public void cpus(double cpu) { + this.cpu = cpu; + } + + /** + * @return CPU count limit per node. + */ + public double cpusPerNode() { + return cpuPerNode; + } + + /** + * Sets CPU count limit per node. + * + * @param cpu CPU per node count limit. + */ + public void cpusPerNode(double cpu) { + this.cpuPerNode = cpu; + } + + /** + * @return mem limit. + */ + public double memory() { + return mem; + } + + /** + * Sets mem limit. 
+ * + * @param mem Memory. + */ + public void memory(double mem) { + this.mem = mem; + } + + /** + * @return mem limit. + */ + public double memoryPerNode() { + return memPerNode; + } + + /** + * Sets mem limit. + * + * @param mem Memory. + */ + public void memoryPerNode(double mem) { + this.memPerNode = mem; + } + + /** + * @return JVM opts for ignite. + */ + public String jmvOpts() { + return this.jvmOpts; + } + + /** + * @return disk limit. + */ + public double disk() { + return disk; + } + + /** + * @return disk limit per node. + */ + public double diskPerNode() { + return diskPerNode; + } + + /** + * @return instance count limit. + */ + public double instances() { + return nodeCnt; + } + + /** + * @return min memory per node. + */ + public double minMemoryPerNode() { + return minMemory; + } + + /** + * Sets min memory. + * + * @param minMemory Min memory. + */ + public void minMemoryPerNode(double minMemory) { + this.minMemory = minMemory; + } + + /** + * Sets hostname constraint. + * + * @param ptrn Hostname pattern. + */ + public void hostnameConstraint(Pattern ptrn) { + this.hostnameConstraint = ptrn; + } + + /** + * @return min cpu count per node. + */ + public double minCpuPerNode() { + return minCpu; + } + + /** + * Sets min cpu count per node. + * + * @param minCpu min cpu count per node. + */ + public void minCpuPerNode(double minCpu) { + this.minCpu = minCpu; + } + + /** + * @return Ignite version. + */ + public String igniteVer() { + return igniteVer; + } + + /** + * @return Working directory. + */ + public String igniteWorkDir() { + return igniteWorkDir; + } + + /** + * @return User's libs. + */ + public String userLibs() { + return userLibs; + } + + /** + * @return Ignite configuration. + */ + public String igniteCfg() { + return igniteCfg; + } + + /** + * @return Master url. + */ + public String masterUrl() { + return mesosUrl; + } + + /** + * @return Http server host. 
 + */ + public String httpServerHost() { + return httpSrvHost; + } + + /** + * @return Http server port. + */ + public int httpServerPort() { + return httpSrvPort; + } + + /** + * Returns the maximum idle time for an HTTP connection, which will be used for the + * jetty server. The server provides resources for the ignite mesos framework such as + * the ignite archive, user's libs, configurations, etc. + * + * @return Http server idle timeout. + */ + public long idleTimeout() { + return idleTimeout; + } + + /** + * URL to ignite package. The URL should point to a valid Apache Ignite archive. + * This property can be useful when using your own Apache Ignite build. + * + * @return Url to ignite package. + */ + public String ignitePackageUrl() { + return ignitePkgUrl; + } + + /** + * Path on local file system to ignite archive. That can be useful when + * Mesos is working in an intranet or behind NAT. + * + * @return Path on local host to ignite package. + */ + public String ignitePackagePath() { + return ignitePkgPath; + } + + /** + * @return Url to ignite configuration. + */ + public String igniteConfigUrl() { + return igniteCfgUrl; + } + + /** + * @return Url to users libs. + */ + public String usersLibsUrl() { + return userLibsUrl; + } + + /** + * @return Url to licence. + */ + public String licenceUrl() { + return licenceUrl; + } + + /** + * @return Host name constraint. + */ + public Pattern hostnameConstraint() { + return hostnameConstraint; + } + + /** + * @param cfg path to config file. + * @return Cluster configuration. 
+ */ + public static ClusterProperties from(String cfg) { + try { + Properties props = null; + + if (cfg != null) { + props = new Properties(); + + try (FileInputStream in = new FileInputStream(cfg)) { + props.load(in); + } + } + + ClusterProperties prop = new ClusterProperties(); + + prop.mesosUrl = getStringProperty(MESOS_MASTER_URL, props, DEFAULT_MESOS_MASTER_URL); + + prop.httpSrvHost = getStringProperty(IGNITE_HTTP_SERVER_HOST, props, getNonLoopbackAddress()); + + String port = System.getProperty("PORT0"); + + if (port != null && !port.isEmpty()) + prop.httpSrvPort = Integer.valueOf(port); + else + prop.httpSrvPort = Integer.valueOf(getStringProperty(IGNITE_HTTP_SERVER_PORT, props, + DEFAULT_HTTP_SERVER_PORT)); + + prop.clusterName = getStringProperty(IGNITE_CLUSTER_NAME, props, DEFAULT_CLUSTER_NAME); + + prop.userLibsUrl = getStringProperty(IGNITE_USERS_LIBS_URL, props, null); + prop.ignitePkgUrl = getStringProperty(IGNITE_PACKAGE_URL, props, null); + prop.ignitePkgPath = getStringProperty(IGNITE_PACKAGE_PATH, props, null); + prop.licenceUrl = getStringProperty(LICENCE_URL, props, null); + prop.igniteCfgUrl = getStringProperty(IGNITE_CONFIG_XML_URL, props, null); + + prop.cpu = getDoubleProperty(IGNITE_TOTAL_CPU, props, UNLIMITED); + prop.cpuPerNode = getDoubleProperty(IGNITE_RUN_CPU_PER_NODE, props, UNLIMITED); + prop.mem = getDoubleProperty(IGNITE_TOTAL_MEMORY, props, UNLIMITED); + prop.memPerNode = getDoubleProperty(IGNITE_MEMORY_PER_NODE, props, UNLIMITED); + prop.disk = getDoubleProperty(IGNITE_TOTAL_DISK_SPACE, props, UNLIMITED); + prop.diskPerNode = getDoubleProperty(IGNITE_DISK_SPACE_PER_NODE, props, 1024.0); + prop.nodeCnt = getDoubleProperty(IGNITE_NODE_COUNT, props, UNLIMITED); + prop.minCpu = getDoubleProperty(IGNITE_MIN_CPU_PER_NODE, props, DEFAULT_RESOURCE_MIN_CPU); + prop.minMemory = getDoubleProperty(IGNITE_MIN_MEMORY_PER_NODE, props, DEFAULT_RESOURCE_MIN_MEM); + + prop.jvmOpts = getStringProperty(IGNITE_JVM_OPTS, props, ""); + + 
prop.igniteVer = getStringProperty(IGNITE_VERSION, props, DEFAULT_IGNITE_VERSION); + prop.igniteWorkDir = getStringProperty(IGNITE_WORK_DIR, props, DEFAULT_IGNITE_WORK_DIR); + prop.igniteCfg = getStringProperty(IGNITE_CONFIG_XML, props, null); + prop.userLibs = getStringProperty(IGNITE_USERS_LIBS, props, null); + + String ptrn = getStringProperty(IGNITE_HOSTNAME_CONSTRAINT, props, null); + + prop.idleTimeout = getLongProperty(IGNITE_HTTP_SERVER_IDLE_TIMEOUT, props, IGNITE_HTTP_SERVER_IDLE_TIMEOUT_DEFAULT); + + if (ptrn != null) { + try { + prop.hostnameConstraint = Pattern.compile(ptrn); + } + catch (PatternSyntaxException e) { + log.log(Level.WARNING, "IGNITE_HOSTNAME_CONSTRAINT has invalid pattern. It will be ignore.", e); + } + } + + return prop; + } + catch (IOException e) { + throw new RuntimeException(e); + } + } + + /** + * @param name Property name. + * @param fileProps Property file. + * @return Property value. + */ + private static double getDoubleProperty(String name, Properties fileProps, Double dfltVal) { + if (fileProps != null && fileProps.containsKey(name)) + return Double.valueOf(fileProps.getProperty(name)); + + String prop = System.getProperty(name); + + if (prop == null) + prop = System.getenv(name); + + return prop == null ? dfltVal : Double.valueOf(prop); + } + + /** + * @param name Property name. + * @param fileProps Property file. + * @return Property value. + */ + private static long getLongProperty(String name, Properties fileProps, Long dfltVal) { + if (fileProps != null && fileProps.containsKey(name)) + return Long.valueOf(fileProps.getProperty(name)); + + String prop = System.getProperty(name); + + if (prop == null) + prop = System.getenv(name); + + return prop == null ? dfltVal : Long.valueOf(prop); + } + + /** + * @param name Property name. + * @param fileProps Property file. + * @return Property value. 
 + */ + private static String getStringProperty(String name, Properties fileProps, String dfltVal) { + if (fileProps != null && fileProps.containsKey(name)) + return fileProps.getProperty(name); + + String prop = System.getProperty(name); + + if (prop == null) + prop = System.getenv(name); + + return prop == null ? dfltVal : prop; + } + + /** + * Finds a local, non-loopback, IPv4 address. + * + * @return The first non-loopback IPv4 address found; a {@link RuntimeException} is thrown if there is none + * @throws java.net.SocketException If there was a problem querying the network interfaces + */ + public static String getNonLoopbackAddress() throws SocketException { + Enumeration<NetworkInterface> ifaces = NetworkInterface.getNetworkInterfaces(); + + while (ifaces.hasMoreElements()) { + NetworkInterface iface = ifaces.nextElement(); + + Enumeration<InetAddress> addrs = iface.getInetAddresses(); + + while (addrs.hasMoreElements()) { + InetAddress addr = addrs.nextElement(); + + if (addr instanceof Inet4Address && !addr.isLoopbackAddress()) + return addr.getHostAddress(); + } + } + + throw new RuntimeException("Failed. Could not find non-loopback address"); + } +} diff --git a/modules/mesos-ext/src/main/java/org/apache/ignite/mesos/IgniteFramework.java b/modules/mesos-ext/src/main/java/org/apache/ignite/mesos/IgniteFramework.java new file mode 100644 index 0000000..b9cba99 --- /dev/null +++ b/modules/mesos-ext/src/main/java/org/apache/ignite/mesos/IgniteFramework.java @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.mesos; + +import java.util.logging.Level; +import java.util.logging.Logger; +import org.apache.ignite.mesos.resource.IgniteProvider; +import org.apache.ignite.mesos.resource.JettyServer; +import org.apache.ignite.mesos.resource.ResourceHandler; +import org.apache.ignite.mesos.resource.ResourceProvider; +import org.apache.mesos.MesosSchedulerDriver; +import org.apache.mesos.Protos; +import org.apache.mesos.Scheduler; + +/** + * Ignite mesos framework. + */ +public class IgniteFramework { + /** */ + private static final Logger log = Logger.getLogger(IgniteFramework.class.getSimpleName()); + + /** Framework name. */ + private static final String IGNITE_FRAMEWORK_NAME = "Ignite"; + + /** MESOS system environment name */ + private static final String MESOS_USER_NAME = "MESOS_USER"; + + /** MESOS system environment role */ + private static final String MESOS_ROLE = "MESOS_ROLE"; + + /** */ + private static final String MESOS_AUTHENTICATE = "MESOS_AUTHENTICATE"; + + /** */ + private static final String DEFAULT_PRINCIPAL = "DEFAULT_PRINCIPAL"; + + /** */ + private static final String DEFAULT_SECRET = "DEFAULT_SECRET"; + + /** */ + private static final String MESOS_CHECKPOINT = "MESOS_CHECKPOINT"; + + /** + * Main methods has only one optional parameter - path to properties files. + * + * @param args Args. + * @throws Exception If failed. 
+ */ + public static void main(String[] args) throws Exception { + IgniteFramework igniteFramework = new IgniteFramework(); + + ClusterProperties clusterProps = ClusterProperties.from(args.length >= 1 ? args[0] : null); + + String baseUrl = String.format("http://%s:%d", clusterProps.httpServerHost(), clusterProps.httpServerPort()); + + JettyServer httpSrv = new JettyServer(); + + httpSrv.start( + new ResourceHandler(clusterProps.userLibs(), clusterProps.igniteCfg(), clusterProps.igniteWorkDir()), + clusterProps + ); + + ResourceProvider provider = new ResourceProvider(); + + IgniteProvider igniteProvider = new IgniteProvider(clusterProps.igniteWorkDir()); + + provider.init(clusterProps, igniteProvider, baseUrl); + + // Create the scheduler. + Scheduler scheduler = new IgniteScheduler(clusterProps, provider); + + // Create the driver. + MesosSchedulerDriver driver; + + if (System.getenv(MESOS_AUTHENTICATE) != null) { + log.info("Enabling authentication for the framework"); + + if (System.getenv(DEFAULT_PRINCIPAL) == null) { + log.log(Level.SEVERE, "Expecting authentication principal in the environment"); + + System.exit(1); + } + + if (System.getenv(DEFAULT_SECRET) == null) { + log.log(Level.SEVERE, "Expecting authentication secret in the environment"); + + System.exit(1); + } + + Protos.Credential cred = Protos.Credential.newBuilder() + .setPrincipal(System.getenv(DEFAULT_PRINCIPAL)) + .setSecret(System.getenv(DEFAULT_SECRET)) + .build(); + + driver = new MesosSchedulerDriver(scheduler, igniteFramework.getFrameworkInfo(), clusterProps.masterUrl(), + cred); + } + else + driver = new MesosSchedulerDriver(scheduler, igniteFramework.getFrameworkInfo(), clusterProps.masterUrl()); + + int status = driver.run() == Protos.Status.DRIVER_STOPPED ? 0 : 1; + + httpSrv.stop(); + + // Ensure that the driver process terminates. + driver.stop(); + + System.exit(status); + } + + /** + * @return Mesos Protos FrameworkInfo. 
+ */ + public Protos.FrameworkInfo getFrameworkInfo() throws Exception { + final int frameworkFailoverTimeout = 0; + + Protos.FrameworkInfo.Builder frameworkBuilder = Protos.FrameworkInfo.newBuilder() + .setName(IGNITE_FRAMEWORK_NAME) + .setUser(getUser()) + .setRole(getRole()) + .setFailoverTimeout(frameworkFailoverTimeout); + + if (System.getenv(MESOS_CHECKPOINT) != null) { + log.info("Enabling checkpoint for the framework"); + + frameworkBuilder.setCheckpoint(true); + } + + if (System.getenv(MESOS_AUTHENTICATE) != null) + frameworkBuilder.setPrincipal(System.getenv(DEFAULT_PRINCIPAL)); + else + frameworkBuilder.setPrincipal("ignite-framework-java"); + + return frameworkBuilder.build(); + } + + /** + * @return Mesos user name value. + */ + protected String getUser() { + String userName = System.getenv(MESOS_USER_NAME); + + return userName != null ? userName : ""; + } + + /** + * @return Mesos role value. + */ + protected String getRole() { + String mesosRole = System.getenv(MESOS_ROLE); + + return isRoleValid(mesosRole) ? mesosRole : "*"; + } + + /** + * @return Result of Mesos role validation. + */ + static boolean isRoleValid(String mRole) { + if (mRole == null || mRole.isEmpty() || mRole.equals(".") || mRole.equals("..") || + mRole.startsWith("-") || mRole.contains("/") || mRole.contains("\\") || mRole.contains(" ")) { + log.severe("Provided mesos role is not valid: [" + mRole + + "]. Mesos role should be a valid directory name."); + + return false; + } + return true; + } +} diff --git a/modules/mesos-ext/src/main/java/org/apache/ignite/mesos/IgniteScheduler.java b/modules/mesos-ext/src/main/java/org/apache/ignite/mesos/IgniteScheduler.java new file mode 100644 index 0000000..3a58f65 --- /dev/null +++ b/modules/mesos-ext/src/main/java/org/apache/ignite/mesos/IgniteScheduler.java @@ -0,0 +1,386 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.mesos; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.logging.Level; +import java.util.logging.Logger; +import org.apache.ignite.mesos.resource.ResourceProvider; +import org.apache.mesos.Protos; +import org.apache.mesos.Scheduler; +import org.apache.mesos.SchedulerDriver; + +/** + * Ignite scheduler receives offers from Mesos and decides how many resources will be occupied. + */ +public class IgniteScheduler implements Scheduler { + /** Cpus. */ + public static final String CPU = "cpus"; + + /** Mem. */ + public static final String MEM = "mem"; + + /** Disk. */ + public static final String DISK = "disk"; + + /** Default port range. */ + public static final String DEFAULT_PORT = ":47500..47510"; + + /** Delimiter char. */ + public static final String DELIM = ","; + + /** Logger. */ + private static final Logger log = Logger.getLogger(IgniteScheduler.class.getSimpleName()); + + /** ID generator. */ + private AtomicInteger taskIdGenerator = new AtomicInteger(); + + /** Task on host. 
*/ + private Map<String, IgniteTask> tasks = new HashMap<>(); + + /** Cluster resources. */ + private ClusterProperties clusterProps; + + /** Resource provider. */ + private ResourceProvider resourceProvider; + + /** + * @param clusterProps Cluster limit. + * @param resourceProvider Resource provider. + */ + public IgniteScheduler(ClusterProperties clusterProps, ResourceProvider resourceProvider) { + this.clusterProps = clusterProps; + this.resourceProvider = resourceProvider; + } + + /** {@inheritDoc} */ + @Override public synchronized void resourceOffers(SchedulerDriver schedulerDriver, List<Protos.Offer> offers) { + log.log(Level.FINE, "Offers resources: {0}", offers.size()); + + for (Protos.Offer offer : offers) { + IgniteTask igniteTask = checkOffer(offer); + + // Decline offer which doesn't match by mem or cpu. + if (igniteTask == null) { + schedulerDriver.declineOffer(offer.getId()); + + continue; + } + + // Generate a unique task ID. + Protos.TaskID taskId = Protos.TaskID.newBuilder() + .setValue(Integer.toString(taskIdGenerator.incrementAndGet())).build(); + + log.log(Level.INFO, "Launching task: {0}", igniteTask); + + // Create task to run. + Protos.TaskInfo task = createTask(offer, igniteTask, taskId); + + try { + schedulerDriver.launchTasks(Collections.singletonList(offer.getId()), + Collections.singletonList(task), + Protos.Filters.newBuilder().setRefuseSeconds(1).build()); + } + catch (RuntimeException e) { + log.log(Level.SEVERE, "Failed launch task. Task id: {0}. Task info: {1}", + new Object[]{taskId, task, e}); + + throw e; + } + + tasks.put(taskId.getValue(), igniteTask); + } + } + + /** + * Create Task. + * + * @param offer Offer. + * @param igniteTask Task description. + * @param taskId Task id. + * @return Task. + */ + private Protos.TaskInfo createTask(Protos.Offer offer, IgniteTask igniteTask, Protos.TaskID taskId) { + String cfgUrl = clusterProps.igniteConfigUrl() != null ? 
+ clusterProps.igniteConfigUrl() : resourceProvider.igniteConfigUrl(); + + Protos.CommandInfo.Builder builder = Protos.CommandInfo.newBuilder() + .setEnvironment(Protos.Environment.newBuilder() + .addVariables(Protos.Environment.Variable.newBuilder() + .setName("IGNITE_TCP_DISCOVERY_ADDRESSES") + .setValue(getAddress(offer.getHostname()))) + .addVariables(Protos.Environment.Variable.newBuilder() + .setName("JVM_OPTS") + .setValue(clusterProps.jmvOpts()))) + .addUris(Protos.CommandInfo.URI.newBuilder() + .setValue(clusterProps.ignitePackageUrl() != null ? + clusterProps.ignitePackageUrl() : resourceProvider.igniteUrl()) + .setExtract(true)) + .addUris(Protos.CommandInfo.URI.newBuilder() + .setValue(cfgUrl)); + + // Collection user's libs. + Collection<String> usersLibs = new ArrayList<>(); + + if (clusterProps.usersLibsUrl() != null && !clusterProps.usersLibsUrl().isEmpty()) + Collections.addAll(usersLibs, clusterProps.usersLibsUrl().split(DELIM)); + + if (resourceProvider.resourceUrl() != null && !resourceProvider.resourceUrl().isEmpty()) + usersLibs.addAll(resourceProvider.resourceUrl()); + + for (String url : usersLibs) + builder.addUris(Protos.CommandInfo.URI.newBuilder().setValue(url)); + + String cfgName = resourceProvider.configName(); + + if (clusterProps.igniteConfigUrl() != null) + cfgName = fileName(clusterProps.igniteConfigUrl()); + + String licenceFile = null; + + if (clusterProps.licenceUrl() != null) + licenceFile = fileName(clusterProps.licenceUrl()); + + builder.setValue( + (licenceFile != null ? "find . -maxdepth 1 -name \"" + licenceFile + "\" -exec cp {} ./*/ \\; && " : "") + + "find . 
-maxdepth 1 -name \"*.jar\" -exec cp {} ./*/libs/ \\; && " + + "./*/bin/ignite.sh " + + cfgName + + " -J-Xmx" + String.valueOf((int)igniteTask.mem() + "m") + + " -J-Xms" + String.valueOf((int)igniteTask.mem()) + "m"); + + return Protos.TaskInfo.newBuilder() + .setName("Ignite node " + taskId.getValue()) + .setTaskId(taskId) + .setSlaveId(offer.getSlaveId()) + .setCommand(builder) + .addResources(Protos.Resource.newBuilder() + .setName(CPU) + .setType(Protos.Value.Type.SCALAR) + .setScalar(Protos.Value.Scalar.newBuilder().setValue(igniteTask.cpuCores()))) + .addResources(Protos.Resource.newBuilder() + .setName(MEM) + .setType(Protos.Value.Type.SCALAR) + .setScalar(Protos.Value.Scalar.newBuilder().setValue(igniteTask.mem()))) + .addResources(Protos.Resource.newBuilder() + .setName(DISK) + .setType(Protos.Value.Type.SCALAR) + .setScalar(Protos.Value.Scalar.newBuilder().setValue(igniteTask.disk()))) + .build(); + } + + /** + * @param path Path. + * @return File name. + */ + private String fileName(String path) { + String[] split = path.split("/"); + + return split[split.length - 1]; + } + + /** + * @return Address running nodes. + */ + private String getAddress(String address) { + if (tasks.isEmpty()) { + if (address != null && !address.isEmpty()) + return address + DEFAULT_PORT; + + return ""; + } + + StringBuilder sb = new StringBuilder(); + + for (IgniteTask task : tasks.values()) + sb.append(task.host()).append(DEFAULT_PORT).append(DELIM); + + return sb.substring(0, sb.length() - 1); + } + + /** + * Check slave resources and return resources infos. + * + * @param offer Offer request. + * @return Ignite task description. + */ + private IgniteTask checkOffer(Protos.Offer offer) { + // Check limit on running nodes. 
+ if (clusterProps.instances() <= tasks.size()) + return null; + + double cpus = -1; + double mem = -1; + double disk = -1; + + // Check host name: offers from hosts matching the constraint pattern are declined. + if (clusterProps.hostnameConstraint() != null + && clusterProps.hostnameConstraint().matcher(offer.getHostname()).matches()) + return null; + + // Collect resource on slave. Only scalar values are read; a resource that is not offered keeps -1. + for (Protos.Resource resource : offer.getResourcesList()) { + if (resource.getName().equals(CPU)) { + if (resource.getType().equals(Protos.Value.Type.SCALAR)) + cpus = resource.getScalar().getValue(); + else + log.log(Level.FINE, "Cpus resource was not a scalar: {0}", resource.getType()); + } + else if (resource.getName().equals(MEM)) { + if (resource.getType().equals(Protos.Value.Type.SCALAR)) + mem = resource.getScalar().getValue(); + else + log.log(Level.FINE, "Mem resource was not a scalar: {0}", resource.getType()); + } + else if (resource.getName().equals(DISK)) + if (resource.getType().equals(Protos.Value.Type.SCALAR)) + disk = resource.getScalar().getValue(); + else + log.log(Level.FINE, "Disk resource was not a scalar: {0}", resource.getType()); + } + + // Check that slave satisfies min requirements. + if (cpus < clusterProps.minCpuPerNode() || mem < clusterProps.minMemoryPerNode()) { + log.log(Level.FINE, "Offer not sufficient for slave request: {0}", offer.getResourcesList()); + + return null; + } + + double totalCpus = 0; + double totalMem = 0; + double totalDisk = 0; + + // Collect occupied resources.
+ for (IgniteTask task : tasks.values()) { + totalCpus += task.cpuCores(); + totalMem += task.mem(); + totalDisk += task.disk(); + } + + cpus = Math.min(clusterProps.cpus() - totalCpus, Math.min(cpus, clusterProps.cpusPerNode())); + mem = Math.min(clusterProps.memory() - totalMem, Math.min(mem, clusterProps.memoryPerNode())); + disk = Math.min(clusterProps.disk() - totalDisk, Math.min(disk, clusterProps.diskPerNode())); + + if ((clusterProps.cpusPerNode() != ClusterProperties.UNLIMITED && clusterProps.cpusPerNode() != cpus) + || (clusterProps.memoryPerNode() != ClusterProperties.UNLIMITED && clusterProps.memoryPerNode() != mem)) { + log.log(Level.FINE, "Offer not sufficient for slave request: {0}", offer.getResourcesList()); + + return null; + } + + if (cpus > 0 && mem > 0) + return new IgniteTask(offer.getHostname(), cpus, mem, disk); + else { + log.log(Level.FINE, "Offer not sufficient for slave request: {0}", offer.getResourcesList()); + + return null; + } + } + + /** {@inheritDoc} */ + @Override public synchronized void statusUpdate(SchedulerDriver schedulerDriver, Protos.TaskStatus taskStatus) { + final String taskId = taskStatus.getTaskId().getValue(); + + log.log(Level.INFO, "Received update event task: {0} is in state: {1}", + new Object[]{taskId, taskStatus.getState()}); + + if (taskStatus.getState().equals(Protos.TaskState.TASK_FAILED) + || taskStatus.getState().equals(Protos.TaskState.TASK_ERROR) + || taskStatus.getState().equals(Protos.TaskState.TASK_FINISHED) + || taskStatus.getState().equals(Protos.TaskState.TASK_KILLED) + || taskStatus.getState().equals(Protos.TaskState.TASK_LOST)) { + IgniteTask failedTask = tasks.remove(taskId); + + if (failedTask != null) { + List<Protos.Request> requests = new ArrayList<>(); + + Protos.Request request = Protos.Request.newBuilder() + .addResources(Protos.Resource.newBuilder() + .setType(Protos.Value.Type.SCALAR) + .setName(MEM) + .setScalar(Protos.Value.Scalar.newBuilder().setValue(failedTask.mem()))) + 
.addResources(Protos.Resource.newBuilder() + .setType(Protos.Value.Type.SCALAR) + .setName(CPU) + .setScalar(Protos.Value.Scalar.newBuilder().setValue(failedTask.cpuCores()))) + .build(); + + requests.add(request); + + schedulerDriver.requestResources(requests); + } + } + } + + /** + * @param clusterProps Cluster properties. + */ + public void setClusterProps(ClusterProperties clusterProps) { + this.clusterProps = clusterProps; + } + + /** {@inheritDoc} */ + @Override public void registered(SchedulerDriver schedulerDriver, Protos.FrameworkID frameworkID, + Protos.MasterInfo masterInfo) { + log.log(Level.INFO, "Scheduler registered. Master: {0}:{1}, framework={2}", new Object[]{masterInfo.getIp(), + masterInfo.getPort(), frameworkID}); + } + + /** {@inheritDoc} */ + @Override public void disconnected(SchedulerDriver schedulerDriver) { + log.info("Scheduler disconnected."); + } + + /** {@inheritDoc} */ + @Override public void error(SchedulerDriver schedulerDriver, String s) { + log.log(Level.SEVERE, "Failed. Error message: {0}", s); + } + + /** {@inheritDoc} */ + @Override public void frameworkMessage(SchedulerDriver schedulerDriver, Protos.ExecutorID executorID, + Protos.SlaveID slaveID, byte[] bytes) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void slaveLost(SchedulerDriver schedulerDriver, Protos.SlaveID slaveID) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void executorLost(SchedulerDriver schedulerDriver, Protos.ExecutorID executorID, + Protos.SlaveID slaveID, int i) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void offerRescinded(SchedulerDriver schedulerDriver, Protos.OfferID offerID) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void reregistered(SchedulerDriver schedulerDriver, Protos.MasterInfo masterInfo) { + // No-op. 
+ } +} diff --git a/modules/mesos-ext/src/main/java/org/apache/ignite/mesos/IgniteTask.java b/modules/mesos-ext/src/main/java/org/apache/ignite/mesos/IgniteTask.java new file mode 100644 index 0000000..391a381 --- /dev/null +++ b/modules/mesos-ext/src/main/java/org/apache/ignite/mesos/IgniteTask.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.mesos; + +/** + * Information about launched task. + */ +public class IgniteTask { + /** */ + public final String host; + + /** */ + public final double cpuCores; + + /** */ + public final double mem; + + /** */ + public final double disk; + + /** + * Ignite launched task. + * + * @param host Host. + * @param cpuCores Cpu cores count. + * @param mem Memory. + * @param disk Disk. + */ + public IgniteTask(String host, double cpuCores, double mem, double disk) { + this.host = host; + this.cpuCores = cpuCores; + this.mem = mem; + this.disk = disk; + } + + /** + * @return Host. + */ + public String host() { + return host; + } + + /** + * @return Cores count. + */ + public double cpuCores() { + return cpuCores; + } + + /** + * @return Memory. + */ + public double mem() { + return mem; + } + + /** + * @return Disk. 
+ */ + public double disk() { + return disk; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return "IgniteTask " + + "host: [" + host + ']' + + ", cpuCores: [" + cpuCores + "]" + + ", mem: [" + mem + "]"; + } +} diff --git a/modules/mesos-ext/src/main/java/org/apache/ignite/mesos/package-info.java b/modules/mesos-ext/src/main/java/org/apache/ignite/mesos/package-info.java new file mode 100644 index 0000000..6e6a0f6 --- /dev/null +++ b/modules/mesos-ext/src/main/java/org/apache/ignite/mesos/package-info.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * <!-- Package description. --> + * Contains classes to support integration with Apache Mesos. + */ + +package org.apache.ignite.mesos; diff --git a/modules/mesos-ext/src/main/java/org/apache/ignite/mesos/resource/IgniteProvider.java b/modules/mesos-ext/src/main/java/org/apache/ignite/mesos/resource/IgniteProvider.java new file mode 100644 index 0000000..bd6d471 --- /dev/null +++ b/modules/mesos-ext/src/main/java/org/apache/ignite/mesos/resource/IgniteProvider.java @@ -0,0 +1,273 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.mesos.resource; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.net.HttpURLConnection; +import java.net.URL; +import java.nio.channels.Channels; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardCopyOption; +import java.util.logging.Level; +import java.util.logging.Logger; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.ignite.mesos.ClusterProperties; + +import static org.apache.ignite.mesos.ClusterProperties.IGNITE_VERSION; + +/** + * Class downloads and stores Ignite. + */ +public class IgniteProvider { + /** Logger. */ + private static final Logger log = Logger.getLogger(IgniteProvider.class.getSimpleName()); + + /** */ + private static final String DOWNLOAD_URL_PATTERN = "https://archive.apache.org/dist/ignite/%s/apache-ignite-%s-bin.zip"; + + /** URL for request Ignite latest version. 
*/ + private static final String IGNITE_LATEST_VERSION_URL = "https://ignite.apache.org/latest"; + + /** Mirrors. */ + private static final String APACHE_MIRROR_URL = "https://www.apache.org/dyn/closer.cgi?as_json=1"; + + /** Ignite on Apache URL path. */ + private static final String IGNITE_PATH = "/ignite/%s/apache-ignite-%s-bin.zip"; + + /** Version pattern. */ + private static final Pattern VERSION_PATTERN = Pattern.compile("(?<=version=).*\\S+"); + + /** */ + private String downloadFolder; + + /** + * @param downloadFolder Folder with ignite. + */ + public IgniteProvider(String downloadFolder) { + this.downloadFolder = downloadFolder; + } + + /** + * @param ver Ignite version. + * @return Path to latest ignite. + * @throws IOException If downloading failed. + */ + public String getIgnite(String ver) throws IOException { + return downloadIgnite(ver); + } + + /** + * @param ver Ignite version which will be downloaded. If {@code null} will download the latest ignite version. + * @return Ignite archive. + * @throws IOException If downloading failed. + */ + public String downloadIgnite(String ver) throws IOException { + assert ver != null; + + URL url; + + // get the latest version. + if (ver.equals(ClusterProperties.DEFAULT_IGNITE_VERSION)) { + try { + ver = findLatestVersion(); + + // and try to retrieve from a mirror. + url = new URL(String.format(findMirror() + IGNITE_PATH, ver, ver)); + } + catch (Exception e) { + // fallback to archive. + url = new URL(String.format(DOWNLOAD_URL_PATTERN, ver, ver)); + } + } + else { + // or from archive. + url = new URL(String.format(DOWNLOAD_URL_PATTERN, ver, ver)); + } + + return downloadIgnite(url); + } + + /** + * Attempts to retrieve the preferred mirror. + * + * @return Mirror url. + * @throws IOException If failed. 
+ */ + private String findMirror() throws IOException { + String response = getHttpContents(new URL(APACHE_MIRROR_URL)); + + if (response == null) + throw new RuntimeException("Failed to retrieve mirrors"); + + ObjectMapper mapper = new ObjectMapper(); + JsonNode mirrorUrl = mapper.readTree(response).get("preferred"); + + if (mirrorUrl == null) + throw new RuntimeException("Failed to find the preferred mirror"); + + return mirrorUrl.asText(); + } + + /** + * Attempts to obtain the latest version. + * + * @return Latest version. + * @throws IOException If failed. + */ + private String findLatestVersion() throws IOException { + String response = getHttpContents(new URL(IGNITE_LATEST_VERSION_URL)); + + if (response == null) + throw new RuntimeException("Failed to identify the latest version. Specify it with " + IGNITE_VERSION); + + Matcher m = VERSION_PATTERN.matcher(response); + if (m.find()) + return m.group(); + else + throw new RuntimeException("Failed to retrieve the latest version. Specify it with " + IGNITE_VERSION); + } + + /** + * @param url Url. + * @return Contents. + * @throws IOException If failed. + */ + private String getHttpContents(URL url) throws IOException { + HttpURLConnection conn = (HttpURLConnection)url.openConnection(); + + int code = conn.getResponseCode(); + + if (code != 200) + throw null; + + BufferedReader rd = new BufferedReader(new InputStreamReader(conn.getInputStream(), "UTF-8")); + return rd.lines().collect(Collectors.joining()); + } + + /** + * Downloads ignite by URL if this version wasn't downloaded before. + * + * @param url URL to Ignite. + * @return File name. 
+ */ + private String downloadIgnite(URL url) { + assert url != null; + + try { + HttpURLConnection conn = (HttpURLConnection)url.openConnection(); + + int code = conn.getResponseCode(); + + if (code == 200) { + checkDownloadFolder(); + + String fileName = fileName(url.toString()); + + if (fileExist(fileName)) + return fileName; + + log.log(Level.INFO, "Downloading from {0}", url.toString()); + + // try-with-resources closes the output file even if the transfer fails. + try (FileOutputStream outFile = new FileOutputStream(Paths.get(downloadFolder, fileName).toFile())) { + outFile.getChannel().transferFrom(Channels.newChannel(conn.getInputStream()), 0, Long.MAX_VALUE); + } + + return fileName; + } + else + throw new RuntimeException("Got unexpected response code. Response code: " + code + " from " + url); + } + catch (IOException e) { + throw new RuntimeException("Failed to download Ignite.", e); + } + } + + /** + * Checks that file exists in the download folder. + * + * @param fileName File name. + * @return {@code True} if file exists otherwise {@code false}. + */ + private boolean fileExist(String fileName) { + String pathToIgnite = downloadFolder + (downloadFolder.endsWith("/") ? "" : '/') + fileName; + + return new File(pathToIgnite).exists(); + } + + /** + * Copy file to working directory, replacing an existing file with the same name. + * + * @param filePath File path. + * @return File name, or {@code null} if the source file does not exist. + * @throws IOException If copying failed. + */ + String copyToWorkDir(String filePath) throws IOException { + Path srcFile = Paths.get(filePath); + + if (Files.exists(srcFile)) { + checkDownloadFolder(); + + Path newDir = Paths.get(downloadFolder); + + Path fileName = srcFile.getFileName(); + + Files.copy(srcFile, newDir.resolve(fileName), StandardCopyOption.REPLACE_EXISTING); + + return fileName.toString(); + } + + return null; + } + + /** + * @return Download folder.
+ */ + private File checkDownloadFolder() { + File file = new File(downloadFolder); + + if (!file.exists()) + file.mkdirs(); + + if (!file.exists()) + throw new IllegalArgumentException("Failed to create working directory: " + downloadFolder); + + return file; + } + + /** + * @param url URL. + * @return File name. + */ + private static String fileName(String url) { + String[] split = url.split("/"); + + return split[split.length - 1]; + } +} diff --git a/modules/mesos-ext/src/main/java/org/apache/ignite/mesos/resource/JettyServer.java b/modules/mesos-ext/src/main/java/org/apache/ignite/mesos/resource/JettyServer.java new file mode 100644 index 0000000..33e6c1e --- /dev/null +++ b/modules/mesos-ext/src/main/java/org/apache/ignite/mesos/resource/JettyServer.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.mesos.resource; + +import org.apache.ignite.mesos.ClusterProperties; +import org.eclipse.jetty.server.Handler; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.ServerConnector; + +/** + * Embedded jetty server. + */ +public class JettyServer { + /** */ + private Server server; + + /** + * Starts jetty server. + * + * @param handler Handler. 
+ * @param props Cluster properties. + * @throws Exception If failed. + */ + public void start(Handler handler, ClusterProperties props) throws Exception { + if (server == null) { + server = new Server(); + + ServerConnector connector = new ServerConnector(server); + + connector.setHost(props.httpServerHost()); + connector.setPort(props.httpServerPort()); + connector.setIdleTimeout(props.idleTimeout()); + + server.addConnector(connector); + server.setHandler(handler); + + server.start(); + } + else + throw new IllegalStateException("Jetty server has already been started."); + } + + /** + * Stops server. + * + * @throws Exception If failed. + */ + public void stop() throws Exception { + if (server != null) + server.stop(); + else + throw new IllegalStateException("Jetty server has not yet been started."); + } +} diff --git a/modules/mesos-ext/src/main/java/org/apache/ignite/mesos/resource/ResourceHandler.java b/modules/mesos-ext/src/main/java/org/apache/ignite/mesos/resource/ResourceHandler.java new file mode 100644 index 0000000..51de26b --- /dev/null +++ b/modules/mesos-ext/src/main/java/org/apache/ignite/mesos/resource/ResourceHandler.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.mesos.resource; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.channels.FileChannel; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardOpenOption; +import javax.servlet.ServletException; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import org.eclipse.jetty.server.HttpOutput; +import org.eclipse.jetty.server.Request; +import org.eclipse.jetty.server.handler.AbstractHandler; + +/** + * HTTP controller which provides on slave resources. + */ +public class ResourceHandler extends AbstractHandler { + /** */ + public static final String IGNITE_PREFIX = "/ignite/"; + + /** */ + public static final String LIBS_PREFIX = "/libs/"; + + /** */ + public static final String CONFIG_PREFIX = "/config/"; + + /** */ + public static final String DEFAULT_CONFIG = CONFIG_PREFIX + "default/"; + + /** */ + private String libsDir; + + /** */ + private String cfgPath; + + /** */ + private String igniteDir; + + /** + * @param libsDir Directory with user's libs. + * @param cfgPath Path to config file. + * @param igniteDir Directory with ignites. 
+ */ + public ResourceHandler(String libsDir, String cfgPath, String igniteDir) { + this.libsDir = libsDir; + this.cfgPath = cfgPath; + this.igniteDir = igniteDir; + } + + /** + * {@inheritDoc} + */ + @Override public void handle( + String url, + Request request, + HttpServletRequest httpServletRequest, + HttpServletResponse response) throws IOException, ServletException { + + String[] path = url.split("/"); + + String fileName = path[path.length - 1]; + + String servicePath = url.substring(0, url.length() - fileName.length()); + + switch (servicePath) { + case IGNITE_PREFIX: + handleRequest(response, "application/zip-archive", igniteDir + "/" + fileName); + + request.setHandled(true); + break; + + case LIBS_PREFIX: + handleRequest(response, "application/java-archive", libsDir + "/" + fileName); + + request.setHandled(true); + break; + + case CONFIG_PREFIX: + handleRequest(response, "application/xml", cfgPath); + + request.setHandled(true); + break; + + case DEFAULT_CONFIG: + handleRequest(response, "application/xml", + Thread.currentThread().getContextClassLoader().getResourceAsStream(fileName), + fileName); + + request.setHandled(true); + break; + } + } + + /** + * @param response Http response. + * @param type Type. + * @param path Path to file. + * @throws IOException If failed. + */ + private static void handleRequest(HttpServletResponse response, String type, String path) throws IOException { + Path path0 = Paths.get(path); + + response.setContentType(type); + response.setHeader("Content-Disposition", "attachment; filename=\"" + path0.getFileName() + "\""); + + try (HttpOutput out = (HttpOutput)response.getOutputStream()) { + out.sendContent(FileChannel.open(path0, StandardOpenOption.READ)); + } + } + + /** + * @param response Http response. + * @param type Type. + * @param stream Stream. + * @param attachmentName Attachment name. + * @throws IOException If failed. 
+ */ + private static void handleRequest(HttpServletResponse response, String type, InputStream stream, + String attachmentName) throws IOException { + response.setContentType(type); + response.setHeader("Content-Disposition", "attachment; filename=\"" + attachmentName + "\""); + + try (HttpOutput out = (HttpOutput)response.getOutputStream()) { + out.sendContent(stream); + } + } +} diff --git a/modules/mesos-ext/src/main/java/org/apache/ignite/mesos/resource/ResourceProvider.java b/modules/mesos-ext/src/main/java/org/apache/ignite/mesos/resource/ResourceProvider.java new file mode 100644 index 0000000..47fdede --- /dev/null +++ b/modules/mesos-ext/src/main/java/org/apache/ignite/mesos/resource/ResourceProvider.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.mesos.resource; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.logging.Level; +import java.util.logging.Logger; +import org.apache.ignite.mesos.ClusterProperties; + +import static org.apache.ignite.mesos.resource.ResourceHandler.CONFIG_PREFIX; +import static org.apache.ignite.mesos.resource.ResourceHandler.DEFAULT_CONFIG; +import static org.apache.ignite.mesos.resource.ResourceHandler.IGNITE_PREFIX; +import static org.apache.ignite.mesos.resource.ResourceHandler.LIBS_PREFIX; + +/** + * Provides path to user's libs and config file. + */ +public class ResourceProvider { + /** */ + private static final Logger log = Logger.getLogger(ResourceProvider.class.getSimpleName()); + + /** Ignite url. */ + private String igniteUrl; + + /** Resources. */ + private Collection<String> libsUris; + + /** Url config. */ + private String cfgUrl; + + /** Config name. */ + private String cfgName; + + /** + * @param props Cluster properties. + * @param provider Ignite provider. + * @param baseUrl Base url. + */ + public void init(ClusterProperties props, IgniteProvider provider, String baseUrl) throws IOException { + if (props.ignitePackageUrl() == null && props.ignitePackagePath() == null) { + // Downloading ignite. 
+ try { + igniteUrl = baseUrl + IGNITE_PREFIX + provider.getIgnite(props.igniteVer()); + } + catch (Exception e) { + log.log(Level.SEVERE, "Failed to download Ignite [err={0}, ver={1}].\n" + + "If application working behind NAT or Intranet and does not have access to external resources " + + "then you can use IGNITE_PACKAGE_URL or IGNITE_PACKAGE_PATH property that allow to use local " + + "resources.", + new Object[]{e, props.igniteVer()}); + } + } + + if (props.ignitePackagePath() != null) { + Path ignitePackPath = Paths.get(props.ignitePackagePath()); + + if (Files.exists(ignitePackPath) && !Files.isDirectory(ignitePackPath)) { + try { + String fileName = provider.copyToWorkDir(props.ignitePackagePath()); + + assert fileName != null; + + igniteUrl = baseUrl + IGNITE_PREFIX + fileName; + } + catch (Exception e) { + log.log(Level.SEVERE, "Failed to copy Ignite to working directory [err={0}, path={1}].", + new Object[] {e, props.ignitePackagePath()}); + + throw e; + } + } + else + throw new IllegalArgumentException("Failed to find a ignite archive by path: " + + props.ignitePackagePath()); + } + + // Find all jar files into user folder. + if (props.userLibs() != null && !props.userLibs().isEmpty()) { + File libsDir = new File(props.userLibs()); + + List<String> libs = new ArrayList<>(); + + if (libsDir.isDirectory()) { + File[] files = libsDir.listFiles(); + + if (files != null) { + for (File lib : files) { + if (lib.isFile() && lib.canRead() && + (lib.getName().endsWith(".jar") || lib.getName().endsWith(".JAR"))) + libs.add(baseUrl + LIBS_PREFIX + lib.getName()); + } + } + } + + libsUris = libs.isEmpty() ? null : libs; + } + + // Set configuration url. 
+ if (props.igniteCfg() != null) { + File cfg = new File(props.igniteCfg()); + + if (cfg.isFile() && cfg.canRead()) { + cfgUrl = baseUrl + CONFIG_PREFIX + cfg.getName(); + + cfgName = cfg.getName(); + } + } + else { + cfgName = "ignite-default-config.xml"; + + cfgUrl = baseUrl + DEFAULT_CONFIG + cfgName; + } + } + + /** + * @return Config name. + */ + public String configName() { + return cfgName; + } + + /** + * @return Ignite url. + */ + public String igniteUrl() { + return igniteUrl; + } + + /** + * @return Urls to user's libs. + */ + public Collection<String> resourceUrl() { + return libsUris; + } + + /** + * @return Url to config file. + */ + public String igniteConfigUrl() { + return cfgUrl; + } +} diff --git a/modules/mesos-ext/src/main/java/org/apache/ignite/mesos/resource/package-info.java b/modules/mesos-ext/src/main/java/org/apache/ignite/mesos/resource/package-info.java new file mode 100644 index 0000000..a90176d --- /dev/null +++ b/modules/mesos-ext/src/main/java/org/apache/ignite/mesos/resource/package-info.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * <!-- Package description. --> + * Contains classes provide access to resources. 
+ */ + +package org.apache.ignite.mesos.resource; diff --git a/modules/mesos-ext/src/main/resources/ignite-default-config.xml b/modules/mesos-ext/src/main/resources/ignite-default-config.xml new file mode 100644 index 0000000..2f26398 --- /dev/null +++ b/modules/mesos-ext/src/main/resources/ignite-default-config.xml @@ -0,0 +1,35 @@ +<?xml version="1.0" encoding="UTF-8"?> + +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> + +<beans xmlns="http://www.springframework.org/schema/beans" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://www.springframework.org/schema/beans + http://www.springframework.org/schema/beans/spring-beans.xsd"> + <bean class="org.apache.ignite.configuration.IgniteConfiguration"> + <property name="discoverySpi"> + <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi"> + <property name="ipFinder"> + <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder"/> + </property> + + <property name="joinTimeout" value="60000"/> + </bean> + </property> + </bean> +</beans> diff --git a/modules/mesos-ext/src/test/java/org/apache/ignite/IgniteMesosTestSuite.java b/modules/mesos-ext/src/test/java/org/apache/ignite/IgniteMesosTestSuite.java new file mode 100644 index 0000000..bab4b73 --- /dev/null +++ b/modules/mesos-ext/src/test/java/org/apache/ignite/IgniteMesosTestSuite.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite; + +import org.apache.ignite.mesos.IgniteSchedulerSelfTest; +import org.junit.runner.RunWith; +import org.junit.runners.Suite; + +/** + * Apache Mesos integration tests. 
+ */ +@RunWith(Suite.class) +@Suite.SuiteClasses({ + IgniteSchedulerSelfTest.class +}) +public class IgniteMesosTestSuite { +} diff --git a/modules/mesos-ext/src/test/java/org/apache/ignite/mesos/IgniteSchedulerSelfTest.java b/modules/mesos-ext/src/test/java/org/apache/ignite/mesos/IgniteSchedulerSelfTest.java new file mode 100644 index 0000000..b362bee --- /dev/null +++ b/modules/mesos-ext/src/test/java/org/apache/ignite/mesos/IgniteSchedulerSelfTest.java @@ -0,0 +1,552 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.mesos; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.regex.Pattern; +import org.apache.ignite.mesos.resource.ResourceProvider; +import org.apache.mesos.Protos; +import org.apache.mesos.SchedulerDriver; +import org.apache.mesos.scheduler.Protos.OfferConstraints; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; + +/** + * Scheduler tests. 
+ */ +public class IgniteSchedulerSelfTest { + /** */ + private IgniteScheduler scheduler; + + /** */ + @Before + public void setUp() throws Exception { + ClusterProperties clustProp = new ClusterProperties(); + + scheduler = new IgniteScheduler(clustProp, new ResourceProvider() { + @Override public String configName() { + return "config.xml"; + } + + @Override public String igniteUrl() { + return "ignite.jar"; + } + + @Override public String igniteConfigUrl() { + return "config.xml"; + } + + @Override public Collection<String> resourceUrl() { + return null; + } + }); + } + + /** + * @throws Exception If failed. + */ + @Test + public void testHostRegister() throws Exception { + Protos.Offer offer = createOffer("hostname", 4, 1024); + + DriverMock mock = new DriverMock(); + + scheduler.resourceOffers(mock, Collections.singletonList(offer)); + + assertNotNull(mock.launchedTask); + assertEquals(1, mock.launchedTask.size()); + + Protos.TaskInfo taskInfo = mock.launchedTask.iterator().next(); + + assertEquals(4.0, resources(taskInfo.getResourcesList(), IgniteScheduler.CPU), 0); + assertEquals(1024.0, resources(taskInfo.getResourcesList(), IgniteScheduler.MEM), 0); + } + + /** + * @throws Exception If failed. 
+ */ + @Test + public void testDeclineByCpu() throws Exception { + Protos.Offer offer = createOffer("hostname", 4, 1024); + + DriverMock mock = new DriverMock(); + + ClusterProperties clustProp = new ClusterProperties(); + clustProp.cpus(2); + + scheduler.setClusterProps(clustProp); + + scheduler.resourceOffers(mock, Collections.singletonList(offer)); + + assertNotNull(mock.launchedTask); + assertEquals(1, mock.launchedTask.size()); + + Protos.TaskInfo taskInfo = mock.launchedTask.iterator().next(); + + assertEquals(2.0, resources(taskInfo.getResourcesList(), IgniteScheduler.CPU), 0); + assertEquals(1024.0, resources(taskInfo.getResourcesList(), IgniteScheduler.MEM), 0); + + mock.clear(); + + scheduler.resourceOffers(mock, Collections.singletonList(offer)); + + assertNull(mock.launchedTask); + + Protos.OfferID declinedOffer = mock.declinedOffer; + + assertEquals(offer.getId(), declinedOffer); + } + + /** + * @throws Exception If failed. + */ + @Test + public void testDeclineByMem() throws Exception { + Protos.Offer offer = createOffer("hostname", 4, 1024); + + DriverMock mock = new DriverMock(); + + ClusterProperties clustProp = new ClusterProperties(); + clustProp.memory(512); + + scheduler.setClusterProps(clustProp); + + scheduler.resourceOffers(mock, Collections.singletonList(offer)); + + assertNotNull(mock.launchedTask); + assertEquals(1, mock.launchedTask.size()); + + Protos.TaskInfo taskInfo = mock.launchedTask.iterator().next(); + + assertEquals(4.0, resources(taskInfo.getResourcesList(), IgniteScheduler.CPU), 0); + assertEquals(512.0, resources(taskInfo.getResourcesList(), IgniteScheduler.MEM), 0); + + mock.clear(); + + scheduler.resourceOffers(mock, Collections.singletonList(offer)); + + assertNull(mock.launchedTask); + + Protos.OfferID declinedOffer = mock.declinedOffer; + + assertEquals(offer.getId(), declinedOffer); + } + + /** + * @throws Exception If failed. 
+ */ + @Test + public void testDeclineByMemCpu() throws Exception { + Protos.Offer offer = createOffer("hostname", 1, 1024); + + DriverMock mock = new DriverMock(); + + ClusterProperties clustProp = new ClusterProperties(); + clustProp.cpus(4); + clustProp.memory(2000); + + scheduler.setClusterProps(clustProp); + + double totalMem = 0, totalCpu = 0; + + for (int i = 0; i < 2; i++) { + scheduler.resourceOffers(mock, Collections.singletonList(offer)); + + assertNotNull(mock.launchedTask); + assertEquals(1, mock.launchedTask.size()); + + Protos.TaskInfo taskInfo = mock.launchedTask.iterator().next(); + + totalCpu += resources(taskInfo.getResourcesList(), IgniteScheduler.CPU); + totalMem += resources(taskInfo.getResourcesList(), IgniteScheduler.MEM); + + mock.clear(); + } + + assertEquals(2.0, totalCpu, 0); + assertEquals(2000.0, totalMem, 0); + + scheduler.resourceOffers(mock, Collections.singletonList(offer)); + + assertNull(mock.launchedTask); + + Protos.OfferID declinedOffer = mock.declinedOffer; + + assertEquals(offer.getId(), declinedOffer); + } + + /** + * @throws Exception If failed. + */ + @Test + public void testDeclineByCpuMinRequirements() throws Exception { + Protos.Offer offer = createOffer("hostname", 8, 10240); + + DriverMock mock = new DriverMock(); + + ClusterProperties clustProp = new ClusterProperties(); + clustProp.minCpuPerNode(12); + + scheduler.setClusterProps(clustProp); + + scheduler.resourceOffers(mock, Collections.singletonList(offer)); + + assertNotNull(mock.declinedOffer); + + assertEquals(offer.getId(), mock.declinedOffer); + } + + /** + * @throws Exception If failed. 
+ */ + @Test + public void testDeclineByMemMinRequirements() throws Exception { + Protos.Offer offer = createOffer("hostname", 8, 10240); + + DriverMock mock = new DriverMock(); + + ClusterProperties clustProp = new ClusterProperties(); + clustProp.minMemoryPerNode(15000); + + scheduler.setClusterProps(clustProp); + + scheduler.resourceOffers(mock, Collections.singletonList(offer)); + + assertNotNull(mock.declinedOffer); + + assertEquals(offer.getId(), mock.declinedOffer); + } + + /** + * @throws Exception If failed. + */ + @Test + public void testHosthameConstraint() throws Exception { + Protos.Offer offer = createOffer("hostname", 8, 10240); + + DriverMock mock = new DriverMock(); + + ClusterProperties clustProp = new ClusterProperties(); + clustProp.hostnameConstraint(Pattern.compile("hostname")); + + scheduler.setClusterProps(clustProp); + + scheduler.resourceOffers(mock, Collections.singletonList(offer)); + + assertNotNull(mock.declinedOffer); + + assertEquals(offer.getId(), mock.declinedOffer); + + offer = createOffer("hostnameAccept", 8, 10240); + + scheduler.resourceOffers(mock, Collections.singletonList(offer)); + + assertNotNull(mock.launchedTask); + assertEquals(1, mock.launchedTask.size()); + } + + /** + * @throws Exception If failed. 
+ */ + @Test + public void testPerNode() throws Exception { + Protos.Offer offer = createOffer("hostname", 8, 1024); + + DriverMock mock = new DriverMock(); + + ClusterProperties clustProp = new ClusterProperties(); + clustProp.memoryPerNode(1024); + clustProp.cpusPerNode(2); + + scheduler.setClusterProps(clustProp); + + scheduler.resourceOffers(mock, Collections.singletonList(offer)); + + assertNotNull(mock.launchedTask); + + Protos.TaskInfo taskInfo = mock.launchedTask.iterator().next(); + + assertEquals(2.0, resources(taskInfo.getResourcesList(), IgniteScheduler.CPU), 0); + assertEquals(1024.0, resources(taskInfo.getResourcesList(), IgniteScheduler.MEM), 0); + + mock.clear(); + + offer = createOffer("hostname", 1, 2048); + + scheduler.resourceOffers(mock, Collections.singletonList(offer)); + + assertNull(mock.launchedTask); + + assertNotNull(mock.declinedOffer); + assertEquals(offer.getId(), mock.declinedOffer); + + mock.clear(); + + offer = createOffer("hostname", 4, 512); + + scheduler.resourceOffers(mock, Collections.singletonList(offer)); + + assertNull(mock.launchedTask); + + assertNotNull(mock.declinedOffer); + assertEquals(offer.getId(), mock.declinedOffer); + } + + /** + * @throws Exception If failed. + */ + @Test + public void testIgniteFramework() throws Exception { + final String mesosUserValue = "userAAAAA"; + final String mesosRoleValue = "role1"; + + IgniteFramework igniteFramework = new IgniteFramework() { + @Override protected String getUser() { + return mesosUserValue; + } + + @Override protected String getRole() { + return mesosRoleValue; + } + }; + + Protos.FrameworkInfo info = igniteFramework.getFrameworkInfo(); + + String actualUserValue = info.getUser(); + String actualRoleValue = info.getRole(); + + assertEquals(actualUserValue, mesosUserValue); + assertEquals(actualRoleValue, mesosRoleValue); + } + + /** + * @throws Exception If failed. 
+ */ + @Test + public void testMesosRoleValidation() throws Exception { + List<String> failedRoleValues = Arrays.asList("", ".", "..", "-testRole", + "test/Role", "test\\Role", "test Role", null); + + for (String failedRoleValue : failedRoleValues) + assertFalse(IgniteFramework.isRoleValid(failedRoleValue)); + } + + /** + * @param resourceType Resource type. + * @return Value. + */ + private Double resources(List<Protos.Resource> resources, String resourceType) { + for (Protos.Resource resource : resources) { + if (resource.getName().equals(resourceType)) + return resource.getScalar().getValue(); + } + + return null; + } + + /** + * @param hostname Hostname + * @param cpu Cpu count. + * @param mem Mem size. + * @return Offer. + */ + private Protos.Offer createOffer(String hostname, double cpu, double mem) { + return Protos.Offer.newBuilder() + .setId(Protos.OfferID.newBuilder().setValue("1")) + .setSlaveId(Protos.SlaveID.newBuilder().setValue("1")) + .setFrameworkId(Protos.FrameworkID.newBuilder().setValue("1")) + .setHostname(hostname) + .addResources(Protos.Resource.newBuilder() + .setType(Protos.Value.Type.SCALAR) + .setName(IgniteScheduler.CPU) + .setScalar(Protos.Value.Scalar.newBuilder().setValue(cpu).build()) + .build()) + .addResources(Protos.Resource.newBuilder() + .setType(Protos.Value.Type.SCALAR) + .setName(IgniteScheduler.MEM) + .setScalar(Protos.Value.Scalar.newBuilder().setValue(mem).build()) + .build()) + .build(); + } + + /** + * No-op implementation. + */ + public static class DriverMock implements SchedulerDriver { + /** */ + Collection<Protos.TaskInfo> launchedTask; + + /** */ + Protos.OfferID declinedOffer; + + /** + * Clears launched task. 
+ */ + public void clear() { + launchedTask = null; + declinedOffer = null; + } + + /** {@inheritDoc} */ + @Override public Protos.Status start() { + return null; + } + + /** {@inheritDoc} */ + @Override public Protos.Status stop(boolean failover) { + return null; + } + + /** {@inheritDoc} */ + @Override public Protos.Status stop() { + return null; + } + + /** {@inheritDoc} */ + @Override public Protos.Status abort() { + return null; + } + + /** {@inheritDoc} */ + @Override public Protos.Status join() { + return null; + } + + /** {@inheritDoc} */ + @Override public Protos.Status run() { + return null; + } + + /** {@inheritDoc} */ + @Override public Protos.Status requestResources(Collection<Protos.Request> requests) { + return null; + } + + /** {@inheritDoc} */ + @Override public Protos.Status launchTasks(Collection<Protos.OfferID> offerIds, + Collection<Protos.TaskInfo> tasks, Protos.Filters filters) { + launchedTask = tasks; + + return null; + } + + /** {@inheritDoc} */ + @Override public Protos.Status launchTasks(Collection<Protos.OfferID> offerIds, + Collection<Protos.TaskInfo> tasks) { + launchedTask = tasks; + + return null; + } + + /** {@inheritDoc} */ + @Override public Protos.Status launchTasks(Protos.OfferID offerId, Collection<Protos.TaskInfo> tasks, + Protos.Filters filters) { + launchedTask = tasks; + + return null; + } + + /** {@inheritDoc} */ + @Override public Protos.Status launchTasks(Protos.OfferID offerId, Collection<Protos.TaskInfo> tasks) { + launchedTask = tasks; + + return null; + } + + /** {@inheritDoc} */ + @Override public Protos.Status killTask(Protos.TaskID taskId) { + return null; + } + + /** {@inheritDoc} */ + @Override public Protos.Status acceptOffers(Collection<Protos.OfferID> collection, + Collection<Protos.Offer.Operation> collection1, Protos.Filters filters) { + return null; + } + + /** {@inheritDoc} */ + @Override public Protos.Status declineOffer(Protos.OfferID offerId, Protos.Filters filters) { + declinedOffer = offerId; + + 
return null; + } + + /** {@inheritDoc} */ + @Override public Protos.Status declineOffer(Protos.OfferID offerId) { + declinedOffer = offerId; + + return null; + } + + /** {@inheritDoc} */ + @Override public Protos.Status reviveOffers() { + return null; + } + + /** {@inheritDoc} */ + @Override public Protos.Status reviveOffers(Collection<String> collection) { + return null; + } + + /** {@inheritDoc} */ + @Override public Protos.Status suppressOffers() { + return null; + } + + /** {@inheritDoc} */ + @Override public Protos.Status suppressOffers(Collection<String> collection) { + return null; + } + + /** {@inheritDoc} */ + @Override public Protos.Status acknowledgeStatusUpdate(Protos.TaskStatus status) { + return null; + } + + /** {@inheritDoc} */ + @Override public Protos.Status sendFrameworkMessage(Protos.ExecutorID executorId, Protos.SlaveID slaveId, + byte[] data) { + return null; + } + + /** {@inheritDoc} */ + @Override public Protos.Status reconcileTasks(Collection<Protos.TaskStatus> statuses) { + return null; + } + + /** {@inheritDoc} */ + @Override public Protos.Status updateFramework(Protos.FrameworkInfo frameworkInfo, Collection<String> collection, + OfferConstraints offerConstraints) { + return null; + } + + /** {@inheritDoc} */ + @Override public Protos.Status updateFramework(Protos.FrameworkInfo frameworkInfo, + Collection<String> collection) { + return null; + } + } +} diff --git a/pom.xml b/pom.xml index 7d96020..721cde1 100644 --- a/pom.xml +++ b/pom.xml @@ -58,6 +58,7 @@ <module>modules/geospatial-ext</module> <module>modules/aop-ext</module> <module>modules/spark-ext</module> + <module>modules/mesos-ext</module> </modules> <profiles>