This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new cf10e70 [FLINK-12139][Mesos] Add disk space parameter. cf10e70 is described below commit cf10e70088645b0fcc54ab03ecec3ef372ddb652 Author: Juan Gentile <j.gent...@criteo.com> AuthorDate: Tue Apr 9 13:48:36 2019 +0200 [FLINK-12139][Mesos] Add disk space parameter. This closes #8224. --- .../generated/mesos_task_manager_configuration.html | 5 +++++ .../flink/mesos/entrypoint/MesosEntrypointUtils.java | 5 +++-- .../clusterframework/LaunchableMesosWorker.java | 6 +++++- .../clusterframework/MesosTaskManagerParameters.java | 19 +++++++++++++++++++ .../clusterframework/MesosResourceManagerTest.java | 2 +- 5 files changed, 33 insertions(+), 4 deletions(-) diff --git a/docs/_includes/generated/mesos_task_manager_configuration.html b/docs/_includes/generated/mesos_task_manager_configuration.html index 1e67f84..338acc6 100644 --- a/docs/_includes/generated/mesos_task_manager_configuration.html +++ b/docs/_includes/generated/mesos_task_manager_configuration.html @@ -48,6 +48,11 @@ <td>CPUs to assign to the Mesos workers.</td> </tr> <tr> + <td><h5>mesos.resourcemanager.tasks.disk</h5></td> + <td style="word-wrap: break-word;">0</td> + <td>Disk space to assign to the Mesos workers in MB.</td> + </tr> + <tr> <td><h5>mesos.resourcemanager.tasks.gpus</h5></td> <td style="word-wrap: break-word;">0</td> <td>GPUs to assign to the Mesos workers.</td> diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosEntrypointUtils.java b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosEntrypointUtils.java index 2059c8e..4d7a485 100755 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosEntrypointUtils.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosEntrypointUtils.java @@ -109,12 +109,13 @@ public class MesosEntrypointUtils { log.info("TaskManagers will be created with {} task slots", 
taskManagerParameters.containeredParameters().numSlots()); log.info("TaskManagers will be started with container size {} MB, JVM heap size {} MB, " + - "JVM direct memory limit {} MB, {} cpus, {} gpus", + "JVM direct memory limit {} MB, {} cpus, {} gpus, disk space {} MB", taskManagerParameters.containeredParameters().taskManagerTotalMemoryMB(), taskManagerParameters.containeredParameters().taskManagerHeapSizeMB(), taskManagerParameters.containeredParameters().taskManagerDirectMemoryLimitMB(), taskManagerParameters.cpus(), - taskManagerParameters.gpus()); + taskManagerParameters.gpus(), + taskManagerParameters.disk()); return taskManagerParameters; } diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java index 637442c..22f6a30 100644 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java @@ -145,7 +145,7 @@ public class LaunchableMesosWorker implements LaunchableTask { @Override public double getDisk() { - return 0.0; + return params.disk(); } @Override @@ -221,6 +221,10 @@ public class LaunchableMesosWorker implements LaunchableTask { taskInfo.addAllResources(allocation.takeScalar("gpus", taskRequest.getGPUs(), roles)); taskInfo.addAllResources(allocation.takeScalar("mem", taskRequest.getMemory(), roles)); + if (taskRequest.getDisk() > 0.0) { + taskInfo.addAllResources(allocation.takeScalar("disk", taskRequest.getDisk(), roles)); + } + final Protos.CommandInfo.Builder cmd = taskInfo.getCommandBuilder(); final Protos.Environment.Builder env = cmd.getEnvironmentBuilder(); final StringBuilder jvmArgs = new StringBuilder(); diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java index 0315629..1d49000 100644 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java @@ -59,6 +59,11 @@ public class MesosTaskManagerParameters { .defaultValue(1024) .withDescription("Memory to assign to the Mesos workers in MB."); + public static final ConfigOption<Integer> MESOS_RM_TASKS_DISK_MB = + key("mesos.resourcemanager.tasks.disk") + .defaultValue(0) + .withDescription(Description.builder().text("Disk space to assign to the Mesos workers in MB.").build()); + public static final ConfigOption<Double> MESOS_RM_TASKS_CPUS = key("mesos.resourcemanager.tasks.cpus") .defaultValue(0.0) @@ -145,6 +150,8 @@ public class MesosTaskManagerParameters { private final int gpus; + private final int disk; + private final ContainerType containerType; private final Option<String> containerImageName; @@ -170,6 +177,7 @@ public class MesosTaskManagerParameters { public MesosTaskManagerParameters( double cpus, int gpus, + int disk, ContainerType containerType, Option<String> containerImageName, ContaineredTaskManagerParameters containeredParameters, @@ -184,6 +192,7 @@ public class MesosTaskManagerParameters { this.cpus = cpus; this.gpus = gpus; + this.disk = disk; this.containerType = Preconditions.checkNotNull(containerType); this.containerImageName = Preconditions.checkNotNull(containerImageName); this.containeredParameters = Preconditions.checkNotNull(containeredParameters); @@ -212,6 +221,13 @@ public class MesosTaskManagerParameters { } /** + * Get the disk space in MB to use for the TaskManager Process. + */ + public int disk() { + return disk; + } + + /** * Get the container type (Mesos or Docker). The default is Mesos. 
* * <p>Mesos provides a facility for a framework to specify which containerizer to use. @@ -335,6 +351,8 @@ public class MesosTaskManagerParameters { " cannot be negative"); } + int disk = flinkConfig.getInteger(MESOS_RM_TASKS_DISK_MB); + // parse the containerization parameters String imageName = flinkConfig.getString(MESOS_RM_CONTAINER_IMAGE_NAME); @@ -379,6 +397,7 @@ public class MesosTaskManagerParameters { return new MesosTaskManagerParameters( cpus, gpus, + disk, containerType, Option.apply(imageName), containeredParameters, diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java index c5d053c..617ad41 100644 --- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java +++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java @@ -279,7 +279,7 @@ public class MesosResourceManagerTest extends TestLogger { ContaineredTaskManagerParameters containeredParams = new ContaineredTaskManagerParameters(1024, 768, 256, 4, new HashMap<String, String>()); MesosTaskManagerParameters tmParams = new MesosTaskManagerParameters( - 1.0, 1, MesosTaskManagerParameters.ContainerType.MESOS, Option.<String>empty(), containeredParams, + 1.0, 1, 0, MesosTaskManagerParameters.ContainerType.MESOS, Option.<String>empty(), containeredParams, Collections.<Protos.Volume>emptyList(), Collections.<Protos.Parameter>emptyList(), false, Collections.<ConstraintEvaluator>emptyList(), "", Option.<String>empty(), Option.<String>empty(), Collections.<String>emptyList());