[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager
tillrohrmann commented on a change in pull request #11353: URL: https://github.com/apache/flink/pull/11353#discussion_r411422637 ## File path: flink-yarn/src/main/java/org/apache/flink/yarn/RegisterApplicationMasterResponseReflector.java ## @@ -96,7 +110,40 @@ } @VisibleForTesting - Method getMethod() { - return method; + Method getGetContainersFromPreviousAttemptsMethod() { + return getContainersFromPreviousAttemptsMethod; + } + + /** +* Get names of resource types that are considered by the Yarn scheduler. +* @param response The response object from the registration at the ResourceManager. +* @return A set of resource type names, or {@link Optional#empty()} if the Yarn version does not support this API. +*/ + Optional<Set<String>> getSchedulerResourceTypeNames(final RegisterApplicationMasterResponse response) { + return getSchedulerResourceTypeNamesUnsafe(response); + } + + @VisibleForTesting + Optional<Set<String>> getSchedulerResourceTypeNamesUnsafe(final Object response) { Review comment: Won't do because of testing purposes. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager
tillrohrmann commented on a change in pull request #11353: URL: https://github.com/apache/flink/pull/11353#discussion_r411421781 ## File path: flink-yarn/src/test/java/org/apache/flink/yarn/TestingYarnAMRMClientAsync.java ## @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.yarn; + +import org.apache.flink.api.java.tuple.Tuple4; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.function.TriConsumer; +import org.apache.flink.util.function.TriFunction; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.client.api.AMRMClient; +import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; +import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl; + +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.function.BiConsumer; +import java.util.function.Consumer; +import java.util.function.Function; + +/** + * A Yarn {@link AMRMClientAsync} implementation for testing. 
+ */ +public class TestingYarnAMRMClientAsync extends AMRMClientAsyncImpl { + + private Function, List>> + getMatchingRequestsFunction = ignored -> Collections.emptyList(); + private BiConsumer addContainerRequestConsumer = (ignored1, ignored2) -> {}; + private BiConsumer removeContainerRequestConsumer = (ignored1, ignored2) -> {}; + private BiConsumer releaseAssignedContainerConsumer = (ignored1, ignored2) -> {}; + private Consumer setHeartbeatIntervalConsumer = (ignored) -> {}; + private TriFunction registerApplicationMasterFunction = + (ignored1, ignored2, ignored3) -> RegisterApplicationMasterResponse.newInstance( + Resource.newInstance(0, 0), + Resource.newInstance(Integer.MAX_VALUE, Integer.MAX_VALUE), + Collections.emptyMap(), + null, + Collections.emptyList(), + null, + Collections.emptyList()); + private TriConsumer unregisterApplicationMasterConsumer = (ignored1, ignored2, ignored3) -> {}; + + TestingYarnAMRMClientAsync(CallbackHandler callbackHandler) { + super(0, callbackHandler); + } + + @Override + public List> getMatchingRequests(Priority priority, String resourceName, Resource capability) { + return getMatchingRequestsFunction.apply(Tuple4.of(priority, resourceName, capability, handler)); + } + + @Override + public void addContainerRequest(AMRMClient.ContainerRequest req) { + addContainerRequestConsumer.accept(req, handler); + } + + @Override + public void removeContainerRequest(AMRMClient.ContainerRequest req) { + removeContainerRequestConsumer.accept(req, handler); + } + + @Override + public void releaseAssignedContainer(ContainerId containerId) { + releaseAssignedContainerConsumer.accept(containerId, handler); + } + + @Override + public void setHeartbeatInterval(int interval) { + setHeartbeatIntervalConsumer.accept(interval); + } + + @Override + public RegisterApplicationMasterResponse registerApplicationMaster(String appHostName, int appHostPort, String appTrackingUrl) { + return registerApplicationMasterFunction.apply(appHostName, appHostPort, 
appTrackingUrl); + } + + @Override + public void unregisterApplicationMaster(FinalApplicationStatus appStatus, String appMessage, String appTrackingUrl) { + unregisterApplicationMasterConsumer.accept(appStatus, appMessage, appTrackingUrl); + } + + void setGetMatchingRequestsFunction( + Function, List>> +
[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager
tillrohrmann commented on a change in pull request #11353: URL: https://github.com/apache/flink/pull/11353#discussion_r411413787 ## File path: flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java ## @@ -243,24 +248,33 @@ protected NMClientAsync createAndStartNodeManagerClient(YarnConfiguration yarnCo // domain objects for test purposes final ResourceProfile resourceProfile1 = ResourceProfile.UNKNOWN; + final WorkerResourceSpec workerResourceSpec; + + final Resource containerResource; public String taskHost = "host1"; final TestingYarnNMClientAsync testingYarnNMClientAsync; final TestingYarnAMRMClientAsync testingYarnAMRMClientAsync; + int containerIdx = 0; + /** * Create mock RM dependencies. */ Context() throws Exception { - this(flinkConfig); + this(flinkConfig, null); } - Context(Configuration configuration) throws Exception { - final SlotManager slotManager = SlotManagerBuilder.newBuilder() - .setDefaultWorkerResourceSpec(YarnWorkerResourceSpecFactory.INSTANCE.createDefaultWorkerResourceSpec(configuration)) - .build(); + Context(Configuration configuration, @Nullable SlotManager slotManager) throws Exception { + + workerResourceSpec = YarnWorkerResourceSpecFactory.INSTANCE.createDefaultWorkerResourceSpec(configuration); + if (slotManager == null) { + slotManager = SlotManagerBuilder.newBuilder() + .setDefaultWorkerResourceSpec(workerResourceSpec) + .build(); + } Review comment: I think it would be fine to add a `SlotManager.getDefaultWorkerResourceSpec()`. This could solve the problem here. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager
tillrohrmann commented on a change in pull request #11353: URL: https://github.com/apache/flink/pull/11353#discussion_r411421064 ## File path: flink-yarn/src/test/java/org/apache/flink/yarn/RegisterApplicationMasterResponseReflectorTest.java ## @@ -88,7 +94,44 @@ public void testGetMethodReflectiveHadoop22() { final RegisterApplicationMasterResponseReflector registerApplicationMasterResponseReflector = new RegisterApplicationMasterResponseReflector(LOG); - final Method method = registerApplicationMasterResponseReflector.getMethod(); + final Method method = registerApplicationMasterResponseReflector.getGetContainersFromPreviousAttemptsMethod(); + assertThat(method, notNullValue()); + } + + @Test + public void testCallsGetSchedulerResourceTypesMethodIfPresent() { + final RegisterApplicationMasterResponseReflector registerApplicationMasterResponseReflector = + new RegisterApplicationMasterResponseReflector(LOG, HasMethod.class); + + final Optional<Set<String>> schedulerResourceTypeNames = + registerApplicationMasterResponseReflector.getSchedulerResourceTypeNamesUnsafe(new HasMethod()); Review comment: Makes sense. I agree that your proposal is better. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager
tillrohrmann commented on a change in pull request #11353: URL: https://github.com/apache/flink/pull/11353#discussion_r411413787 ## File path: flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java ## @@ -243,24 +248,33 @@ protected NMClientAsync createAndStartNodeManagerClient(YarnConfiguration yarnCo // domain objects for test purposes final ResourceProfile resourceProfile1 = ResourceProfile.UNKNOWN; + final WorkerResourceSpec workerResourceSpec; + + final Resource containerResource; public String taskHost = "host1"; final TestingYarnNMClientAsync testingYarnNMClientAsync; final TestingYarnAMRMClientAsync testingYarnAMRMClientAsync; + int containerIdx = 0; + /** * Create mock RM dependencies. */ Context() throws Exception { - this(flinkConfig); + this(flinkConfig, null); } - Context(Configuration configuration) throws Exception { - final SlotManager slotManager = SlotManagerBuilder.newBuilder() - .setDefaultWorkerResourceSpec(YarnWorkerResourceSpecFactory.INSTANCE.createDefaultWorkerResourceSpec(configuration)) - .build(); + Context(Configuration configuration, @Nullable SlotManager slotManager) throws Exception { + + workerResourceSpec = YarnWorkerResourceSpecFactory.INSTANCE.createDefaultWorkerResourceSpec(configuration); + if (slotManager == null) { + slotManager = SlotManagerBuilder.newBuilder() + .setDefaultWorkerResourceSpec(workerResourceSpec) + .build(); + } Review comment: I think it would be fine to add a `SlotManager.getDefaultWorkerResourceSpec()`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager URL: https://github.com/apache/flink/pull/11353#discussion_r409595126 ## File path: flink-yarn/src/test/java/org/apache/flink/yarn/WorkerSpecContainerResourceAdapterTest.java ## @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.yarn; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.MemorySize; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec; +import org.apache.flink.util.TestLogger; + +import org.apache.hadoop.yarn.api.records.Resource; +import org.junit.Test; + +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; + +/** + * Tests for {@link WorkerSpecContainerResourceAdapter}. 
+ */ +public class WorkerSpecContainerResourceAdapterTest extends TestLogger { + + @Test + public void testMatchVcores() { + final int minMemMB = 100; + final int minVcore = 10; + final WorkerSpecContainerResourceAdapter adapter = + new WorkerSpecContainerResourceAdapter( + getConfigProcessSpecEqualsWorkerSpec(), + minMemMB, + minVcore, + Integer.MAX_VALUE, + Integer.MAX_VALUE, + WorkerSpecContainerResourceAdapter.MatchingStrategy.MATCH_VCORE); + + final WorkerResourceSpec workerSpec1 = new WorkerResourceSpec.Builder() + .setCpuCores(1.0) + .setTaskHeapMemoryMB(10) + .setTaskOffHeapMemoryMB(10) + .setNetworkMemoryMB(10) + .setManagedMemoryMB(10) + .build(); + final WorkerResourceSpec workerSpec2 = new WorkerResourceSpec.Builder() + .setCpuCores(10.0) + .setTaskHeapMemoryMB(25) + .setTaskOffHeapMemoryMB(25) + .setNetworkMemoryMB(25) + .setManagedMemoryMB(25) + .build(); + final WorkerResourceSpec workerSpec3 = new WorkerResourceSpec.Builder() + .setCpuCores(5.0) + .setTaskHeapMemoryMB(30) + .setTaskOffHeapMemoryMB(30) + .setNetworkMemoryMB(30) + .setManagedMemoryMB(30) + .build(); + final WorkerResourceSpec workerSpec4 = new WorkerResourceSpec.Builder() + .setCpuCores(15.0) + .setTaskHeapMemoryMB(10) + .setTaskOffHeapMemoryMB(10) + .setNetworkMemoryMB(10) + .setManagedMemoryMB(10) + .build(); + + final Resource containerResource1 = Resource.newInstance(100, 10); + final Resource containerResource2 = Resource.newInstance(200, 10); + final Resource containerResource3 = Resource.newInstance(100, 20); + + assertThat(adapter.getWorkerSpecs(containerResource1), empty()); + assertThat(adapter.getWorkerSpecs(containerResource2), empty()); + + assertThat(adapter.getContainerResource(workerSpec1).get(), is(containerResource1)); + assertThat(adapter.getContainerResource(workerSpec2).get(), is(containerResource1)); + assertThat(adapter.getContainerResource(workerSpec3).get(), is(containerResource2)); + assertThat(adapter.getContainerResource(workerSpec4).get(), 
is(containerResource3)); + +
[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager URL: https://github.com/apache/flink/pull/11353#discussion_r409579792 ## File path: flink-yarn/src/main/java/org/apache/flink/yarn/WorkerSpecContainerResourceAdapter.java ## @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.yarn; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec; +import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils; +import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec; +import org.apache.flink.util.Preconditions; + +import org.apache.hadoop.yarn.api.records.Resource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Utility class for converting between Flink {@link WorkerResourceSpec} and Yarn {@link Resource}. + */ +public class WorkerSpecContainerResourceAdapter { + private final Logger log = LoggerFactory.getLogger(WorkerSpecContainerResourceAdapter.class); Review comment: ```suggestion private static final Logger LOG = LoggerFactory.getLogger(WorkerSpecContainerResourceAdapter.class); ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager URL: https://github.com/apache/flink/pull/11353#discussion_r409575963 ## File path: flink-yarn/src/test/java/org/apache/flink/yarn/TestingYarnAMRMClientAsync.java ## @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.yarn; + +import org.apache.flink.api.java.tuple.Tuple4; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.function.TriConsumer; +import org.apache.flink.util.function.TriFunction; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.client.api.AMRMClient; +import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; +import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl; + +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.function.BiConsumer; +import java.util.function.Consumer; +import java.util.function.Function; + +/** + * A Yarn {@link AMRMClientAsync} implementation for testing. 
+ */ +public class TestingYarnAMRMClientAsync extends AMRMClientAsyncImpl { + + private Function, List>> + getMatchingRequestsFunction = ignored -> Collections.emptyList(); + private BiConsumer addContainerRequestConsumer = (ignored1, ignored2) -> {}; + private BiConsumer removeContainerRequestConsumer = (ignored1, ignored2) -> {}; + private BiConsumer releaseAssignedContainerConsumer = (ignored1, ignored2) -> {}; + private Consumer setHeartbeatIntervalConsumer = (ignored) -> {}; + private TriFunction registerApplicationMasterFunction = + (ignored1, ignored2, ignored3) -> RegisterApplicationMasterResponse.newInstance( + Resource.newInstance(0, 0), + Resource.newInstance(Integer.MAX_VALUE, Integer.MAX_VALUE), + Collections.emptyMap(), + null, + Collections.emptyList(), + null, + Collections.emptyList()); + private TriConsumer unregisterApplicationMasterConsumer = (ignored1, ignored2, ignored3) -> {}; + + TestingYarnAMRMClientAsync(CallbackHandler callbackHandler) { + super(0, callbackHandler); + } + + @Override + public List> getMatchingRequests(Priority priority, String resourceName, Resource capability) { + return getMatchingRequestsFunction.apply(Tuple4.of(priority, resourceName, capability, handler)); + } + + @Override + public void addContainerRequest(AMRMClient.ContainerRequest req) { + addContainerRequestConsumer.accept(req, handler); + } + + @Override + public void removeContainerRequest(AMRMClient.ContainerRequest req) { + removeContainerRequestConsumer.accept(req, handler); + } + + @Override + public void releaseAssignedContainer(ContainerId containerId) { + releaseAssignedContainerConsumer.accept(containerId, handler); + } + + @Override + public void setHeartbeatInterval(int interval) { + setHeartbeatIntervalConsumer.accept(interval); + } + + @Override + public RegisterApplicationMasterResponse registerApplicationMaster(String appHostName, int appHostPort, String appTrackingUrl) { + return registerApplicationMasterFunction.apply(appHostName, appHostPort, 
appTrackingUrl); + } + + @Override + public void unregisterApplicationMaster(FinalApplicationStatus appStatus, String appMessage, String appTrackingUrl) { + unregisterApplicationMasterConsumer.accept(appStatus, appMessage, appTrackingUrl); + }
[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager URL: https://github.com/apache/flink/pull/11353#discussion_r409624390 ## File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ## @@ -117,7 +118,9 @@ /** Client to communicate with the Node manager and launch TaskExecutor processes. */ private NMClientAsync nodeManagerClient; - private final WorkerSpecContainerResourceAdapter workerSpecContainerResourceAdapter; + private WorkerSpecContainerResourceAdapter workerSpecContainerResourceAdapter = null; Review comment: Either annotate with `@Nullable` and add the corresponding null checks or initialize with a mock `WorkerSpecContainerResourceAdapter` which fails on every call with an exception saying that the `RM` has not been properly initialized. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager URL: https://github.com/apache/flink/pull/11353#discussion_r409596634 ## File path: flink-yarn/src/main/java/org/apache/flink/yarn/WorkerSpecContainerResourceAdapter.java ## @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.yarn; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec; +import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils; +import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec; +import org.apache.flink.util.Preconditions; + +import org.apache.hadoop.yarn.api.records.Resource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Utility class for converting between Flink {@link WorkerResourceSpec} and Yarn {@link Resource}. + */ +public class WorkerSpecContainerResourceAdapter { + private final Logger log = LoggerFactory.getLogger(WorkerSpecContainerResourceAdapter.class); + + private final Configuration flinkConfig; + private final int minMemMB; + private final int minVcore; + private final int maxMemMB; + private final int maxVcore; + private final WorkerSpecContainerResourceAdapter.MatchingStrategy matchingStrategy; + private final Map workerSpecToContainerResource; + private final Map> containerResourceToWorkerSpecs; Review comment: Can the value type be a set? ```suggestion private final Map> containerResourceToWorkerSpecs; ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager URL: https://github.com/apache/flink/pull/11353#discussion_r409572279 ## File path: flink-yarn/src/test/java/org/apache/flink/yarn/TestingYarnAMRMClientAsync.java ## @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.yarn; + +import org.apache.flink.api.java.tuple.Tuple4; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.function.TriConsumer; +import org.apache.flink.util.function.TriFunction; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.client.api.AMRMClient; +import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; +import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl; + +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.function.BiConsumer; +import java.util.function.Consumer; +import java.util.function.Function; + +/** + * A Yarn {@link AMRMClientAsync} implementation for testing. 
+ */ +public class TestingYarnAMRMClientAsync extends AMRMClientAsyncImpl { + + private Function, List>> + getMatchingRequestsFunction = ignored -> Collections.emptyList(); + private BiConsumer addContainerRequestConsumer = (ignored1, ignored2) -> {}; + private BiConsumer removeContainerRequestConsumer = (ignored1, ignored2) -> {}; + private BiConsumer releaseAssignedContainerConsumer = (ignored1, ignored2) -> {}; + private Consumer setHeartbeatIntervalConsumer = (ignored) -> {}; + private TriFunction registerApplicationMasterFunction = + (ignored1, ignored2, ignored3) -> RegisterApplicationMasterResponse.newInstance( + Resource.newInstance(0, 0), + Resource.newInstance(Integer.MAX_VALUE, Integer.MAX_VALUE), + Collections.emptyMap(), + null, + Collections.emptyList(), + null, + Collections.emptyList()); + private TriConsumer unregisterApplicationMasterConsumer = (ignored1, ignored2, ignored3) -> {}; + + TestingYarnAMRMClientAsync(CallbackHandler callbackHandler) { + super(0, callbackHandler); + } + + @Override + public List> getMatchingRequests(Priority priority, String resourceName, Resource capability) { + return getMatchingRequestsFunction.apply(Tuple4.of(priority, resourceName, capability, handler)); + } + + @Override + public void addContainerRequest(AMRMClient.ContainerRequest req) { + addContainerRequestConsumer.accept(req, handler); + } + + @Override + public void removeContainerRequest(AMRMClient.ContainerRequest req) { + removeContainerRequestConsumer.accept(req, handler); + } + + @Override + public void releaseAssignedContainer(ContainerId containerId) { + releaseAssignedContainerConsumer.accept(containerId, handler); + } + + @Override + public void setHeartbeatInterval(int interval) { + setHeartbeatIntervalConsumer.accept(interval); + } + + @Override + public RegisterApplicationMasterResponse registerApplicationMaster(String appHostName, int appHostPort, String appTrackingUrl) { + return registerApplicationMasterFunction.apply(appHostName, appHostPort, 
appTrackingUrl); + } + + @Override + public void unregisterApplicationMaster(FinalApplicationStatus appStatus, String appMessage, String appTrackingUrl) { + unregisterApplicationMasterConsumer.accept(appStatus, appMessage, appTrackingUrl); + }
[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager URL: https://github.com/apache/flink/pull/11353#discussion_r409583691 ## File path: flink-yarn/src/main/java/org/apache/flink/yarn/WorkerSpecContainerResourceAdapter.java ## @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.yarn; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec; +import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils; +import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec; +import org.apache.flink.util.Preconditions; + +import org.apache.hadoop.yarn.api.records.Resource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Utility class for converting between Flink {@link WorkerResourceSpec} and Yarn {@link Resource}. + */ +public class WorkerSpecContainerResourceAdapter { + private final Logger log = LoggerFactory.getLogger(WorkerSpecContainerResourceAdapter.class); + + private final Configuration flinkConfig; + private final int minMemMB; + private final int minVcore; + private final int maxMemMB; + private final int maxVcore; + private final WorkerSpecContainerResourceAdapter.MatchingStrategy matchingStrategy; + private final Map workerSpecToContainerResource; + private final Map> containerResourceToWorkerSpecs; + private final Map> containerMemoryToContainerResource; + + @VisibleForTesting Review comment: I think we don't need the `@VisibleForTesting` annotations in this class because we did not increase the visibility of these methods for testing purposes. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager URL: https://github.com/apache/flink/pull/11353#discussion_r409581744 ## File path: flink-yarn/src/main/java/org/apache/flink/yarn/WorkerSpecContainerResourceAdapter.java ## @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.yarn; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec; +import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils; +import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec; +import org.apache.flink.util.Preconditions; + +import org.apache.hadoop.yarn.api.records.Resource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Utility class for converting between Flink {@link WorkerResourceSpec} and Yarn {@link Resource}. + */ +public class WorkerSpecContainerResourceAdapter { + private final Logger log = LoggerFactory.getLogger(WorkerSpecContainerResourceAdapter.class); + + private final Configuration flinkConfig; + private final int minMemMB; + private final int minVcore; + private final int maxMemMB; + private final int maxVcore; + private final WorkerSpecContainerResourceAdapter.MatchingStrategy matchingStrategy; + private final Map workerSpecToContainerResource; + private final Map> containerResourceToWorkerSpecs; + private final Map> containerMemoryToContainerResource; + + @VisibleForTesting + WorkerSpecContainerResourceAdapter( + final Configuration flinkConfig, + final int minMemMB, + final int minVcore, + final int maxMemMB, + final int maxVcore, + final WorkerSpecContainerResourceAdapter.MatchingStrategy matchingStrategy) { + this.flinkConfig = Preconditions.checkNotNull(flinkConfig); + this.minMemMB = minMemMB; + this.minVcore = minVcore; + this.maxMemMB = maxMemMB; + this.maxVcore = maxVcore; + this.matchingStrategy = matchingStrategy; + 
workerSpecToContainerResource = new HashMap<>(); + containerResourceToWorkerSpecs = new HashMap<>(); + containerMemoryToContainerResource = new HashMap<>(); + } + + @VisibleForTesting + Optional getContainerResource(final WorkerResourceSpec workerResourceSpec) { + return Optional.ofNullable(workerSpecToContainerResource.computeIfAbsent( + Preconditions.checkNotNull(workerResourceSpec), + this::createAndMapContainerResource)); + } + + @VisibleForTesting + Set getWorkerSpecs(final Resource containerResource) { + return getEquivalentContainerResource(containerResource).stream() + .flatMap(resource -> containerResourceToWorkerSpecs.getOrDefault(resource, Collections.emptyList()).stream()) + .collect(Collectors.toSet()); + } + + @VisibleForTesting + Set getEquivalentContainerResource(final Resource containerResource) { + // Yarn might ignore the requested vcores, depending on its configurations. + // In such cases, we should also not matching vcores. + final Set equivalentContainerResources; + switch (matchingStrategy) { + case MATCH_VCORE: + equivalentContainerResources = Collections.singleton(containerResource); + break; + case IGNORE_VCORE: + default:
[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager URL: https://github.com/apache/flink/pull/11353#discussion_r409619888 ## File path: flink-yarn/src/main/java/org/apache/flink/yarn/RegisterApplicationMasterResponseReflector.java ## @@ -53,12 +59,20 @@ requireNonNull(clazz); try { - method = clazz.getMethod("getContainersFromPreviousAttempts"); + getContainersFromPreviousAttemptsMethod = clazz.getMethod("getContainersFromPreviousAttempts"); } catch (NoSuchMethodException e) { // that happens in earlier Hadoop versions (pre 2.2) logger.info("Cannot reconnect to previously allocated containers. " + "This YARN version does not support 'getContainersFromPreviousAttempts()'"); } + + try { + getSchedulerResourceTypesMethod = clazz.getMethod("getSchedulerResourceTypes"); + } catch (NoSuchMethodException e) { + // that happens in earlier Hadoop versions (pre 2.6) + logger.info("Cannot get scheduler resource types. " + + "This YARN version does not support 'getSchedulerResourceTypes()'"); + } Review comment: One could think about factoring the lookup of methods and the logging statement out into another method to avoid code duplication. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager URL: https://github.com/apache/flink/pull/11353#discussion_r409580203 ## File path: flink-yarn/src/main/java/org/apache/flink/yarn/WorkerSpecContainerResourceAdapter.java ## @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.yarn; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec; +import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils; +import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec; +import org.apache.flink.util.Preconditions; + +import org.apache.hadoop.yarn.api.records.Resource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Utility class for converting between Flink {@link WorkerResourceSpec} and Yarn {@link Resource}. + */ +public class WorkerSpecContainerResourceAdapter { + private final Logger log = LoggerFactory.getLogger(WorkerSpecContainerResourceAdapter.class); + + private final Configuration flinkConfig; + private final int minMemMB; + private final int minVcore; + private final int maxMemMB; + private final int maxVcore; Review comment: nit: ```suggestion private final int minMemMB; private final int maxMemMB; private final int minVcore; private final int maxVcore; ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager URL: https://github.com/apache/flink/pull/11353#discussion_r409627647 ## File path: flink-yarn/src/test/java/org/apache/flink/yarn/RegisterApplicationMasterResponseReflectorTest.java ## @@ -88,7 +94,44 @@ public void testGetMethodReflectiveHadoop22() { final RegisterApplicationMasterResponseReflector registerApplicationMasterResponseReflector = new RegisterApplicationMasterResponseReflector(LOG); - final Method method = registerApplicationMasterResponseReflector.getMethod(); + final Method method = registerApplicationMasterResponseReflector.getGetContainersFromPreviousAttemptsMethod(); + assertThat(method, notNullValue()); + } + + @Test + public void testCallsGetSchedulerResourceTypesMethodIfPresent() { + final RegisterApplicationMasterResponseReflector registerApplicationMasterResponseReflector = + new RegisterApplicationMasterResponseReflector(LOG, HasMethod.class); + + final Optional> schedulerResourceTypeNames = + registerApplicationMasterResponseReflector.getSchedulerResourceTypeNamesUnsafe(new HasMethod()); Review comment: Instead of calling private methods of the `RegisterApplicationMasterResponseReflector` we could also add an `assumeTrue` statement based on the Hadoop version. Then we can have two tests for Hadoop >= 2.6 and < 2.6. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager URL: https://github.com/apache/flink/pull/11353#discussion_r409611795 ## File path: flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java ## @@ -243,24 +248,33 @@ protected NMClientAsync createAndStartNodeManagerClient(YarnConfiguration yarnCo // domain objects for test purposes final ResourceProfile resourceProfile1 = ResourceProfile.UNKNOWN; + final WorkerResourceSpec workerResourceSpec; + + final Resource containerResource; public String taskHost = "host1"; final TestingYarnNMClientAsync testingYarnNMClientAsync; final TestingYarnAMRMClientAsync testingYarnAMRMClientAsync; + int containerIdx = 0; + /** * Create mock RM dependencies. */ Context() throws Exception { - this(flinkConfig); + this(flinkConfig, null); } - Context(Configuration configuration) throws Exception { - final SlotManager slotManager = SlotManagerBuilder.newBuilder() - .setDefaultWorkerResourceSpec(YarnWorkerResourceSpecFactory.INSTANCE.createDefaultWorkerResourceSpec(configuration)) - .build(); + Context(Configuration configuration, @Nullable SlotManager slotManager) throws Exception { + + workerResourceSpec = YarnWorkerResourceSpecFactory.INSTANCE.createDefaultWorkerResourceSpec(configuration); + if (slotManager == null) { + slotManager = SlotManagerBuilder.newBuilder() + .setDefaultWorkerResourceSpec(workerResourceSpec) + .build(); + } Review comment: Why not move this block into the default constructor and remove the `@Nullable` annotation from `slotManager`? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager URL: https://github.com/apache/flink/pull/11353#discussion_r409608614 ## File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ## @@ -552,38 +581,44 @@ private FinalApplicationStatus getYarnStatus(ApplicationStatus status) { * Request new container if pending containers cannot satisfy pending slot requests. */ private void requestYarnContainerIfRequired() { - int requiredTaskManagers = getNumberRequiredWorkers(); - - while (requiredTaskManagers-- > numPendingContainerRequests) { - requestYarnContainer(); - } + getNumberRequiredWorkersPerWorkerResourceSpec().entrySet().stream() + .filter(entry -> entry.getValue() > getNumPendingWorkersFor(entry.getKey())) + .forEach(entry -> requestYarnContainer(entry.getKey())); Review comment: `forEach` is always an indicator that one could do the same with a `while` loop. In this case I would suggest to keep it simple and stupid: ```suggestion for (Map.Entry requiredWorkersPerResourceSpec : getNumberRequiredWorkersPerWorkerResourceSpec().entrySet()) { final WorkerResourceSpec workerResourceSpec = requiredWorkersPerResourceSpec.getKey(); while (requiredWorkersPerResourceSpec.getValue() > getNumPendingWorkersFor(workerResourceSpec)) { requestYarnContainer(workerResourceSpec); } } ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager URL: https://github.com/apache/flink/pull/11353#discussion_r409622180 ## File path: flink-yarn/src/main/java/org/apache/flink/yarn/RegisterApplicationMasterResponseReflector.java ## @@ -96,7 +110,40 @@ } @VisibleForTesting - Method getMethod() { - return method; + Method getGetContainersFromPreviousAttemptsMethod() { + return getContainersFromPreviousAttemptsMethod; + } + + /** +* Get names of resource types that are considered by the Yarn scheduler. +* @param response The response object from the registration at the ResourceManager. +* @return A set of resource type names, or {@link Optional#empty()} if the Yarn version does not support this API. +*/ + Optional<Set<String>> getSchedulerResourceTypeNames(final RegisterApplicationMasterResponse response) { + return getSchedulerResourceTypeNamesUnsafe(response); + } + + @VisibleForTesting + Optional<Set<String>> getSchedulerResourceTypeNamesUnsafe(final Object response) { Review comment: ```suggestion private Optional<Set<String>> getSchedulerResourceTypeNamesUnsafe(final Object response) { ``` and remove `@VisibleForTesting`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager URL: https://github.com/apache/flink/pull/11353#discussion_r409576668 ## File path: flink-yarn/src/test/java/org/apache/flink/yarn/TestingYarnNMClientAsync.java ## @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.yarn; + +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.function.TriConsumer; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.client.api.async.NMClientAsync; +import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl; + +/** + * A Yarn {@link NMClientAsync} implementation for testing. 
+ */ +class TestingYarnNMClientAsync extends NMClientAsyncImpl { + + private TriConsumer startContainerAsyncConsumer = (ignored1, ignored2, ignored3) -> {}; + private TriConsumer stopContainerAsyncConsumer = (ignored1, ignored2, ignored3) -> {}; + + TestingYarnNMClientAsync(final CallbackHandler callbackHandler) { + super(callbackHandler); + } + + @Override + public void startContainerAsync(Container container, ContainerLaunchContext containerLaunchContext) { + this.startContainerAsyncConsumer.accept(container, containerLaunchContext, callbackHandler); + } + + @Override + public void stopContainerAsync(ContainerId containerId, NodeId nodeId) { + this.stopContainerAsyncConsumer.accept(containerId, nodeId, callbackHandler); + } + + void setStartContainerAsyncConsumer(TriConsumer startContainerAsyncConsumer) { + this.startContainerAsyncConsumer = Preconditions.checkNotNull(startContainerAsyncConsumer); + } + + void setStopContainerAsyncConsumer(TriConsumer stopContainerAsyncConsumer) { + this.stopContainerAsyncConsumer = Preconditions.checkNotNull(stopContainerAsyncConsumer); + } Review comment: Same here with the setters and `volatile` fields. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager URL: https://github.com/apache/flink/pull/11353#discussion_r409610652 ## File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ## @@ -552,38 +581,44 @@ private FinalApplicationStatus getYarnStatus(ApplicationStatus status) { * Request new container if pending containers cannot satisfy pending slot requests. */ private void requestYarnContainerIfRequired() { - int requiredTaskManagers = getNumberRequiredWorkers(); - - while (requiredTaskManagers-- > numPendingContainerRequests) { - requestYarnContainer(); - } + getNumberRequiredWorkersPerWorkerResourceSpec().entrySet().stream() + .filter(entry -> entry.getValue() > getNumPendingWorkersFor(entry.getKey())) + .forEach(entry -> requestYarnContainer(entry.getKey())); Review comment: I assume that `requestYarnContainer` should never return `false` here, right? If this is the case, then let's add a `checkState` to ensure this invariant. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager URL: https://github.com/apache/flink/pull/11353#discussion_r409589090 ## File path: flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptionsInternal.java ## @@ -34,4 +37,27 @@ .stringType() .noDefaultValue() .withDescription("**DO NOT USE** The location of the log config file, e.g. the path to your log4j.properties for log4j."); + + /** +* **DO NO USE** Whether {@link YarnResourceManager} should match the vcores of allocated containers with those requested. Review comment: ```suggestion * **DO NOT USE** Whether {@link YarnResourceManager} should match the vcores of allocated containers with those requested. ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager URL: https://github.com/apache/flink/pull/11353#discussion_r409630377 ## File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ## @@ -117,7 +118,9 @@ /** Client to communicate with the Node manager and launch TaskExecutor processes. */ private NMClientAsync nodeManagerClient; - private final WorkerSpecContainerResourceAdapter workerSpecContainerResourceAdapter; + private WorkerSpecContainerResourceAdapter workerSpecContainerResourceAdapter = null; Review comment: An idea to keep the `WorkerSpecContainerResourceAdapter` final is to pass in the `WorkerSpecContainerResourceAdapter.MatchingStrategy` to the method `getWorkerSpecs`. If you look at the `WorkerSpecContainerResourceAdapter` class then one also sees that the matching strategy is not really an essential part of it. Only the lookup method changes its behaviour based on it. Everything else stays the same. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager URL: https://github.com/apache/flink/pull/11353#discussion_r409592189 ## File path: flink-yarn/src/main/java/org/apache/flink/yarn/WorkerSpecContainerResourceAdapter.java ## @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.yarn; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec; +import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils; +import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec; +import org.apache.flink.util.Preconditions; + +import org.apache.hadoop.yarn.api.records.Resource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Utility class for converting between Flink {@link WorkerResourceSpec} and Yarn {@link Resource}. + */ +public class WorkerSpecContainerResourceAdapter { + private final Logger log = LoggerFactory.getLogger(WorkerSpecContainerResourceAdapter.class); + + private final Configuration flinkConfig; + private final int minMemMB; + private final int minVcore; + private final int maxMemMB; + private final int maxVcore; + private final WorkerSpecContainerResourceAdapter.MatchingStrategy matchingStrategy; + private final Map workerSpecToContainerResource; + private final Map> containerResourceToWorkerSpecs; + private final Map> containerMemoryToContainerResource; + + @VisibleForTesting + WorkerSpecContainerResourceAdapter( + final Configuration flinkConfig, + final int minMemMB, + final int minVcore, + final int maxMemMB, + final int maxVcore, + final WorkerSpecContainerResourceAdapter.MatchingStrategy matchingStrategy) { + this.flinkConfig = Preconditions.checkNotNull(flinkConfig); + this.minMemMB = minMemMB; + this.minVcore = minVcore; + this.maxMemMB = maxMemMB; + this.maxVcore = maxVcore; + this.matchingStrategy = matchingStrategy; + 
+ workerSpecToContainerResource = new HashMap<>(); + containerResourceToWorkerSpecs = new HashMap<>(); + containerMemoryToContainerResource = new HashMap<>(); + } + + @VisibleForTesting + Optional getContainerResource(final WorkerResourceSpec workerResourceSpec) { Review comment: Maybe rename to `tryComputeContainerResource` or so because this method is not simply a lookup. It is rather a creation call with a caching mechanism. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager URL: https://github.com/apache/flink/pull/11353#discussion_r404641088 ## File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ## @@ -615,4 +632,85 @@ protected double getCpuCores(final Configuration configuration) { //noinspection NumericCastThatLosesPrecision return cpuCoresLong; } + + /** +* Utility class for converting between Flink {@link WorkerResourceSpec} and Yarn {@link Resource}. +*/ + @VisibleForTesting + static class WorkerSpecContainerResourceAdapter { + private final Configuration flinkConfig; + private final int minMemMB; + private final int minVcore; + private final boolean matchVcores; + private final Map workerSpecToContainerResource; + private final Map> containerResourceToWorkerSpecs; + private final Map> containerMemoryToContainerResource; + + @VisibleForTesting + WorkerSpecContainerResourceAdapter( + final Configuration flinkConfig, + final int minMemMB, + final int minVcore, + final boolean matchVcores) { + this.flinkConfig = Preconditions.checkNotNull(flinkConfig); + this.minMemMB = minMemMB; + this.minVcore = minVcore; + this.matchVcores = matchVcores; + workerSpecToContainerResource = new HashMap<>(); + containerResourceToWorkerSpecs = new HashMap<>(); + containerMemoryToContainerResource = new HashMap<>(); + } + + @VisibleForTesting + Resource getContainerResource(final WorkerResourceSpec workerResourceSpec) { + return workerSpecToContainerResource.computeIfAbsent( + Preconditions.checkNotNull(workerResourceSpec), + this::createAndMapContainerResource); + } + + @VisibleForTesting + Collection getWorkerSpecs(final Resource containerResource) { + return getEquivalentContainerResource(containerResource).stream() + .flatMap(resource -> containerResourceToWorkerSpecs.getOrDefault(resource, Collections.emptyList()).stream()) + 
.collect(Collectors.toList()); + } + + @VisibleForTesting + Collection getEquivalentContainerResource(final Resource containerResource) { + // Yarn might ignore the requested vcores, depending on its configurations. + // In such cases, we should also not matching vcores. + return matchVcores ? + Collections.singletonList(containerResource) : + containerMemoryToContainerResource.getOrDefault(containerResource.getMemory(), Collections.emptyList()); + } + + private Resource createAndMapContainerResource(final WorkerResourceSpec workerResourceSpec) { + // TODO: need to unset process/flink memory size from configuration if dynamic worker resource is activated Review comment: The current SM will be initialized with the proper default `WorkerResourceSpec` derived from Flink's configuration, right? Hence, we could already resolve this TODO, right? Then there would not be any problems with enabling support for dynamic worker resources. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager URL: https://github.com/apache/flink/pull/11353#discussion_r404639691 ## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java ## @@ -232,57 +229,75 @@ private void recoverWorkerNodesFromPreviousAttempts() throws ResourceManagerExce ++currentMaxAttemptId); } - private void requestKubernetesPod() { - numPendingPodRequests++; + private void requestKubernetesPod(WorkerResourceSpec workerResourceSpec) { + final KubernetesTaskManagerParameters parameters = + createKubernetesTaskManagerParameters(workerResourceSpec); + + podWorkerResources.put(parameters.getPodName(), workerResourceSpec); + final int pendingWorkerNum = pendingWorkerCounter.increaseAndGet(workerResourceSpec); log.info("Requesting new TaskManager pod with <{},{}>. Number pending requests {}.", - defaultMemoryMB, - defaultCpus, - numPendingPodRequests); + parameters.getTaskManagerMemoryMB(), + parameters.getTaskManagerCPU(), + pendingWorkerNum); + log.info("TaskManager {} will be started with {}.", parameters.getPodName(), workerResourceSpec); + + final KubernetesPod taskManagerPod = + KubernetesTaskManagerFactory.createTaskManagerComponent(parameters); + kubeClient.createTaskManagerPod(taskManagerPod); + } + + private KubernetesTaskManagerParameters createKubernetesTaskManagerParameters(WorkerResourceSpec workerResourceSpec) { + // TODO: need to unset process/flink memory size from configuration if dynamic worker resource is activated Review comment: Ok, instead of creating a TODO, I would suggest to create a JIRA ticket which is linked as a follow up. TODO's tend to be forgotten. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager URL: https://github.com/apache/flink/pull/11353#discussion_r404634322 ## File path: flink-yarn/src/test/java/org/apache/flink/yarn/TestingYarnAMRMClientAsync.java ## @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.yarn; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple4; +import org.apache.flink.util.Preconditions; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.client.api.AMRMClient; +import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; +import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl; + +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.function.Consumer; +import java.util.function.Function; + +/** + * A Yarn {@link AMRMClientAsync} implementation for testing. 
+ */ +public class TestingYarnAMRMClientAsync extends AMRMClientAsyncImpl { Review comment: I think this is the price we have to pay for integrating with another system. It is always painful and tedious to set up the tests. In parts, this always involves reimplementing the other system's behaviour in mocks. I don't see a better way atm. Hence, +1 for what you've proposed. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager URL: https://github.com/apache/flink/pull/11353#discussion_r404160153 ## File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ## @@ -412,30 +439,32 @@ private void releaseFailedContainerAndRequestNewContainerIfRequired(ContainerId final ResourceID resourceId = new ResourceID(containerId.toString()); // release the failed container - workerNodeMap.remove(resourceId); + YarnWorkerNode yarnWorkerNode = workerNodeMap.remove(resourceId); resourceManagerClient.releaseAssignedContainer(containerId); // and ask for a new one - requestYarnContainerIfRequired(); + requestYarnContainerIfRequired(yarnWorkerNode.getContainer().getResource()); } private void returnExcessContainer(Container excessContainer) { log.info("Returning excess container {}.", excessContainer.getId()); resourceManagerClient.releaseAssignedContainer(excessContainer.getId()); } - private void removeContainerRequest(AMRMClient.ContainerRequest pendingContainerRequest) { - numPendingContainerRequests--; - - log.info("Removing container request {}. Pending container requests {}.", pendingContainerRequest, numPendingContainerRequests); - + private void removeContainerRequest(AMRMClient.ContainerRequest pendingContainerRequest, WorkerResourceSpec workerResourceSpec) { + log.info("Removing container request {}.", pendingContainerRequest); + pendingWorkerCounter.decreaseAndGet(workerResourceSpec); Review comment: Not saying that we have to implement it right away. I just want to know how one could fix this as a follow-up task. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager URL: https://github.com/apache/flink/pull/11353#discussion_r404146599 ## File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ## @@ -540,39 +571,41 @@ private FinalApplicationStatus getYarnStatus(ApplicationStatus status) { /** * Request new container if pending containers cannot satisfy pending slot requests. */ - private void requestYarnContainerIfRequired() { - int requiredTaskManagers = getNumberRequiredTaskManagers(); - - if (requiredTaskManagers > numPendingContainerRequests) { - requestYarnContainer(); - } + private void requestYarnContainerIfRequired(Resource containerResource) { + getPendingWorkerNums().entrySet().stream() Review comment: maybe we could rename `getPendingWorkerNums` into `getRequiredWorkers` or `getRequiredResources`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager URL: https://github.com/apache/flink/pull/11353#discussion_r404144900 ## File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ## @@ -412,30 +439,32 @@ private void releaseFailedContainerAndRequestNewContainerIfRequired(ContainerId final ResourceID resourceId = new ResourceID(containerId.toString()); // release the failed container - workerNodeMap.remove(resourceId); + YarnWorkerNode yarnWorkerNode = workerNodeMap.remove(resourceId); resourceManagerClient.releaseAssignedContainer(containerId); // and ask for a new one - requestYarnContainerIfRequired(); + requestYarnContainerIfRequired(yarnWorkerNode.getContainer().getResource()); } private void returnExcessContainer(Container excessContainer) { log.info("Returning excess container {}.", excessContainer.getId()); resourceManagerClient.releaseAssignedContainer(excessContainer.getId()); } - private void removeContainerRequest(AMRMClient.ContainerRequest pendingContainerRequest) { - numPendingContainerRequests--; - - log.info("Removing container request {}. Pending container requests {}.", pendingContainerRequest, numPendingContainerRequests); - + private void removeContainerRequest(AMRMClient.ContainerRequest pendingContainerRequest, WorkerResourceSpec workerResourceSpec) { + log.info("Removing container request {}.", pendingContainerRequest); + pendingWorkerCounter.decreaseAndGet(workerResourceSpec); Review comment: Hmm, would it be possible to have a periodic cleanup task? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager URL: https://github.com/apache/flink/pull/11353#discussion_r404142447 ## File path: flink-yarn/src/test/java/org/apache/flink/yarn/TestingYarnAMRMClientAsync.java ## @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.yarn; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple4; +import org.apache.flink.util.Preconditions; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.client.api.AMRMClient; +import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; +import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl; + +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.function.Consumer; +import java.util.function.Function; + +/** + * A Yarn {@link AMRMClientAsync} implementation for testing. 
+ */ +public class TestingYarnAMRMClientAsync extends AMRMClientAsyncImpl { Review comment: I think my problem with this class is that we are partially overriding an implementation class here. We don't really know how the overridden methods are used internally. Looking at the code, it looks as if they are all forwarding the calls to `client`. From this perspective it might look fine but if you take a look at `getAvailableResources`, which has not been overridden, then it also calls `client`. Do we know that there is no contract between the overridden methods and `getAvailableResources`? What if the `client` returns different available resources depending on how many container requests have been added? I'm not saying that this is the case, but I want to make the point that overriding individual methods of an implementation class can cause failures which are hard to predict and even harder to debug. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager URL: https://github.com/apache/flink/pull/11353#discussion_r404134516 ## File path: flink-yarn/src/test/java/org/apache/flink/yarn/TestingContainer.java ## @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.yarn; + +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.Token; + +/** + * A {@link Container} implementation for testing. + */ +class TestingContainer extends Container { Review comment: Using `ContainerPBImpl` should work. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager URL: https://github.com/apache/flink/pull/11353#discussion_r404128962 ## File path: flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptionsInternal.java ## @@ -34,4 +37,24 @@ .stringType() .noDefaultValue() .withDescription("**DO NOT USE** The location of the log config file, e.g. the path to your log4j.properties for log4j."); + + /** +* **DO NO USE** Whether {@link YarnResourceManager} should match the vcores of allocated containers with those requested. +* +* By default, Yarn ignores vcores in the container requests, and always allocate 1 vcore for each container. +* Iff 'yarn.scheduler.capacity.resource-calculator' is set to 'DominantResourceCalculator' for Yarn, will it +* allocate container vcores as requested. Unfortunately, this configuration option is dedicated for Yarn Scheduler, +* and is only accessible to applications in Hadoop 2.6+. +* +* ATM, it should be fine to not match vcores, because with the current {@link SlotManagerImpl} all the TM +* containers should have the same resources. +* +* If later we add another {@link SlotManager} implementation that may have TMs with different resources, we can +* switch this option on only for the new SM, and the new SM can also be available on Hadoop 2.6+ only. +*/ + public static final ConfigOption MATCH_CONTAINER_VCORES = + key("$internal.yarn.resourcemanager.enable-vcore-matching") + .booleanType() + .defaultValue(false) + .withDescription("**DO NOT USE** Whether YarnResourceManager should match the container vcores."); Review comment: Ok, thanks for the clarification. Automatically configuring this flag with Hadoop >= 2.6 sounds like a good idea. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager URL: https://github.com/apache/flink/pull/11353#discussion_r404124170 ## File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ## @@ -615,4 +632,85 @@ protected double getCpuCores(final Configuration configuration) { //noinspection NumericCastThatLosesPrecision return cpuCoresLong; } + + /** +* Utility class for converting between Flink {@link WorkerResourceSpec} and Yarn {@link Resource}. +*/ + @VisibleForTesting + static class WorkerSpecContainerResourceAdapter { + private final Configuration flinkConfig; + private final int minMemMB; + private final int minVcore; + private final boolean matchVcores; + private final Map workerSpecToContainerResource; + private final Map> containerResourceToWorkerSpecs; + private final Map> containerMemoryToContainerResource; + + @VisibleForTesting + WorkerSpecContainerResourceAdapter( + final Configuration flinkConfig, + final int minMemMB, + final int minVcore, + final boolean matchVcores) { + this.flinkConfig = Preconditions.checkNotNull(flinkConfig); + this.minMemMB = minMemMB; + this.minVcore = minVcore; + this.matchVcores = matchVcores; + workerSpecToContainerResource = new HashMap<>(); + containerResourceToWorkerSpecs = new HashMap<>(); + containerMemoryToContainerResource = new HashMap<>(); + } + + @VisibleForTesting + Resource getContainerResource(final WorkerResourceSpec workerResourceSpec) { + return workerSpecToContainerResource.computeIfAbsent( + Preconditions.checkNotNull(workerResourceSpec), + this::createAndMapContainerResource); + } + + @VisibleForTesting + Collection getWorkerSpecs(final Resource containerResource) { + return getEquivalentContainerResource(containerResource).stream() + .flatMap(resource -> containerResourceToWorkerSpecs.getOrDefault(resource, Collections.emptyList()).stream()) + 
.collect(Collectors.toList()); + } + + @VisibleForTesting + Collection getEquivalentContainerResource(final Resource containerResource) { + // Yarn might ignore the requested vcores, depending on its configurations. + // In such cases, we should also not matching vcores. + return matchVcores ? + Collections.singletonList(containerResource) : + containerMemoryToContainerResource.getOrDefault(containerResource.getMemory(), Collections.emptyList()); + } + + private Resource createAndMapContainerResource(final WorkerResourceSpec workerResourceSpec) { + // TODO: need to unset process/flink memory size from configuration if dynamic worker resource is activated Review comment: In general I agree unless it is not yet fully supported by the RM implementation. If it is, then I guess we also don't need this comment. If it is not supported, then the check state will help us to remember what needs to be changed. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager URL: https://github.com/apache/flink/pull/11353#discussion_r404123445 ## File path: flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesResourceManagerTest.java ## @@ -321,6 +339,47 @@ public void testGetCpuCoresNumSlots() { assertThat(resourceManager.getCpuCores(configuration), is(3.0)); } + @Test + public void testStartAndRecoverVariousResourceSpec() { + // Start two workers with different resources + final WorkerResourceSpec workerResourceSpec1 = new WorkerResourceSpec(1.0, 100, 0, 100, 100); + final WorkerResourceSpec workerResourceSpec2 = new WorkerResourceSpec(1.0, 99, 0, 100, 100); + resourceManager.startNewWorker(workerResourceSpec1); + resourceManager.startNewWorker(workerResourceSpec2); + + // Verify two pods with both worker resources are started + final PodList initialPodList = kubeClient.pods().list(); + assertEquals(2, initialPodList.getItems().size()); + final Pod initialPod1 = getPodContainsStrInArgs(initialPodList, TaskManagerOptions.TASK_HEAP_MEMORY.key() + "=" + (100L << 20)); + final Pod initialPod2 = getPodContainsStrInArgs(initialPodList, TaskManagerOptions.TASK_HEAP_MEMORY.key() + "=" + (99L << 20)); + + // Notify resource manager about pods added. + final KubernetesPod initialKubernetesPod1 = new KubernetesPod(initialPod1); + final KubernetesPod initialKubernetesPod2 = new KubernetesPod(initialPod2); + resourceManager.onAdded(ImmutableList.of(initialKubernetesPod1, initialKubernetesPod2)); + + // Terminate pod1. + terminatePod(initialPod1); + resourceManager.setCustomPendingWorkerNums(Collections.singletonMap(workerResourceSpec1, 1)); Review comment: This sounds good. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager URL: https://github.com/apache/flink/pull/11353#discussion_r404121234 ## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java ## @@ -232,57 +229,75 @@ private void recoverWorkerNodesFromPreviousAttempts() throws ResourceManagerExce ++currentMaxAttemptId); } - private void requestKubernetesPod() { - numPendingPodRequests++; + private void requestKubernetesPod(WorkerResourceSpec workerResourceSpec) { + final KubernetesTaskManagerParameters parameters = + createKubernetesTaskManagerParameters(workerResourceSpec); + + podWorkerResources.put(parameters.getPodName(), workerResourceSpec); + final int pendingWorkerNum = pendingWorkerCounter.increaseAndGet(workerResourceSpec); log.info("Requesting new TaskManager pod with <{},{}>. Number pending requests {}.", - defaultMemoryMB, - defaultCpus, - numPendingPodRequests); + parameters.getTaskManagerMemoryMB(), + parameters.getTaskManagerCPU(), + pendingWorkerNum); + log.info("TaskManager {} will be started with {}.", parameters.getPodName(), workerResourceSpec); + + final KubernetesPod taskManagerPod = + KubernetesTaskManagerFactory.createTaskManagerComponent(parameters); + kubeClient.createTaskManagerPod(taskManagerPod); + } + + private KubernetesTaskManagerParameters createKubernetesTaskManagerParameters(WorkerResourceSpec workerResourceSpec) { + // TODO: need to unset process/flink memory size from configuration if dynamic worker resource is activated Review comment: So do we have the requirement that what's written in the config and what's specified in `workerResourceSpec` have to be the same? If yes, then I think we should add a check state here. This will ensure that we remember what needs to be adjusted in order to support dynamic worker resources. This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager URL: https://github.com/apache/flink/pull/11353#discussion_r403089669 ## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java ## @@ -232,57 +229,75 @@ private void recoverWorkerNodesFromPreviousAttempts() throws ResourceManagerExce ++currentMaxAttemptId); } - private void requestKubernetesPod() { - numPendingPodRequests++; + private void requestKubernetesPod(WorkerResourceSpec workerResourceSpec) { + final KubernetesTaskManagerParameters parameters = + createKubernetesTaskManagerParameters(workerResourceSpec); + + podWorkerResources.put(parameters.getPodName(), workerResourceSpec); + final int pendingWorkerNum = pendingWorkerCounter.increaseAndGet(workerResourceSpec); log.info("Requesting new TaskManager pod with <{},{}>. Number pending requests {}.", - defaultMemoryMB, - defaultCpus, - numPendingPodRequests); + parameters.getTaskManagerMemoryMB(), + parameters.getTaskManagerCPU(), + pendingWorkerNum); + log.info("TaskManager {} will be started with {}.", parameters.getPodName(), workerResourceSpec); + + final KubernetesPod taskManagerPod = + KubernetesTaskManagerFactory.createTaskManagerComponent(parameters); + kubeClient.createTaskManagerPod(taskManagerPod); + } + + private KubernetesTaskManagerParameters createKubernetesTaskManagerParameters(WorkerResourceSpec workerResourceSpec) { + // TODO: need to unset process/flink memory size from configuration if dynamic worker resource is activated + final TaskExecutorProcessSpec taskExecutorProcessSpec = + TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(flinkConfig, workerResourceSpec); final String podName = String.format( TASK_MANAGER_POD_FORMAT, clusterId, currentMaxAttemptId, ++currentMaxPodId); + final ContaineredTaskManagerParameters taskManagerParameters = + 
ContaineredTaskManagerParameters.create(flinkConfig, taskExecutorProcessSpec); + final String dynamicProperties = BootstrapTools.getDynamicPropertiesAsString(flinkClientConfig, flinkConfig); - final KubernetesTaskManagerParameters kubernetesTaskManagerParameters = new KubernetesTaskManagerParameters( + return new KubernetesTaskManagerParameters( flinkConfig, podName, dynamicProperties, taskManagerParameters); - - final KubernetesPod taskManagerPod = - KubernetesTaskManagerFactory.createTaskManagerComponent(kubernetesTaskManagerParameters); - - log.info("TaskManager {} will be started with {}.", podName, taskExecutorProcessSpec); - kubeClient.createTaskManagerPod(taskManagerPod); } /** * Request new pod if pending pods cannot satisfy pending slot requests. */ - private void requestKubernetesPodIfRequired() { - final int requiredTaskManagers = getNumberRequiredTaskManagers(); + private void requestKubernetesPodIfRequired(WorkerResourceSpec workerResourceSpec) { + final int requiredTaskManagers = getPendingWorkerNums().get(workerResourceSpec); + final int pendingWorkerNum = pendingWorkerCounter.getNum(workerResourceSpec); Review comment: I think I would hide `pendingWorkerCounter` behind some methods which the base class provides. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager URL: https://github.com/apache/flink/pull/11353#discussion_r403128803 ## File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ## @@ -540,39 +571,41 @@ private FinalApplicationStatus getYarnStatus(ApplicationStatus status) { /** * Request new container if pending containers cannot satisfy pending slot requests. */ - private void requestYarnContainerIfRequired() { - int requiredTaskManagers = getNumberRequiredTaskManagers(); - - if (requiredTaskManagers > numPendingContainerRequests) { - requestYarnContainer(); - } + private void requestYarnContainerIfRequired(Resource containerResource) { + getPendingWorkerNums().entrySet().stream() + .filter(entry -> + getContainerResource(entry.getKey()).equals(containerResource) && + entry.getValue() > pendingWorkerCounter.getNum(entry.getKey())) + .findAny() Review comment: Shouldn't we do this for all instead of any? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager URL: https://github.com/apache/flink/pull/11353#discussion_r403115740 ## File path: flink-yarn/src/test/java/org/apache/flink/yarn/TestingContainerStatus.java ## @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.yarn; + +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ContainerStatus; + +/** + * A {@link ContainerStatus} implementation for testing. 
+ */ +class TestingContainerStatus extends ContainerStatus { + + private final ContainerId containerId; + private final ContainerState containerState; + private final String diagnostics; + private final int exitStatus; + + TestingContainerStatus( + final ContainerId containerId, + final ContainerState containerState, + final String diagnostics, + final int exitStatus) { + + this.containerId = containerId; + this.containerState = containerState; + this.diagnostics = diagnostics; + this.exitStatus = exitStatus; + } + + @Override + public ContainerId getContainerId() { + return containerId; + } + + @Override + public void setContainerId(ContainerId containerId) { + Review comment: Shouldn't we fail in case someone calls `setContainerId` here? Otherwise it might go unnoticed and result in some strange behaviour/failure. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager URL: https://github.com/apache/flink/pull/11353#discussion_r403121941 ## File path: flink-yarn/src/test/java/org/apache/flink/yarn/TestingYarnNMClientAsync.java ## @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.yarn; + +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.util.Preconditions; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.client.api.async.NMClientAsync; +import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl; + +import java.util.function.Consumer; + +/** + * A Yarn {@link NMClientAsync} implementation for testing. 
+ */ +class TestingYarnNMClientAsync extends NMClientAsyncImpl { + + private Consumer> startContainerAsyncConsumer = ignored -> {}; + private Consumer> stopContainerAsyncConsumer = ignored -> {}; Review comment: I'd suggest using the `TriConsumer` here. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager URL: https://github.com/apache/flink/pull/11353#discussion_r403132471 ## File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ## @@ -540,39 +571,41 @@ private FinalApplicationStatus getYarnStatus(ApplicationStatus status) { /** * Request new container if pending containers cannot satisfy pending slot requests. */ - private void requestYarnContainerIfRequired() { - int requiredTaskManagers = getNumberRequiredTaskManagers(); - - if (requiredTaskManagers > numPendingContainerRequests) { - requestYarnContainer(); - } + private void requestYarnContainerIfRequired(Resource containerResource) { Review comment: Shouldn't we iterate over all resources which are needed instead of restricting it to `containerResource`? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager URL: https://github.com/apache/flink/pull/11353#discussion_r403133265 ## File path: flink-yarn/src/test/java/org/apache/flink/yarn/entrypoint/YarnResourceManagerFactoryTest.java ## @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.yarn.entrypoint; + +import org.apache.flink.api.common.resources.CPUResource; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.yarn.configuration.YarnConfigOptions; + +import org.junit.Test; + +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; + +/** + * Tests for {@link YarnResourceManagerFactory}. + */ +public class YarnResourceManagerFactoryTest { Review comment: ```suggestion public class YarnResourceManagerFactoryTest extends TestLogger { ``` This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager URL: https://github.com/apache/flink/pull/11353#discussion_r403090988 ## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java ## @@ -232,57 +229,75 @@ private void recoverWorkerNodesFromPreviousAttempts() throws ResourceManagerExce ++currentMaxAttemptId); } - private void requestKubernetesPod() { - numPendingPodRequests++; + private void requestKubernetesPod(WorkerResourceSpec workerResourceSpec) { + final KubernetesTaskManagerParameters parameters = + createKubernetesTaskManagerParameters(workerResourceSpec); + + podWorkerResources.put(parameters.getPodName(), workerResourceSpec); + final int pendingWorkerNum = pendingWorkerCounter.increaseAndGet(workerResourceSpec); log.info("Requesting new TaskManager pod with <{},{}>. Number pending requests {}.", - defaultMemoryMB, - defaultCpus, - numPendingPodRequests); + parameters.getTaskManagerMemoryMB(), + parameters.getTaskManagerCPU(), + pendingWorkerNum); + log.info("TaskManager {} will be started with {}.", parameters.getPodName(), workerResourceSpec); + + final KubernetesPod taskManagerPod = + KubernetesTaskManagerFactory.createTaskManagerComponent(parameters); + kubeClient.createTaskManagerPod(taskManagerPod); + } + + private KubernetesTaskManagerParameters createKubernetesTaskManagerParameters(WorkerResourceSpec workerResourceSpec) { + // TODO: need to unset process/flink memory size from configuration if dynamic worker resource is activated + final TaskExecutorProcessSpec taskExecutorProcessSpec = + TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(flinkConfig, workerResourceSpec); final String podName = String.format( TASK_MANAGER_POD_FORMAT, clusterId, currentMaxAttemptId, ++currentMaxPodId); + final ContaineredTaskManagerParameters taskManagerParameters = + 
ContaineredTaskManagerParameters.create(flinkConfig, taskExecutorProcessSpec); + final String dynamicProperties = BootstrapTools.getDynamicPropertiesAsString(flinkClientConfig, flinkConfig); - final KubernetesTaskManagerParameters kubernetesTaskManagerParameters = new KubernetesTaskManagerParameters( + return new KubernetesTaskManagerParameters( flinkConfig, podName, dynamicProperties, taskManagerParameters); - - final KubernetesPod taskManagerPod = - KubernetesTaskManagerFactory.createTaskManagerComponent(kubernetesTaskManagerParameters); - - log.info("TaskManager {} will be started with {}.", podName, taskExecutorProcessSpec); - kubeClient.createTaskManagerPod(taskManagerPod); } /** * Request new pod if pending pods cannot satisfy pending slot requests. */ - private void requestKubernetesPodIfRequired() { - final int requiredTaskManagers = getNumberRequiredTaskManagers(); + private void requestKubernetesPodIfRequired(WorkerResourceSpec workerResourceSpec) { + final int requiredTaskManagers = getPendingWorkerNums().get(workerResourceSpec); + final int pendingWorkerNum = pendingWorkerCounter.getNum(workerResourceSpec); - if (requiredTaskManagers > numPendingPodRequests) { - requestKubernetesPod(); + if (requiredTaskManagers > pendingWorkerNum) { + requestKubernetesPod(workerResourceSpec); } } private void removePodIfTerminated(KubernetesPod pod) { if (pod.isTerminated()) { kubeClient.stopPod(pod.getName()); - final KubernetesWorkerNode kubernetesWorkerNode = workerNodes.remove(new ResourceID(pod.getName())); - if (kubernetesWorkerNode != null) { - requestKubernetesPodIfRequired(); + final WorkerResourceSpec workerResourceSpec = removeWorkerNodeAndResourceSpec(new ResourceID(pod.getName())); + if (workerResourceSpec != null) { + requestKubernetesPodIfRequired(workerResourceSpec);
[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager URL: https://github.com/apache/flink/pull/11353#discussion_r403118223 ## File path: flink-yarn/src/test/java/org/apache/flink/yarn/TestingYarnAMRMClientAsync.java ## @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.yarn; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple4; +import org.apache.flink.util.Preconditions; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.client.api.AMRMClient; +import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; +import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl; + +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.function.Consumer; +import java.util.function.Function; + +/** + * A Yarn {@link AMRMClientAsync} implementation for testing. 
+ */ +public class TestingYarnAMRMClientAsync extends AMRMClientAsyncImpl { Review comment: `AMRMClientAsyncImpl` is annotated as unstable. I'm not sure how the testing implementation will work across different Yarn versions. Have we tried this out? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager URL: https://github.com/apache/flink/pull/11353#discussion_r403105830 ## File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ## @@ -615,4 +632,85 @@ protected double getCpuCores(final Configuration configuration) { //noinspection NumericCastThatLosesPrecision return cpuCoresLong; } + + /** +* Utility class for converting between Flink {@link WorkerResourceSpec} and Yarn {@link Resource}. +*/ + @VisibleForTesting + static class WorkerSpecContainerResourceAdapter { + private final Configuration flinkConfig; + private final int minMemMB; + private final int minVcore; + private final boolean matchVcores; + private final Map workerSpecToContainerResource; + private final Map> containerResourceToWorkerSpecs; Review comment: If we are interested in every `WorkerResourceSpec` which ever resulted in a given `Resource`, then I would suggest changing the value type to `List`. Otherwise one could instantiate this field with a `Set`, which has different semantics. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager URL: https://github.com/apache/flink/pull/11353#discussion_r403113398 ## File path: flink-yarn/src/test/java/org/apache/flink/yarn/TestingContainer.java ## @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.yarn; + +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.Token; + +/** + * A {@link Container} implementation for testing. + */ +class TestingContainer extends Container { Review comment: I like the idea of getting rid of Mockito but how do we ensure that this works with all Hadoop versions? Looking at Hadoop 2.10. https://github.com/apache/hadoop/blob/release-2.10.0-RC1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Container.java it looks as if the container has gotten some more methods. 
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager URL: https://github.com/apache/flink/pull/11353#discussion_r403088133 ## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java ## @@ -232,57 +229,75 @@ private void recoverWorkerNodesFromPreviousAttempts() throws ResourceManagerExce ++currentMaxAttemptId); } - private void requestKubernetesPod() { - numPendingPodRequests++; + private void requestKubernetesPod(WorkerResourceSpec workerResourceSpec) { + final KubernetesTaskManagerParameters parameters = + createKubernetesTaskManagerParameters(workerResourceSpec); + + podWorkerResources.put(parameters.getPodName(), workerResourceSpec); + final int pendingWorkerNum = pendingWorkerCounter.increaseAndGet(workerResourceSpec); log.info("Requesting new TaskManager pod with <{},{}>. Number pending requests {}.", - defaultMemoryMB, - defaultCpus, - numPendingPodRequests); + parameters.getTaskManagerMemoryMB(), + parameters.getTaskManagerCPU(), + pendingWorkerNum); + log.info("TaskManager {} will be started with {}.", parameters.getPodName(), workerResourceSpec); + + final KubernetesPod taskManagerPod = + KubernetesTaskManagerFactory.createTaskManagerComponent(parameters); + kubeClient.createTaskManagerPod(taskManagerPod); + } + + private KubernetesTaskManagerParameters createKubernetesTaskManagerParameters(WorkerResourceSpec workerResourceSpec) { + // TODO: need to unset process/flink memory size from configuration if dynamic worker resource is activated Review comment: Should we add a check state to ensure that we fail in case that we request a different size? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager URL: https://github.com/apache/flink/pull/11353#discussion_r403078672 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ActiveResourceManagerTest.java ## @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.resourcemanager; + +import org.junit.Test; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.Is.is; + +/** + * Tests for {@link ActiveResourceManager}. + */ +public class ActiveResourceManagerTest { Review comment: ```suggestion public class ActiveResourceManagerTest extends TestLogger { ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager URL: https://github.com/apache/flink/pull/11353#discussion_r403126015 ## File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ## @@ -363,31 +356,64 @@ public void onContainersCompleted(final List statuses) { @Override public void onContainersAllocated(List containers) { runAsync(() -> { - log.info("Received {} containers with {} pending container requests.", containers.size(), numPendingContainerRequests); - final Collection pendingRequests = getPendingRequests(); - final Iterator pendingRequestsIterator = pendingRequests.iterator(); + log.info("Received {} containers.", containers.size()); - // number of allocated containers can be larger than the number of pending container requests - final int numAcceptedContainers = Math.min(containers.size(), numPendingContainerRequests); - final List requiredContainers = containers.subList(0, numAcceptedContainers); - final List excessContainers = containers.subList(numAcceptedContainers, containers.size()); - - for (int i = 0; i < requiredContainers.size(); i++) { - removeContainerRequest(pendingRequestsIterator.next()); - } - - excessContainers.forEach(this::returnExcessContainer); - requiredContainers.forEach(this::startTaskExecutorInContainer); + groupContainerByResource(containers).forEach(this::onContainersOfResourceAllocated); // if we are waiting for no further containers, we can go to the // regular heartbeat interval - if (numPendingContainerRequests <= 0) { + if (pendingWorkerCounter.getTotalNum() <= 0) { resourceManagerClient.setHeartbeatInterval(yarnHeartbeatIntervalMillis); } }); } - private void startTaskExecutorInContainer(Container container) { + private Map> groupContainerByResource(List containers) { + return containers.stream().collect(Collectors.groupingBy(Container::getResource)); + } + + private void 
onContainersOfResourceAllocated(Resource resource, List containers) { + final List pendingWorkerResourceSpecs = + workerSpecContainerResourceAdapter.getWorkerSpecs(resource).stream() Review comment: Can it happen that `getWorkerSpecs(resource)` returns a list which contains a `WorkerResourceSpec` twice? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager URL: https://github.com/apache/flink/pull/11353#discussion_r403099591 ## File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ## @@ -615,4 +632,85 @@ protected double getCpuCores(final Configuration configuration) { //noinspection NumericCastThatLosesPrecision return cpuCoresLong; } + + /** +* Utility class for converting between Flink {@link WorkerResourceSpec} and Yarn {@link Resource}. +*/ + @VisibleForTesting + static class WorkerSpecContainerResourceAdapter { + private final Configuration flinkConfig; + private final int minMemMB; + private final int minVcore; + private final boolean matchVcores; + private final Map workerSpecToContainerResource; + private final Map> containerResourceToWorkerSpecs; + private final Map> containerMemoryToContainerResource; + + @VisibleForTesting + WorkerSpecContainerResourceAdapter( + final Configuration flinkConfig, + final int minMemMB, + final int minVcore, + final boolean matchVcores) { + this.flinkConfig = Preconditions.checkNotNull(flinkConfig); + this.minMemMB = minMemMB; + this.minVcore = minVcore; + this.matchVcores = matchVcores; + workerSpecToContainerResource = new HashMap<>(); + containerResourceToWorkerSpecs = new HashMap<>(); + containerMemoryToContainerResource = new HashMap<>(); + } + + @VisibleForTesting + Resource getContainerResource(final WorkerResourceSpec workerResourceSpec) { + return workerSpecToContainerResource.computeIfAbsent( + Preconditions.checkNotNull(workerResourceSpec), + this::createAndMapContainerResource); + } + + @VisibleForTesting + Collection getWorkerSpecs(final Resource containerResource) { + return getEquivalentContainerResource(containerResource).stream() + .flatMap(resource -> containerResourceToWorkerSpecs.getOrDefault(resource, Collections.emptyList()).stream()) + 
.collect(Collectors.toList()); + } + + @VisibleForTesting + Collection getEquivalentContainerResource(final Resource containerResource) { + // Yarn might ignore the requested vcores, depending on its configurations. + // In such cases, we should also not matching vcores. + return matchVcores ? + Collections.singletonList(containerResource) : + containerMemoryToContainerResource.getOrDefault(containerResource.getMemory(), Collections.emptyList()); + } + + private Resource createAndMapContainerResource(final WorkerResourceSpec workerResourceSpec) { + // TODO: need to unset process/flink memory size from configuration if dynamic worker resource is activated Review comment: I would suggest to add a check state to ensure that we fail once we enable dynamic worker resources. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager URL: https://github.com/apache/flink/pull/11353#discussion_r403130352 ## File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ## @@ -540,39 +571,41 @@ private FinalApplicationStatus getYarnStatus(ApplicationStatus status) { /** * Request new container if pending containers cannot satisfy pending slot requests. */ - private void requestYarnContainerIfRequired() { - int requiredTaskManagers = getNumberRequiredTaskManagers(); - - if (requiredTaskManagers > numPendingContainerRequests) { - requestYarnContainer(); - } + private void requestYarnContainerIfRequired(Resource containerResource) { + getPendingWorkerNums().entrySet().stream() Review comment: I have to admit that I find `getPendingWorkerNums()` and `pendingWorkerCounter` quite confusing. They sound almost the same, but the former refers to the requirements of the `SlotManager` and the latter to the currently pending workers which have been requested by the RM. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager URL: https://github.com/apache/flink/pull/11353#discussion_r403104873 ## File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ## @@ -615,4 +632,85 @@ protected double getCpuCores(final Configuration configuration) { //noinspection NumericCastThatLosesPrecision return cpuCoresLong; } + + /** +* Utility class for converting between Flink {@link WorkerResourceSpec} and Yarn {@link Resource}. +*/ + @VisibleForTesting + static class WorkerSpecContainerResourceAdapter { + private final Configuration flinkConfig; + private final int minMemMB; + private final int minVcore; + private final boolean matchVcores; + private final Map workerSpecToContainerResource; + private final Map> containerResourceToWorkerSpecs; + private final Map> containerMemoryToContainerResource; + + @VisibleForTesting + WorkerSpecContainerResourceAdapter( + final Configuration flinkConfig, + final int minMemMB, + final int minVcore, + final boolean matchVcores) { + this.flinkConfig = Preconditions.checkNotNull(flinkConfig); + this.minMemMB = minMemMB; + this.minVcore = minVcore; + this.matchVcores = matchVcores; + workerSpecToContainerResource = new HashMap<>(); + containerResourceToWorkerSpecs = new HashMap<>(); + containerMemoryToContainerResource = new HashMap<>(); + } + + @VisibleForTesting + Resource getContainerResource(final WorkerResourceSpec workerResourceSpec) { + return workerSpecToContainerResource.computeIfAbsent( + Preconditions.checkNotNull(workerResourceSpec), + this::createAndMapContainerResource); + } + + @VisibleForTesting + Collection getWorkerSpecs(final Resource containerResource) { + return getEquivalentContainerResource(containerResource).stream() + .flatMap(resource -> containerResourceToWorkerSpecs.getOrDefault(resource, Collections.emptyList()).stream()) + 
.collect(Collectors.toList()); + } + + @VisibleForTesting + Collection getEquivalentContainerResource(final Resource containerResource) { Review comment: If we are only interested in the equivalence class, then I would suggest to change the type from `Collection` to `Set`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager URL: https://github.com/apache/flink/pull/11353#discussion_r403086625 ## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java ## @@ -183,16 +178,18 @@ public boolean stopWorker(final KubernetesWorkerNode worker) { @Override public void onAdded(List pods) { runAsync(() -> { - for (KubernetesPod pod : pods) { - if (numPendingPodRequests > 0) { - numPendingPodRequests--; + pods.forEach(pod -> { + WorkerResourceSpec workerResourceSpec = podWorkerResources.get(pod.getName()); + final int pendingNum = pendingWorkerCounter.getNum(workerResourceSpec); + if (pendingNum > 0) { + pendingWorkerCounter.decreaseAndGet(workerResourceSpec); Review comment: I'm wondering whether this logic shouldn't go into the `ActiveResourceManager`. I would expect that all `ActiveResourceManager` implementations would need to do something similar. Maybe we could introduce `notifyNewWorkerStarted(WorkerResourceSpec)`. This could also have the benefit that we could hide `pendingWorkerCounter` completely. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager URL: https://github.com/apache/flink/pull/11353#discussion_r403097275 ## File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ## @@ -615,4 +632,85 @@ protected double getCpuCores(final Configuration configuration) { //noinspection NumericCastThatLosesPrecision return cpuCoresLong; } + + /** +* Utility class for converting between Flink {@link WorkerResourceSpec} and Yarn {@link Resource}. +*/ + @VisibleForTesting + static class WorkerSpecContainerResourceAdapter { Review comment: I think this class is large enough to warrant its own file. This would also decrease the size of this source code file a bit. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager URL: https://github.com/apache/flink/pull/11353#discussion_r403131968 ## File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ## @@ -412,30 +439,32 @@ private void releaseFailedContainerAndRequestNewContainerIfRequired(ContainerId final ResourceID resourceId = new ResourceID(containerId.toString()); // release the failed container - workerNodeMap.remove(resourceId); + YarnWorkerNode yarnWorkerNode = workerNodeMap.remove(resourceId); resourceManagerClient.releaseAssignedContainer(containerId); // and ask for a new one - requestYarnContainerIfRequired(); + requestYarnContainerIfRequired(yarnWorkerNode.getContainer().getResource()); } private void returnExcessContainer(Container excessContainer) { log.info("Returning excess container {}.", excessContainer.getId()); resourceManagerClient.releaseAssignedContainer(excessContainer.getId()); } - private void removeContainerRequest(AMRMClient.ContainerRequest pendingContainerRequest) { - numPendingContainerRequests--; - - log.info("Removing container request {}. Pending container requests {}.", pendingContainerRequest, numPendingContainerRequests); - + private void removeContainerRequest(AMRMClient.ContainerRequest pendingContainerRequest, WorkerResourceSpec workerResourceSpec) { + log.info("Removing container request {}.", pendingContainerRequest); + pendingWorkerCounter.decreaseAndGet(workerResourceSpec); Review comment: Asked differently: when do we clean `workerSpecContainerResourceAdapter` up? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager URL: https://github.com/apache/flink/pull/11353#discussion_r403092773 ## File path: flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesResourceManagerTest.java ## @@ -176,6 +185,15 @@ MainThreadExecutor getMainThreadExecutorForTesting() { SlotManager getSlotManager() { return this.slotManager; } + + @Override + public Map getPendingWorkerNums() { + return customPendingWorkerNums != null ? customPendingWorkerNums : super.getPendingWorkerNums(); + } + + public void setCustomPendingWorkerNums(final Map customPendingWorkerNums) { + this.customPendingWorkerNums = customPendingWorkerNums; + } Review comment: I think this is pretty much whitebox testing as it strongly relies on internal implementation details. I would recommend to go another way and to rely either on the public APIs of the component or to encapsulate the bookkeeping logic so that it can be tested separately. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager URL: https://github.com/apache/flink/pull/11353#discussion_r403123867 ## File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ## @@ -363,31 +356,64 @@ public void onContainersCompleted(final List statuses) { @Override public void onContainersAllocated(List containers) { runAsync(() -> { - log.info("Received {} containers with {} pending container requests.", containers.size(), numPendingContainerRequests); - final Collection pendingRequests = getPendingRequests(); - final Iterator pendingRequestsIterator = pendingRequests.iterator(); + log.info("Received {} containers.", containers.size()); - // number of allocated containers can be larger than the number of pending container requests - final int numAcceptedContainers = Math.min(containers.size(), numPendingContainerRequests); - final List requiredContainers = containers.subList(0, numAcceptedContainers); - final List excessContainers = containers.subList(numAcceptedContainers, containers.size()); - - for (int i = 0; i < requiredContainers.size(); i++) { - removeContainerRequest(pendingRequestsIterator.next()); - } - - excessContainers.forEach(this::returnExcessContainer); - requiredContainers.forEach(this::startTaskExecutorInContainer); + groupContainerByResource(containers).forEach(this::onContainersOfResourceAllocated); Review comment: I'd suggest to use the for-each loop. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager URL: https://github.com/apache/flink/pull/11353#discussion_r403088473 ## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java ## @@ -232,57 +229,75 @@ private void recoverWorkerNodesFromPreviousAttempts() throws ResourceManagerExce ++currentMaxAttemptId); } - private void requestKubernetesPod() { - numPendingPodRequests++; + private void requestKubernetesPod(WorkerResourceSpec workerResourceSpec) { + final KubernetesTaskManagerParameters parameters = + createKubernetesTaskManagerParameters(workerResourceSpec); + + podWorkerResources.put(parameters.getPodName(), workerResourceSpec); + final int pendingWorkerNum = pendingWorkerCounter.increaseAndGet(workerResourceSpec); log.info("Requesting new TaskManager pod with <{},{}>. Number pending requests {}.", - defaultMemoryMB, - defaultCpus, - numPendingPodRequests); + parameters.getTaskManagerMemoryMB(), + parameters.getTaskManagerCPU(), + pendingWorkerNum); + log.info("TaskManager {} will be started with {}.", parameters.getPodName(), workerResourceSpec); + + final KubernetesPod taskManagerPod = + KubernetesTaskManagerFactory.createTaskManagerComponent(parameters); + kubeClient.createTaskManagerPod(taskManagerPod); + } + + private KubernetesTaskManagerParameters createKubernetesTaskManagerParameters(WorkerResourceSpec workerResourceSpec) { + // TODO: need to unset process/flink memory size from configuration if dynamic worker resource is activated Review comment: Btw: where do we change the `Configuration`? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager URL: https://github.com/apache/flink/pull/11353#discussion_r403127546 ## File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ## @@ -412,30 +439,32 @@ private void releaseFailedContainerAndRequestNewContainerIfRequired(ContainerId final ResourceID resourceId = new ResourceID(containerId.toString()); // release the failed container - workerNodeMap.remove(resourceId); + YarnWorkerNode yarnWorkerNode = workerNodeMap.remove(resourceId); resourceManagerClient.releaseAssignedContainer(containerId); // and ask for a new one - requestYarnContainerIfRequired(); + requestYarnContainerIfRequired(yarnWorkerNode.getContainer().getResource()); } private void returnExcessContainer(Container excessContainer) { log.info("Returning excess container {}.", excessContainer.getId()); resourceManagerClient.releaseAssignedContainer(excessContainer.getId()); } - private void removeContainerRequest(AMRMClient.ContainerRequest pendingContainerRequest) { - numPendingContainerRequests--; - - log.info("Removing container request {}. Pending container requests {}.", pendingContainerRequest, numPendingContainerRequests); - + private void removeContainerRequest(AMRMClient.ContainerRequest pendingContainerRequest, WorkerResourceSpec workerResourceSpec) { + log.info("Removing container request {}.", pendingContainerRequest); + pendingWorkerCounter.decreaseAndGet(workerResourceSpec); Review comment: What about removing it from `workerSpecContainerResourceAdapter`? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager URL: https://github.com/apache/flink/pull/11353#discussion_r403107046 ## File path: flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptionsInternal.java ## @@ -34,4 +37,24 @@ .stringType() .noDefaultValue() .withDescription("**DO NOT USE** The location of the log config file, e.g. the path to your log4j.properties for log4j."); + + /** +* **DO NO USE** Whether {@link YarnResourceManager} should match the vcores of allocated containers with those requested. +* +* By default, Yarn ignores vcores in the container requests, and always allocate 1 vcore for each container. +* Iff 'yarn.scheduler.capacity.resource-calculator' is set to 'DominantResourceCalculator' for Yarn, will it +* allocate container vcores as requested. Unfortunately, this configuration option is dedicated for Yarn Scheduler, +* and is only accessible to applications in Hadoop 2.6+. +* +* ATM, it should be fine to not match vcores, because with the current {@link SlotManagerImpl} all the TM +* containers should have the same resources. +* +* If later we add another {@link SlotManager} implementation that may have TMs with different resources, we can +* switch this option on only for the new SM, and the new SM can also be available on Hadoop 2.6+ only. +*/ + public static final ConfigOption MATCH_CONTAINER_VCORES = + key("$internal.yarn.resourcemanager.enable-vcore-matching") + .booleanType() + .defaultValue(false) + .withDescription("**DO NOT USE** Whether YarnResourceManager should match the container vcores."); Review comment: Does this mean that one has to configure one's Flink cluster depending on the configuration of the Yarn cluster? What happens if one forgets about Flink? This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager URL: https://github.com/apache/flink/pull/11353#discussion_r403108543 ## File path: flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java ## @@ -584,6 +590,85 @@ public void testGetCpuExceedMaxInt() throws Exception { }}; } + @Test + public void testWorkerSpecContainerResourceAdapter_MatchVcores() { + final int minMemMB = 100; + final int minVcore = 10; + final YarnResourceManager.WorkerSpecContainerResourceAdapter adapter = + new YarnResourceManager.WorkerSpecContainerResourceAdapter( + getConfigProcessSpecEqualsWorkerSpec(), minMemMB, minVcore, true); Review comment: It is usually easier to understand if one uses an enum instead of a boolean, because one can give the different values expressive names (e.g. `MATCH_VCORES`, `IGNORE_VCORES`). This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager URL: https://github.com/apache/flink/pull/11353#discussion_r403118602 ## File path: flink-yarn/src/test/java/org/apache/flink/yarn/TestingYarnAMRMClientAsync.java ## @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.yarn; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple4; +import org.apache.flink.util.Preconditions; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.client.api.AMRMClient; +import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; +import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl; + +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.function.Consumer; +import java.util.function.Function; + +/** + * A Yarn {@link AMRMClientAsync} implementation for testing. 
+ */ +public class TestingYarnAMRMClientAsync extends AMRMClientAsyncImpl { + + private Function, List>> + getMatchingRequestsFunction = ignored -> Collections.emptyList(); + private Consumer> addContainerRequestConsumer = ignored -> {}; + private Consumer> removeContainerRequestConsumer = ignored -> {}; + private Consumer> releaseAssignedContainerConsumer = ignored -> {}; Review comment: I'd suggest to use the `BiConsumer` here. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager URL: https://github.com/apache/flink/pull/11353#discussion_r403132911 ## File path: flink-kubernetes/src/test/java/org/apache/flink/kubernetes/entrypoint/KubernetesResourceManagerFactoryTest.java ## @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.entrypoint; + +import org.apache.flink.api.common.resources.CPUResource; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; + +import org.junit.Test; + +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertThat; + +/** + * Tests for {@link KubernetesResourceManagerFactory}. + */ +public class KubernetesResourceManagerFactoryTest { Review comment: ```suggestion public class KubernetesResourceManagerFactoryTest extends TestLogger { ``` This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager URL: https://github.com/apache/flink/pull/11353#discussion_r403093997 ## File path: flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesResourceManagerTest.java ## @@ -321,6 +339,47 @@ public void testGetCpuCoresNumSlots() { assertThat(resourceManager.getCpuCores(configuration), is(3.0)); } + @Test + public void testStartAndRecoverVariousResourceSpec() { + // Start two workers with different resources + final WorkerResourceSpec workerResourceSpec1 = new WorkerResourceSpec(1.0, 100, 0, 100, 100); + final WorkerResourceSpec workerResourceSpec2 = new WorkerResourceSpec(1.0, 99, 0, 100, 100); + resourceManager.startNewWorker(workerResourceSpec1); + resourceManager.startNewWorker(workerResourceSpec2); + + // Verify two pods with both worker resources are started + final PodList initialPodList = kubeClient.pods().list(); + assertEquals(2, initialPodList.getItems().size()); + final Pod initialPod1 = getPodContainsStrInArgs(initialPodList, TaskManagerOptions.TASK_HEAP_MEMORY.key() + "=" + (100L << 20)); + final Pod initialPod2 = getPodContainsStrInArgs(initialPodList, TaskManagerOptions.TASK_HEAP_MEMORY.key() + "=" + (99L << 20)); + + // Notify resource manager about pods added. + final KubernetesPod initialKubernetesPod1 = new KubernetesPod(initialPod1); + final KubernetesPod initialKubernetesPod2 = new KubernetesPod(initialPod2); + resourceManager.onAdded(ImmutableList.of(initialKubernetesPod1, initialKubernetesPod2)); + + // Terminate pod1. + terminatePod(initialPod1); + resourceManager.setCustomPendingWorkerNums(Collections.singletonMap(workerResourceSpec1, 1)); Review comment: I would recommend to not test the component like this. 
It requires detailed knowledge of the component's internals and makes it harder to evolve it because this test relies on the fact that the `KubernetesResourceManager` has a map of `WorkerResourceSpec` to `Integers`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager URL: https://github.com/apache/flink/pull/11353#discussion_r403083789 ## File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java ## @@ -183,16 +178,18 @@ public boolean stopWorker(final KubernetesWorkerNode worker) { @Override public void onAdded(List pods) { runAsync(() -> { - for (KubernetesPod pod : pods) { - if (numPendingPodRequests > 0) { - numPendingPodRequests--; + pods.forEach(pod -> { Review comment: Call me old fashioned, but I think the for-each loop `for (KubernetesPod pod: pods)` is superior to `forEach`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager URL: https://github.com/apache/flink/pull/11353#discussion_r403121671 ## File path: flink-yarn/src/test/java/org/apache/flink/yarn/TestingYarnNMClientAsync.java ## @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.yarn; + +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.util.Preconditions; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.client.api.async.NMClientAsync; +import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl; + +import java.util.function.Consumer; + +/** + * A Yarn {@link NMClientAsync} implementation for testing. + */ +class TestingYarnNMClientAsync extends NMClientAsyncImpl { Review comment: Same here `NMClientAsyncImpl` seems to be unstable and might change depending on the used Yarn version. 
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services