This is an automated email from the ASF dual-hosted git repository. azagrebin pushed a commit to branch FLINK-13986-flip49-cleanup-e2e in repository https://gitbox.apache.org/repos/asf/flink.git
commit 1f24495c0df56473c850605d9e34baf2f7464be1 Author: Andrey Zagrebin <azagre...@apache.org> AuthorDate: Wed Nov 6 09:59:32 2019 +0100 Fix yarn cut off --- .../client/deployment/ClusterSpecification.java | 6 +- .../clusterframework/TaskExecutorResourceSpec.java | 4 + .../TaskExecutorResourceUtils.java | 4 +- .../ActiveResourceManagerFactory.java | 23 +--- .../ActiveResourceManagerFactoryTest.java | 97 -------------- .../flink/yarn/CliFrontendRunWithYarnTest.java | 3 +- .../apache/flink/yarn/YarnConfigurationITCase.java | 28 +--- .../flink/yarn/YarnClusterClientFactory.java | 6 +- .../apache/flink/yarn/YarnClusterDescriptor.java | 18 +-- .../apache/flink/yarn/cli/FlinkYarnSessionCli.java | 2 +- .../apache/flink/yarn/FlinkYarnSessionCliTest.java | 142 +++++++-------------- 11 files changed, 73 insertions(+), 260 deletions(-) diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterSpecification.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterSpecification.java index 72975d8..0d8d105 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterSpecification.java +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterSpecification.java @@ -21,6 +21,7 @@ package org.apache.flink.client.deployment; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ConfigurationUtils; import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.runtime.clusterframework.TaskExecutorResourceUtils; /** * Description of the cluster to start by the {@link ClusterDescriptor}. @@ -68,7 +69,10 @@ public final class ClusterSpecification { int slots = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1); int jobManagerMemoryMb = ConfigurationUtils.getJobManagerHeapMemory(configuration).getMebiBytes(); - int taskManagerMemoryMb = ConfigurationUtils.getTaskManagerHeapMemory(configuration).getMebiBytes(); + int taskManagerMemoryMb = TaskExecutorResourceUtils + .resourceSpecFromConfig(configuration) + .getTotalProcessMemorySize() + .getMebiBytes(); return new ClusterSpecificationBuilder() .setMasterMemoryMB(jobManagerMemoryMb) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceSpec.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceSpec.java index d6cbe5b..d73e7b7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceSpec.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceSpec.java @@ -156,6 +156,10 @@ public class TaskExecutorResourceSpec implements java.io.Serializable { return getTotalFlinkMemorySize().add(jvmMetaspaceSize).add(jvmOverheadSize); } + public MemorySize getHeapSize() { + return frameworkHeapSize.add(taskHeapSize).add(onHeapManagedMemorySize); + } + @Override public String toString() { return "TaskExecutorResourceSpec {" diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java index 4b649e4..9c69b62 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java @@ -46,9 +46,7 @@ public class TaskExecutorResourceUtils { // ------------------------------------------------------------------------ public static String generateJvmParametersStr(final TaskExecutorResourceSpec taskExecutorResourceSpec) { - final MemorySize jvmHeapSize = taskExecutorResourceSpec.getFrameworkHeapSize() - .add(taskExecutorResourceSpec.getTaskHeapSize()) - .add(taskExecutorResourceSpec.getOnHeapManagedMemorySize()); + final MemorySize jvmHeapSize = taskExecutorResourceSpec.getHeapSize(); final MemorySize jvmDirectSize = taskExecutorResourceSpec.getTaskOffHeapSize() .add(taskExecutorResourceSpec.getShuffleMemSize()); final MemorySize jvmMetaspaceSize = taskExecutorResourceSpec.getJvmMetaspaceSize(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ActiveResourceManagerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ActiveResourceManagerFactory.java index d292e5a..7444e23 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ActiveResourceManagerFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ActiveResourceManagerFactory.java @@ -19,9 +19,6 @@ package org.apache.flink.runtime.resourcemanager; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.ConfigurationUtils; -import org.apache.flink.configuration.TaskManagerOptions; -import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable; import org.apache.flink.runtime.entrypoint.ClusterInformation; @@ -30,17 +27,11 @@ import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; -import org.apache.flink.runtime.taskexecutor.TaskManagerServices; import javax.annotation.Nullable; /** * Resource manager factory which creates active {@link ResourceManager} implementations. - * - * <p>The default implementation will call {@link #createActiveResourceManagerConfiguration} - * to create a new configuration which is configured with active resource manager relevant - * configuration options. - * * @param <T> type of the {@link ResourceIDRetrievable} */ public abstract class ActiveResourceManagerFactory<T extends ResourceIDRetrievable> implements ResourceManagerFactory<T> { @@ -57,7 +48,7 @@ public abstract class ActiveResourceManagerFactory<T extends ResourceIDRetrievab @Nullable String webInterfaceUrl, ResourceManagerMetricGroup resourceManagerMetricGroup) throws Exception { return createActiveResourceManager( - createActiveResourceManagerConfiguration(configuration), + configuration, resourceId, rpcService, highAvailabilityServices, @@ -68,18 +59,6 @@ public abstract class ActiveResourceManagerFactory<T extends ResourceIDRetrievab resourceManagerMetricGroup); } - public static Configuration createActiveResourceManagerConfiguration(Configuration originalConfiguration) { - final int taskManagerMemoryMB = ConfigurationUtils.getTaskManagerHeapMemory(originalConfiguration).getMebiBytes(); - final long cutoffMB = ContaineredTaskManagerParameters.calculateCutoffMB(originalConfiguration, taskManagerMemoryMB); - final long processMemoryBytes = (taskManagerMemoryMB - cutoffMB) << 20; // megabytes to bytes - final long managedMemoryBytes = TaskManagerServices.getManagedMemoryFromProcessMemory(originalConfiguration, processMemoryBytes); - - final Configuration resourceManagerConfig = new Configuration(originalConfiguration); - resourceManagerConfig.setString(TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE, managedMemoryBytes + "b"); - - return resourceManagerConfig; - } - protected abstract ResourceManager<T> createActiveResourceManager( Configuration configuration, ResourceID resourceId, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ActiveResourceManagerFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ActiveResourceManagerFactoryTest.java deleted file mode 100644 index ff61e65..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ActiveResourceManagerFactoryTest.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.resourcemanager; - -import org.apache.flink.api.common.time.Time; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.TaskManagerOptions; -import org.apache.flink.runtime.clusterframework.types.ResourceID; -import org.apache.flink.runtime.entrypoint.ClusterInformation; -import org.apache.flink.runtime.heartbeat.HeartbeatServices; -import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices; -import org.apache.flink.runtime.highavailability.HighAvailabilityServices; -import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; -import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup; -import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; -import org.apache.flink.runtime.rpc.FatalErrorHandler; -import org.apache.flink.runtime.rpc.RpcService; -import org.apache.flink.runtime.rpc.RpcUtils; -import org.apache.flink.runtime.rpc.TestingRpcService; -import org.apache.flink.runtime.util.TestingFatalErrorHandler; -import org.apache.flink.util.TestLogger; - -import org.junit.Test; - -import javax.annotation.Nullable; - -import static org.hamcrest.CoreMatchers.is; -import static org.hamcrest.MatcherAssert.assertThat; - -/** - * Tests for the {@link ActiveResourceManagerFactory}. - */ -public class ActiveResourceManagerFactoryTest extends TestLogger { - - /** - * Test which ensures that the {@link ActiveResourceManagerFactory} sets the correct managed - * memory when creating a resource manager. - */ - @Test - public void createResourceManager_WithDefaultConfiguration_ShouldSetManagedMemory() throws Exception { - final Configuration configuration = new Configuration(); - - final TestingActiveResourceManagerFactory resourceManagerFactory = new TestingActiveResourceManagerFactory(); - - final TestingRpcService rpcService = new TestingRpcService(); - - try { - final ResourceManager<ResourceID> ignored = resourceManagerFactory.createResourceManager( - configuration, - ResourceID.generate(), - rpcService, - new TestingHighAvailabilityServices(), - new TestingHeartbeatServices(), - new TestingFatalErrorHandler(), - new ClusterInformation("foobar", 1234), - null, - UnregisteredMetricGroups.createUnregisteredResourceManagerMetricGroup()); - } finally { - RpcUtils.terminateRpcService(rpcService, Time.seconds(10L)); - } - } - - private static final class TestingActiveResourceManagerFactory extends ActiveResourceManagerFactory<ResourceID> { - - @Override - protected ResourceManager<ResourceID> createActiveResourceManager( - Configuration configuration, - ResourceID resourceId, - RpcService rpcService, - HighAvailabilityServices highAvailabilityServices, - HeartbeatServices heartbeatServices, - FatalErrorHandler fatalErrorHandler, - ClusterInformation clusterInformation, - @Nullable String webInterfaceUrl, - ResourceManagerMetricGroup resourceManagerMetricGroup) { - assertThat(configuration.contains(TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE), is(true)); - - return null; - } - } -} diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendRunWithYarnTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendRunWithYarnTest.java index 4e7ece3..4c83115 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendRunWithYarnTest.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendRunWithYarnTest.java @@ -38,6 +38,7 @@ import org.junit.Test; import org.junit.rules.TemporaryFolder; import static org.apache.flink.client.cli.CliFrontendRunTest.verifyCliFrontend; +import static org.apache.flink.runtime.clusterframework.TaskExecutorResourceUtils.adjustMemoryConfigurationForLocalExecution; import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.yarn.util.YarnTestUtils.getTestJarPath; @@ -66,7 +67,7 @@ public class CliFrontendRunWithYarnTest extends CliFrontendTestBase { public void testRun() throws Exception { String testJarPath = getTestJarPath("BatchWordCount.jar").getAbsolutePath(); - Configuration configuration = new Configuration(); + Configuration configuration = adjustMemoryConfigurationForLocalExecution(new Configuration()); configuration.setString(JobManagerOptions.ADDRESS, "localhost"); configuration.setInteger(JobManagerOptions.PORT, 8081); diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java index dbad597..36f90e4 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java @@ -24,13 +24,11 @@ import org.apache.flink.client.program.ClusterClient; import org.apache.flink.client.program.PackagedProgram; import org.apache.flink.client.program.PackagedProgramUtils; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.MemorySize; import org.apache.flink.configuration.NettyShuffleEnvironmentOptions; import org.apache.flink.configuration.ResourceManagerOptions; -import org.apache.flink.configuration.TaskManagerOptions; -import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; +import org.apache.flink.runtime.clusterframework.TaskExecutorResourceSpec; +import org.apache.flink.runtime.clusterframework.TaskExecutorResourceUtils; import org.apache.flink.runtime.jobgraph.JobGraph; -import org.apache.flink.runtime.resourcemanager.ActiveResourceManagerFactory; import org.apache.flink.runtime.rest.RestClient; import org.apache.flink.runtime.rest.RestClientConfiguration; import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; @@ -84,8 +82,11 @@ public class YarnConfigurationITCase extends YarnTestBase { final YarnClient yarnClient = getYarnClient(); final Configuration configuration = new Configuration(flinkConfiguration); + final TaskExecutorResourceSpec spec = TaskExecutorResourceUtils.resourceSpecFromConfig(configuration); final int masterMemory = 64; - final int taskManagerMemory = 512; + final int taskManagerMemory = spec.getTotalProcessMemorySize().getMebiBytes(); + final long expectedHeapBytes = spec.getHeapSize().getBytes(); + final int expectedManagedMemoryMB = spec.getManagedMemorySize().getMebiBytes(); final int slotsPerTaskManager = 3; // disable heap cutoff min @@ -175,23 +176,13 @@ public class YarnConfigurationITCase extends YarnTestBase { assertThat(taskManagerInfo.getNumberSlots(), is(slotsPerTaskManager)); - final ContaineredTaskManagerParameters containeredTaskManagerParameters = ContaineredTaskManagerParameters.create( - configuration, - null, - taskManagerMemory, - slotsPerTaskManager); - - final long expectedHeadSize = containeredTaskManagerParameters.taskManagerHeapSizeMB() << 20L; - // We compare here physical memory assigned to a container with the heap memory that we should pass to // jvm as Xmx parameter. Those value might differ significantly due to system page size or jvm // implementation therefore we use 15% threshold here. assertThat( - (double) taskManagerInfo.getHardwareDescription().getSizeOfJvmHeap() / (double) expectedHeadSize, + (double) taskManagerInfo.getHardwareDescription().getSizeOfJvmHeap() / expectedHeapBytes, is(closeTo(1.0, 0.15))); - final int expectedManagedMemoryMB = calculateManagedMemorySizeMB(configuration); - assertThat((int) (taskManagerInfo.getHardwareDescription().getSizeOfManagedMemory() >> 20), is(expectedManagedMemoryMB)); } finally { restClient.shutdown(TIMEOUT); @@ -214,9 +205,4 @@ public class YarnConfigurationITCase extends YarnTestBase { return taskManagerInfo.getNumberSlots() > 0; } } - - private static int calculateManagedMemorySizeMB(Configuration configuration) { - Configuration resourceManagerConfig = ActiveResourceManagerFactory.createActiveResourceManagerConfiguration(configuration); - return MemorySize.parse(resourceManagerConfig.getString(TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE)).getMebiBytes(); - } } diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java index aa138a7..6ea7d63 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java @@ -24,6 +24,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ConfigurationUtils; import org.apache.flink.configuration.DeploymentOptions; import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.runtime.clusterframework.TaskExecutorResourceUtils; import org.apache.flink.yarn.configuration.YarnConfigOptions; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -70,7 +71,10 @@ public class YarnClusterClientFactory implements ClusterClientFactory<Applicatio final int jobManagerMemoryMB = ConfigurationUtils.getJobManagerHeapMemory(configuration).getMebiBytes(); // Task Managers memory - final int taskManagerMemoryMB = ConfigurationUtils.getTaskManagerHeapMemory(configuration).getMebiBytes(); + final int taskManagerMemoryMB = TaskExecutorResourceUtils + .resourceSpecFromConfig(configuration) + .getTotalProcessMemorySize() + .getMebiBytes(); int slotsPerTaskManager = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS); diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java index 74207ff..14005f4 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java @@ -40,11 +40,10 @@ import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.core.plugin.PluginConfig; import org.apache.flink.core.plugin.PluginUtils; import org.apache.flink.runtime.clusterframework.BootstrapTools; -import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; +import org.apache.flink.runtime.clusterframework.TaskExecutorResourceUtils; import org.apache.flink.runtime.entrypoint.ClusterEntrypoint; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; -import org.apache.flink.runtime.taskexecutor.TaskManagerServices; import org.apache.flink.util.FlinkException; import org.apache.flink.util.Preconditions; import org.apache.flink.util.ShutdownHookUtil; @@ -458,16 +457,11 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> { * @throws FlinkException if the cluster cannot be started with the provided {@link ClusterSpecification} */ private void validateClusterSpecification(ClusterSpecification clusterSpecification) throws FlinkException { + flinkConfiguration.setString(TaskManagerOptions.TOTAL_PROCESS_MEMORY, clusterSpecification.getTaskManagerMemoryMB() + "m"); try { - final long taskManagerMemorySize = clusterSpecification.getTaskManagerMemoryMB(); - // We do the validation by calling the calculation methods here - // Internally these methods will check whether the cluster can be started with the provided - // ClusterSpecification and the configured memory requirements - final long cutoff = ContaineredTaskManagerParameters.calculateCutoffMB(flinkConfiguration, taskManagerMemorySize); - TaskManagerServices.calculateHeapSizeMB(taskManagerMemorySize - cutoff, flinkConfiguration); + TaskExecutorResourceUtils.resourceSpecFromConfig(flinkConfiguration); } catch (IllegalArgumentException iae) { - throw new FlinkException("Cannot fulfill the minimum memory requirements with the provided " + - "cluster specification. Please increase the memory of the cluster.", iae); + throw new FlinkException("Inconsistent cluster specification.", iae); } } @@ -834,10 +828,6 @@ public class YarnClusterDescriptor implements ClusterDescriptor<ApplicationId> { TaskManagerOptions.NUM_TASK_SLOTS, clusterSpecification.getSlotsPerTaskManager()); - configuration.setString( - TaskManagerOptions.TOTAL_PROCESS_MEMORY, - clusterSpecification.getTaskManagerMemoryMB() + "m"); - // Upload the flink configuration // write out configuration file File tmpConfigurationFile = File.createTempFile(appId + "-flink-conf.yaml", null); diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java index 0efa610..0e7df5b 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java @@ -374,7 +374,7 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine { if (!MemorySize.MemoryUnit.hasUnit(tmMemoryVal)) { tmMemoryVal += "m"; } - effectiveConfiguration.setString(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY, tmMemoryVal); + effectiveConfiguration.setString(TaskManagerOptions.TOTAL_PROCESS_MEMORY, tmMemoryVal); } if (commandLine.hasOption(slots.getOpt())) { diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java index a357126..9376533 100644 --- a/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java +++ b/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java @@ -107,11 +107,7 @@ public class FlinkYarnSessionCliTest extends TestLogger { String[] params = new String[] {"-ys", "3"}; - FlinkYarnSessionCli yarnCLI = new FlinkYarnSessionCli( - new Configuration(), - tmp.getRoot().getAbsolutePath(), - "y", - "yarn"); + FlinkYarnSessionCli yarnCLI = createFlinkYarnSessionCliWithTmTotalMemory(1024); final CommandLine commandLine = yarnCLI.parseCommandLineOptions(params, true); @@ -129,11 +125,7 @@ public class FlinkYarnSessionCliTest extends TestLogger { String[] params = new String[] {"-yd"}; - FlinkYarnSessionCli yarnCLI = new FlinkYarnSessionCli( - new Configuration(), - tmp.getRoot().getAbsolutePath(), - "y", - "yarn"); + FlinkYarnSessionCli yarnCLI = createFlinkYarnSessionCli(); final CommandLine commandLine = yarnCLI.parseCommandLineOptions(params, true); @@ -151,11 +143,7 @@ public class FlinkYarnSessionCliTest extends TestLogger { String[] params = new String[] {"-yz", zkNamespaceCliInput}; - FlinkYarnSessionCli yarnCLI = new FlinkYarnSessionCli( - new Configuration(), - tmp.getRoot().getAbsolutePath(), - "y", - "yarn"); + FlinkYarnSessionCli yarnCLI = createFlinkYarnSessionCli(); CommandLine commandLine = yarnCLI.parseCommandLineOptions(params, true); @@ -172,11 +160,7 @@ public class FlinkYarnSessionCliTest extends TestLogger { String[] params = new String[] {"-ynl", nodeLabelCliInput }; - FlinkYarnSessionCli yarnCLI = new FlinkYarnSessionCli( - new Configuration(), - tmp.getRoot().getAbsolutePath(), - "y", - "yarn"); + FlinkYarnSessionCli yarnCLI = createFlinkYarnSessionCli(); CommandLine commandLine = yarnCLI.parseCommandLineOptions(params, true); @@ -198,11 +182,7 @@ public class FlinkYarnSessionCliTest extends TestLogger { final Configuration configuration = new Configuration(); configuration.setString(YarnConfigOptions.PROPERTIES_FILE_LOCATION, directoryPath.getAbsolutePath()); - final FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli( - configuration, - tmp.getRoot().getAbsolutePath(), - "y", - "yarn"); + final FlinkYarnSessionCli flinkYarnSessionCli = createFlinkYarnSessionCli(configuration); final CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(new String[] {}, true); @@ -225,21 +205,12 @@ public class FlinkYarnSessionCliTest extends TestLogger { final Configuration configuration = new Configuration(); configuration.setString(YarnConfigOptions.PROPERTIES_FILE_LOCATION, directoryPath.getAbsolutePath()); - new FlinkYarnSessionCli( - configuration, - tmp.getRoot().getAbsolutePath(), - "y", - "yarn"); + createFlinkYarnSessionCli(configuration); } @Test public void testResumeFromYarnID() throws Exception { - final Configuration configuration = new Configuration(); - final FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli( - configuration, - tmp.getRoot().getAbsolutePath(), - "y", - "yarn"); + final FlinkYarnSessionCli flinkYarnSessionCli = createFlinkYarnSessionCli(); final CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(new String[] {"-yid", TEST_YARN_APPLICATION_ID.toString()}, true); @@ -252,12 +223,7 @@ public class FlinkYarnSessionCliTest extends TestLogger { @Test public void testResumeFromYarnIDZookeeperNamespace() throws Exception { - final Configuration configuration = new Configuration(); - final FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli( - configuration, - tmp.getRoot().getAbsolutePath(), - "y", - "yarn"); + final FlinkYarnSessionCli flinkYarnSessionCli = createFlinkYarnSessionCli(); final CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(new String[] {"-yid", TEST_YARN_APPLICATION_ID.toString()}, true); @@ -273,12 +239,7 @@ public class FlinkYarnSessionCliTest extends TestLogger { @Test public void testResumeFromYarnIDZookeeperNamespaceOverride() throws Exception { - final Configuration configuration = new Configuration(); - final FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli( - configuration, - tmp.getRoot().getAbsolutePath(), - "y", - "yarn"); + final FlinkYarnSessionCli flinkYarnSessionCli = createFlinkYarnSessionCli(); final String overrideZkNamespace = "my_cluster"; @@ -300,11 +261,7 @@ public class FlinkYarnSessionCliTest extends TestLogger { final Configuration configuration = new Configuration(); configuration.setString(YarnConfigOptions.PROPERTIES_FILE_LOCATION, directoryPath.getAbsolutePath()); - final FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli( - configuration, - tmp.getRoot().getAbsolutePath(), - "y", - "yarn"); + final FlinkYarnSessionCli flinkYarnSessionCli = createFlinkYarnSessionCli(configuration); final CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(new String[] {"-yid", TEST_YARN_APPLICATION_ID_2.toString() }, true); final Configuration executorConfig = flinkYarnSessionCli.applyCommandLineOptionsToConfiguration(commandLine); @@ -326,15 +283,11 @@ public class FlinkYarnSessionCliTest extends TestLogger { final int slotsPerTaskManager = 30; configuration.setString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY, jobManagerMemory + "m"); - configuration.setString(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY, taskManagerMemory + "m"); + configuration.setString(TaskManagerOptions.TOTAL_PROCESS_MEMORY, taskManagerMemory + "m"); configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, slotsPerTaskManager); final String[] args = {"-yjm", String.valueOf(jobManagerMemory) + "m", "-ytm", String.valueOf(taskManagerMemory) + "m", "-ys", String.valueOf(slotsPerTaskManager)}; - final FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli( - configuration, - tmp.getRoot().getAbsolutePath(), - "y", - "yarn"); + final FlinkYarnSessionCli flinkYarnSessionCli = createFlinkYarnSessionCli(configuration); CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(args, false); @@ -357,16 +310,12 @@ public class FlinkYarnSessionCliTest extends TestLogger { final int jobManagerMemory = 1337; configuration.setString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY, jobManagerMemory + "m"); final int taskManagerMemory = 7331; - configuration.setString(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY, taskManagerMemory + "m"); + configuration.setString(TaskManagerOptions.TOTAL_PROCESS_MEMORY, taskManagerMemory + "m"); final int slotsPerTaskManager = 42; configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, slotsPerTaskManager); final String[] args = {}; - final FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli( - configuration, - tmp.getRoot().getAbsolutePath(), - "y", - "yarn"); + final FlinkYarnSessionCli flinkYarnSessionCli = createFlinkYarnSessionCli(configuration); CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(args, false); @@ -385,11 +334,7 @@ public class FlinkYarnSessionCliTest extends TestLogger { @Test public void testHeapMemoryPropertyWithoutUnit() throws Exception { final String[] args = new String[] { "-yjm", "1024", "-ytm", "2048" }; - final FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli( - new Configuration(), - tmp.getRoot().getAbsolutePath(), - "y", - "yarn"); + final FlinkYarnSessionCli flinkYarnSessionCli = createFlinkYarnSessionCli(); final CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(args, false); @@ -407,11 +352,7 @@ public class FlinkYarnSessionCliTest extends TestLogger { @Test public void testHeapMemoryPropertyWithUnitMB() throws Exception { final String[] args = new String[] { "-yjm", "1024m", "-ytm", "2048m" }; - final FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli( - new Configuration(), - tmp.getRoot().getAbsolutePath(), - "y", - "yarn"); + final FlinkYarnSessionCli flinkYarnSessionCli = createFlinkYarnSessionCli(); final CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(args, false); final Configuration executorConfig = flinkYarnSessionCli.applyCommandLineOptionsToConfiguration(commandLine); @@ -428,11 +369,7 @@ public class FlinkYarnSessionCliTest extends TestLogger { @Test public void testHeapMemoryPropertyWithArbitraryUnit() throws Exception { final String[] args = new String[] { "-yjm", "1g", "-ytm", "2g" }; - final FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli( - new Configuration(), - tmp.getRoot().getAbsolutePath(), - "y", - "yarn"); + final FlinkYarnSessionCli flinkYarnSessionCli = createFlinkYarnSessionCli(); final CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(args, false); final Configuration executorConfig = flinkYarnSessionCli.applyCommandLineOptionsToConfiguration(commandLine); @@ -452,11 +389,7 @@ public class FlinkYarnSessionCliTest extends TestLogger { configuration.setInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY_MB, 2048); configuration.setInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY_MB, 4096); - final FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli( - configuration, - tmp.getRoot().getAbsolutePath(), - "y", - "yarn"); + final FlinkYarnSessionCli flinkYarnSessionCli = createFlinkYarnSessionCli(configuration); final CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(new String[0], false); @@ -469,15 +402,12 @@ public class FlinkYarnSessionCliTest extends TestLogger { } /** - * Tests the specifying heap memory with config default value for job manager and task manager. + * Tests the specifying job manager heap memory with config default value for job manager and task manager. */ @Test - public void testHeapMemoryPropertyWithConfigDefaultValue() throws Exception { - final FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli( - new Configuration(), - tmp.getRoot().getAbsolutePath(), - "y", - "yarn"); + public void testJobManagerHeapMemoryPropertyWithConfigDefaultValue() throws Exception { + int totalMemomory = 1024; + final FlinkYarnSessionCli flinkYarnSessionCli = createFlinkYarnSessionCliWithTmTotalMemory(totalMemomory); final CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(new String[0], false); @@ -485,18 +415,14 @@ public class FlinkYarnSessionCliTest extends TestLogger { final ClusterClientFactory<ApplicationId> clientFactory = getClusterClientFactory(executorConfig); final ClusterSpecification clusterSpecification = clientFactory.getClusterSpecification(executorConfig); - assertThat(clusterSpecification.getMasterMemoryMB(), is(1024)); - assertThat(clusterSpecification.getTaskManagerMemoryMB(), is(1024)); + assertThat(clusterSpecification.getMasterMemoryMB(), is(totalMemomory)); + assertThat(clusterSpecification.getTaskManagerMemoryMB(), is(totalMemomory)); } @Test public void testMultipleYarnShipOptions() throws Exception { final String[] args = new String[]{"run", "--yarnship", tmp.newFolder().getAbsolutePath(), "--yarnship", tmp.newFolder().getAbsolutePath()}; - final FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli( - new Configuration(), - tmp.getRoot().getAbsolutePath(), - "y", - "yarn"); + final FlinkYarnSessionCli flinkYarnSessionCli = createFlinkYarnSessionCli(); final CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(args, false); @@ -527,4 +453,22 @@ public class FlinkYarnSessionCliTest extends TestLogger { return tmpFolder.getAbsoluteFile(); } + + private FlinkYarnSessionCli createFlinkYarnSessionCli() throws FlinkException { + return createFlinkYarnSessionCli(new Configuration()); + } + + private FlinkYarnSessionCli createFlinkYarnSessionCliWithTmTotalMemory(int totalMemomory) throws FlinkException { + Configuration configuration = new Configuration(); + configuration.setString(TaskManagerOptions.TOTAL_PROCESS_MEMORY, totalMemomory + "m"); + return createFlinkYarnSessionCli(configuration); + } + + private FlinkYarnSessionCli createFlinkYarnSessionCli(Configuration configuration) throws FlinkException { + return new FlinkYarnSessionCli( + configuration, + tmp.getRoot().getAbsolutePath(), + "y", + "yarn"); + } }