This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new cf10e70  [FLINK-12139][Mesos] Add disk space parameter.
cf10e70 is described below

commit cf10e70088645b0fcc54ab03ecec3ef372ddb652
Author: Juan Gentile <j.gent...@criteo.com>
AuthorDate: Tue Apr 9 13:48:36 2019 +0200

    [FLINK-12139][Mesos] Add disk space parameter.
    
    This closes #8224.
---
 .../generated/mesos_task_manager_configuration.html   |  5 +++++
 .../flink/mesos/entrypoint/MesosEntrypointUtils.java  |  5 +++--
 .../clusterframework/LaunchableMesosWorker.java       |  6 +++++-
 .../clusterframework/MesosTaskManagerParameters.java  | 19 +++++++++++++++++++
 .../clusterframework/MesosResourceManagerTest.java    |  2 +-
 5 files changed, 33 insertions(+), 4 deletions(-)

diff --git a/docs/_includes/generated/mesos_task_manager_configuration.html 
b/docs/_includes/generated/mesos_task_manager_configuration.html
index 1e67f84..338acc6 100644
--- a/docs/_includes/generated/mesos_task_manager_configuration.html
+++ b/docs/_includes/generated/mesos_task_manager_configuration.html
@@ -48,6 +48,11 @@
             <td>CPUs to assign to the Mesos workers.</td>
         </tr>
         <tr>
+            <td><h5>mesos.resourcemanager.tasks.disk</h5></td>
+            <td style="word-wrap: break-word;">0</td>
+            <td>Disk space to assign to the Mesos workers in MB.</td>
+        </tr>
+        <tr>
             <td><h5>mesos.resourcemanager.tasks.gpus</h5></td>
             <td style="word-wrap: break-word;">0</td>
             <td>GPUs to assign to the Mesos workers.</td>
diff --git 
a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosEntrypointUtils.java
 
b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosEntrypointUtils.java
index 2059c8e..4d7a485 100755
--- 
a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosEntrypointUtils.java
+++ 
b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosEntrypointUtils.java
@@ -109,12 +109,13 @@ public class MesosEntrypointUtils {
                log.info("TaskManagers will be created with {} task slots",
                        
taskManagerParameters.containeredParameters().numSlots());
                log.info("TaskManagers will be started with container size {} 
MB, JVM heap size {} MB, " +
-                               "JVM direct memory limit {} MB, {} cpus, {} 
gpus",
+                               "JVM direct memory limit {} MB, {} cpus, {} 
gpus, disk space {} MB",
                        
taskManagerParameters.containeredParameters().taskManagerTotalMemoryMB(),
                        
taskManagerParameters.containeredParameters().taskManagerHeapSizeMB(),
                        
taskManagerParameters.containeredParameters().taskManagerDirectMemoryLimitMB(),
                        taskManagerParameters.cpus(),
-                       taskManagerParameters.gpus());
+                       taskManagerParameters.gpus(),
+                       taskManagerParameters.disk());
 
                return taskManagerParameters;
        }
diff --git 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
index 637442c..22f6a30 100644
--- 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
+++ 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
@@ -145,7 +145,7 @@ public class LaunchableMesosWorker implements 
LaunchableTask {
 
                @Override
                public double getDisk() {
-                       return 0.0;
+                       return params.disk();
                }
 
                @Override
@@ -221,6 +221,10 @@ public class LaunchableMesosWorker implements 
LaunchableTask {
                taskInfo.addAllResources(allocation.takeScalar("gpus", 
taskRequest.getGPUs(), roles));
                taskInfo.addAllResources(allocation.takeScalar("mem", 
taskRequest.getMemory(), roles));
 
+               if (taskRequest.getDisk() > 0.0) {
+                       taskInfo.addAllResources(allocation.takeScalar("disk", 
taskRequest.getDisk(), roles));
+               }
+
                final Protos.CommandInfo.Builder cmd = 
taskInfo.getCommandBuilder();
                final Protos.Environment.Builder env = 
cmd.getEnvironmentBuilder();
                final StringBuilder jvmArgs = new StringBuilder();
diff --git 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
index 0315629..1d49000 100644
--- 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
+++ 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
@@ -59,6 +59,11 @@ public class MesosTaskManagerParameters {
                .defaultValue(1024)
                .withDescription("Memory to assign to the Mesos workers in 
MB.");
 
+       public static final ConfigOption<Integer> MESOS_RM_TASKS_DISK_MB =
+               key("mesos.resourcemanager.tasks.disk")
+               .defaultValue(0)
+               .withDescription(Description.builder().text("Disk space to 
assign to the Mesos workers in MB.").build());
+
        public static final ConfigOption<Double> MESOS_RM_TASKS_CPUS =
                key("mesos.resourcemanager.tasks.cpus")
                .defaultValue(0.0)
@@ -145,6 +150,8 @@ public class MesosTaskManagerParameters {
 
        private final int gpus;
 
+       private final int disk;
+
        private final ContainerType containerType;
 
        private final Option<String> containerImageName;
@@ -170,6 +177,7 @@ public class MesosTaskManagerParameters {
        public MesosTaskManagerParameters(
                        double cpus,
                        int gpus,
+                       int disk,
                        ContainerType containerType,
                        Option<String> containerImageName,
                        ContaineredTaskManagerParameters containeredParameters,
@@ -184,6 +192,7 @@ public class MesosTaskManagerParameters {
 
                this.cpus = cpus;
                this.gpus = gpus;
+               this.disk = disk;
                this.containerType = Preconditions.checkNotNull(containerType);
                this.containerImageName = 
Preconditions.checkNotNull(containerImageName);
                this.containeredParameters = 
Preconditions.checkNotNull(containeredParameters);
@@ -212,6 +221,13 @@ public class MesosTaskManagerParameters {
        }
 
        /**
+        * Get the disk space in MB to use for the TaskManager Process.
+        */
+       public int disk() {
+               return disk;
+       }
+
+       /**
         * Get the container type (Mesos or Docker).  The default is Mesos.
         *
         * <p>Mesos provides a facility for a framework to specify which 
containerizer to use.
@@ -335,6 +351,8 @@ public class MesosTaskManagerParameters {
                                " cannot be negative");
                }
 
+               int disk = flinkConfig.getInteger(MESOS_RM_TASKS_DISK_MB);
+
                // parse the containerization parameters
                String imageName = 
flinkConfig.getString(MESOS_RM_CONTAINER_IMAGE_NAME);
 
@@ -379,6 +397,7 @@ public class MesosTaskManagerParameters {
                return new MesosTaskManagerParameters(
                        cpus,
                        gpus,
+                       disk,
                        containerType,
                        Option.apply(imageName),
                        containeredParameters,
diff --git 
a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
 
b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
index c5d053c..617ad41 100644
--- 
a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
+++ 
b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
@@ -279,7 +279,7 @@ public class MesosResourceManagerTest extends TestLogger {
                        ContaineredTaskManagerParameters containeredParams =
                                new ContaineredTaskManagerParameters(1024, 768, 
256, 4, new HashMap<String, String>());
                        MesosTaskManagerParameters tmParams = new 
MesosTaskManagerParameters(
-                               1.0, 1, 
MesosTaskManagerParameters.ContainerType.MESOS, Option.<String>empty(), 
containeredParams,
+                               1.0, 1, 0, 
MesosTaskManagerParameters.ContainerType.MESOS, Option.<String>empty(), 
containeredParams,
                                Collections.<Protos.Volume>emptyList(), 
Collections.<Protos.Parameter>emptyList(), false,
                                Collections.<ConstraintEvaluator>emptyList(), 
"", Option.<String>empty(),
                                Option.<String>empty(), 
Collections.<String>emptyList());

Reply via email to