[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-20 Thread GitBox


tillrohrmann commented on a change in pull request #11353:
URL: https://github.com/apache/flink/pull/11353#discussion_r411422637



##
File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/RegisterApplicationMasterResponseReflector.java
##
@@ -96,7 +110,40 @@
}
 
@VisibleForTesting
-   Method getMethod() {
-   return method;
+   Method getGetContainersFromPreviousAttemptsMethod() {
+   return getContainersFromPreviousAttemptsMethod;
+   }
+
+   /**
+* Get names of resource types that are considered by the Yarn 
scheduler.
+* @param response The response object from the registration at the 
ResourceManager.
+* @return A set of resource type names, or {@link Optional#empty()} if 
the Yarn version does not support this API.
+*/
+   Optional<Set<String>> getSchedulerResourceTypeNames(final 
RegisterApplicationMasterResponse response) {
+   return getSchedulerResourceTypeNamesUnsafe(response);
+   }
+
+   @VisibleForTesting
+   Optional<Set<String>> getSchedulerResourceTypeNamesUnsafe(final Object 
response) {

Review comment:
   Won't do, because this method is needed for testing purposes.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-20 Thread GitBox


tillrohrmann commented on a change in pull request #11353:
URL: https://github.com/apache/flink/pull/11353#discussion_r411421781



##
File path: 
flink-yarn/src/test/java/org/apache/flink/yarn/TestingYarnAMRMClientAsync.java
##
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.TriConsumer;
+import org.apache.flink.util.function.TriFunction;
+
+import org.apache.hadoop.conf.Configuration;
+import 
org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+/**
+ * A Yarn {@link AMRMClientAsync} implementation for testing.
+ */
+public class TestingYarnAMRMClientAsync extends 
AMRMClientAsyncImpl<AMRMClient.ContainerRequest> {
+
+   private Function<Tuple4<Priority, String, Resource, CallbackHandler>, 
List<? extends Collection<AMRMClient.ContainerRequest>>>
+   getMatchingRequestsFunction = ignored -> 
Collections.emptyList();
+   private BiConsumer<AMRMClient.ContainerRequest, CallbackHandler> 
addContainerRequestConsumer = (ignored1, ignored2) -> {};
+   private BiConsumer<AMRMClient.ContainerRequest, CallbackHandler> 
removeContainerRequestConsumer = (ignored1, ignored2) -> {};
+   private BiConsumer<ContainerId, CallbackHandler> 
releaseAssignedContainerConsumer = (ignored1, ignored2) -> {};
+   private Consumer<Integer> setHeartbeatIntervalConsumer = (ignored) -> 
{};
+   private TriFunction<String, Integer, String, RegisterApplicationMasterResponse> registerApplicationMasterFunction =
+   (ignored1, ignored2, ignored3) -> 
RegisterApplicationMasterResponse.newInstance(
+   Resource.newInstance(0, 0),
+   Resource.newInstance(Integer.MAX_VALUE, 
Integer.MAX_VALUE),
+   Collections.emptyMap(),
+   null,
+   Collections.emptyList(),
+   null,
+   Collections.emptyList());
+   private TriConsumer<FinalApplicationStatus, String, String> 
unregisterApplicationMasterConsumer = (ignored1, ignored2, ignored3) -> {};
+
+   TestingYarnAMRMClientAsync(CallbackHandler callbackHandler) {
+   super(0, callbackHandler);
+   }
+
+   @Override
+   public List<? extends Collection<AMRMClient.ContainerRequest>> 
getMatchingRequests(Priority priority, String resourceName, Resource 
capability) {
+   return getMatchingRequestsFunction.apply(Tuple4.of(priority, 
resourceName, capability, handler));
+   }
+
+   @Override
+   public void addContainerRequest(AMRMClient.ContainerRequest req) {
+   addContainerRequestConsumer.accept(req, handler);
+   }
+
+   @Override
+   public void removeContainerRequest(AMRMClient.ContainerRequest req) {
+   removeContainerRequestConsumer.accept(req, handler);
+   }
+
+   @Override
+   public void releaseAssignedContainer(ContainerId containerId) {
+   releaseAssignedContainerConsumer.accept(containerId, handler);
+   }
+
+   @Override
+   public void setHeartbeatInterval(int interval) {
+   setHeartbeatIntervalConsumer.accept(interval);
+   }
+
+   @Override
+   public RegisterApplicationMasterResponse 
registerApplicationMaster(String appHostName, int appHostPort, String 
appTrackingUrl) {
+   return registerApplicationMasterFunction.apply(appHostName, 
appHostPort, appTrackingUrl);
+   }
+
+   @Override
+   public void unregisterApplicationMaster(FinalApplicationStatus 
appStatus, String appMessage, String appTrackingUrl) {
+   unregisterApplicationMasterConsumer.accept(appStatus, 
appMessage, appTrackingUrl);
+   }
+
+   void setGetMatchingRequestsFunction(
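+   Function<Tuple4<Priority, String, Resource, CallbackHandler>, 
List<? extends Collection<AMRMClient.ContainerRequest>>>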
+   

[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-20 Thread GitBox


tillrohrmann commented on a change in pull request #11353:
URL: https://github.com/apache/flink/pull/11353#discussion_r411413787



##
File path: 
flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
##
@@ -243,24 +248,33 @@ protected NMClientAsync 
createAndStartNodeManagerClient(YarnConfiguration yarnCo
 
// domain objects for test purposes
final ResourceProfile resourceProfile1 = 
ResourceProfile.UNKNOWN;
+   final WorkerResourceSpec workerResourceSpec;
+
+   final Resource containerResource;
 
public String taskHost = "host1";
 
final TestingYarnNMClientAsync testingYarnNMClientAsync;
 
final TestingYarnAMRMClientAsync testingYarnAMRMClientAsync;
 
+   int containerIdx = 0;
+
/**
 * Create mock RM dependencies.
 */
Context() throws Exception {
-   this(flinkConfig);
+   this(flinkConfig, null);
}
 
-   Context(Configuration configuration) throws  Exception {
-   final SlotManager slotManager = 
SlotManagerBuilder.newBuilder()
-   
.setDefaultWorkerResourceSpec(YarnWorkerResourceSpecFactory.INSTANCE.createDefaultWorkerResourceSpec(configuration))
-   .build();
+   Context(Configuration configuration, @Nullable SlotManager 
slotManager) throws  Exception {
+
+   workerResourceSpec = 
YarnWorkerResourceSpecFactory.INSTANCE.createDefaultWorkerResourceSpec(configuration);
+   if (slotManager == null) {
+   slotManager = SlotManagerBuilder.newBuilder()
+   
.setDefaultWorkerResourceSpec(workerResourceSpec)
+   .build();
+   }

Review comment:
   I think it would be fine to add a 
`SlotManager.getDefaultWorkerResourceSpec()`. This could solve the problem here.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-20 Thread GitBox


tillrohrmann commented on a change in pull request #11353:
URL: https://github.com/apache/flink/pull/11353#discussion_r411421064



##
File path: 
flink-yarn/src/test/java/org/apache/flink/yarn/RegisterApplicationMasterResponseReflectorTest.java
##
@@ -88,7 +94,44 @@ public void testGetMethodReflectiveHadoop22() {
final RegisterApplicationMasterResponseReflector 
registerApplicationMasterResponseReflector =
new RegisterApplicationMasterResponseReflector(LOG);
 
-   final Method method = 
registerApplicationMasterResponseReflector.getMethod();
+   final Method method = 
registerApplicationMasterResponseReflector.getGetContainersFromPreviousAttemptsMethod();
+   assertThat(method, notNullValue());
+   }
+
+   @Test
+   public void testCallsGetSchedulerResourceTypesMethodIfPresent() {
+   final RegisterApplicationMasterResponseReflector 
registerApplicationMasterResponseReflector =
+   new RegisterApplicationMasterResponseReflector(LOG, 
HasMethod.class);
+
+   final Optional<Set<String>> schedulerResourceTypeNames =
+   
registerApplicationMasterResponseReflector.getSchedulerResourceTypeNamesUnsafe(new
 HasMethod());

Review comment:
   Makes sense. I agree that your proposal is better.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-20 Thread GitBox


tillrohrmann commented on a change in pull request #11353:
URL: https://github.com/apache/flink/pull/11353#discussion_r411413787



##
File path: 
flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
##
@@ -243,24 +248,33 @@ protected NMClientAsync 
createAndStartNodeManagerClient(YarnConfiguration yarnCo
 
// domain objects for test purposes
final ResourceProfile resourceProfile1 = 
ResourceProfile.UNKNOWN;
+   final WorkerResourceSpec workerResourceSpec;
+
+   final Resource containerResource;
 
public String taskHost = "host1";
 
final TestingYarnNMClientAsync testingYarnNMClientAsync;
 
final TestingYarnAMRMClientAsync testingYarnAMRMClientAsync;
 
+   int containerIdx = 0;
+
/**
 * Create mock RM dependencies.
 */
Context() throws Exception {
-   this(flinkConfig);
+   this(flinkConfig, null);
}
 
-   Context(Configuration configuration) throws  Exception {
-   final SlotManager slotManager = 
SlotManagerBuilder.newBuilder()
-   
.setDefaultWorkerResourceSpec(YarnWorkerResourceSpecFactory.INSTANCE.createDefaultWorkerResourceSpec(configuration))
-   .build();
+   Context(Configuration configuration, @Nullable SlotManager 
slotManager) throws  Exception {
+
+   workerResourceSpec = 
YarnWorkerResourceSpecFactory.INSTANCE.createDefaultWorkerResourceSpec(configuration);
+   if (slotManager == null) {
+   slotManager = SlotManagerBuilder.newBuilder()
+   
.setDefaultWorkerResourceSpec(workerResourceSpec)
+   .build();
+   }

Review comment:
   I think it would be fine to add a 
`SlotManager.getDefaultWorkerResourceSpec()`.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-16 Thread GitBox
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] 
Make YarnResourceManager starts workers using WorkerResourceSpec requested by 
SlotManager
URL: https://github.com/apache/flink/pull/11353#discussion_r409595126
 
 

 ##
 File path: 
flink-yarn/src/test/java/org/apache/flink/yarn/WorkerSpecContainerResourceAdapterTest.java
 ##
 @@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.junit.Test;
+
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link WorkerSpecContainerResourceAdapter}.
+ */
+public class WorkerSpecContainerResourceAdapterTest extends TestLogger {
+
+   @Test
+   public void testMatchVcores() {
+   final int minMemMB = 100;
+   final int minVcore = 10;
+   final WorkerSpecContainerResourceAdapter adapter =
+   new WorkerSpecContainerResourceAdapter(
+   getConfigProcessSpecEqualsWorkerSpec(),
+   minMemMB,
+   minVcore,
+   Integer.MAX_VALUE,
+   Integer.MAX_VALUE,
+   
WorkerSpecContainerResourceAdapter.MatchingStrategy.MATCH_VCORE);
+
+   final WorkerResourceSpec workerSpec1 = new 
WorkerResourceSpec.Builder()
+   .setCpuCores(1.0)
+   .setTaskHeapMemoryMB(10)
+   .setTaskOffHeapMemoryMB(10)
+   .setNetworkMemoryMB(10)
+   .setManagedMemoryMB(10)
+   .build();
+   final WorkerResourceSpec workerSpec2 = new 
WorkerResourceSpec.Builder()
+   .setCpuCores(10.0)
+   .setTaskHeapMemoryMB(25)
+   .setTaskOffHeapMemoryMB(25)
+   .setNetworkMemoryMB(25)
+   .setManagedMemoryMB(25)
+   .build();
+   final WorkerResourceSpec workerSpec3 = new 
WorkerResourceSpec.Builder()
+   .setCpuCores(5.0)
+   .setTaskHeapMemoryMB(30)
+   .setTaskOffHeapMemoryMB(30)
+   .setNetworkMemoryMB(30)
+   .setManagedMemoryMB(30)
+   .build();
+   final WorkerResourceSpec workerSpec4 = new 
WorkerResourceSpec.Builder()
+   .setCpuCores(15.0)
+   .setTaskHeapMemoryMB(10)
+   .setTaskOffHeapMemoryMB(10)
+   .setNetworkMemoryMB(10)
+   .setManagedMemoryMB(10)
+   .build();
+
+   final Resource containerResource1 = Resource.newInstance(100, 
10);
+   final Resource containerResource2 = Resource.newInstance(200, 
10);
+   final Resource containerResource3 = Resource.newInstance(100, 
20);
+
+   assertThat(adapter.getWorkerSpecs(containerResource1), empty());
+   assertThat(adapter.getWorkerSpecs(containerResource2), empty());
+
+   assertThat(adapter.getContainerResource(workerSpec1).get(), 
is(containerResource1));
+   assertThat(adapter.getContainerResource(workerSpec2).get(), 
is(containerResource1));
+   assertThat(adapter.getContainerResource(workerSpec3).get(), 
is(containerResource2));
+   assertThat(adapter.getContainerResource(workerSpec4).get(), 
is(containerResource3));
+
+ 

[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-16 Thread GitBox
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] 
Make YarnResourceManager starts workers using WorkerResourceSpec requested by 
SlotManager
URL: https://github.com/apache/flink/pull/11353#discussion_r409579792
 
 

 ##
 File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/WorkerSpecContainerResourceAdapter.java
 ##
 @@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
+import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils;
+import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Utility class for converting between Flink {@link WorkerResourceSpec} and 
Yarn {@link Resource}.
+ */
+public class WorkerSpecContainerResourceAdapter {
+   private final Logger log = 
LoggerFactory.getLogger(WorkerSpecContainerResourceAdapter.class);
 
 Review comment:
   ```suggestion
private static final Logger LOG = 
LoggerFactory.getLogger(WorkerSpecContainerResourceAdapter.class);
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-16 Thread GitBox
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] 
Make YarnResourceManager starts workers using WorkerResourceSpec requested by 
SlotManager
URL: https://github.com/apache/flink/pull/11353#discussion_r409575963
 
 

 ##
 File path: 
flink-yarn/src/test/java/org/apache/flink/yarn/TestingYarnAMRMClientAsync.java
 ##
 @@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.TriConsumer;
+import org.apache.flink.util.function.TriFunction;
+
+import org.apache.hadoop.conf.Configuration;
+import 
org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+/**
+ * A Yarn {@link AMRMClientAsync} implementation for testing.
+ */
+public class TestingYarnAMRMClientAsync extends 
AMRMClientAsyncImpl<AMRMClient.ContainerRequest> {
+
+   private Function<Tuple4<Priority, String, Resource, CallbackHandler>, 
List<? extends Collection<AMRMClient.ContainerRequest>>>
+   getMatchingRequestsFunction = ignored -> 
Collections.emptyList();
+   private BiConsumer<AMRMClient.ContainerRequest, CallbackHandler> 
addContainerRequestConsumer = (ignored1, ignored2) -> {};
+   private BiConsumer<AMRMClient.ContainerRequest, CallbackHandler> 
removeContainerRequestConsumer = (ignored1, ignored2) -> {};
+   private BiConsumer<ContainerId, CallbackHandler> 
releaseAssignedContainerConsumer = (ignored1, ignored2) -> {};
+   private Consumer<Integer> setHeartbeatIntervalConsumer = (ignored) -> 
{};
+   private TriFunction<String, Integer, String, RegisterApplicationMasterResponse> registerApplicationMasterFunction =
+   (ignored1, ignored2, ignored3) -> 
RegisterApplicationMasterResponse.newInstance(
+   Resource.newInstance(0, 0),
+   Resource.newInstance(Integer.MAX_VALUE, 
Integer.MAX_VALUE),
+   Collections.emptyMap(),
+   null,
+   Collections.emptyList(),
+   null,
+   Collections.emptyList());
+   private TriConsumer<FinalApplicationStatus, String, String> 
unregisterApplicationMasterConsumer = (ignored1, ignored2, ignored3) -> {};
+
+   TestingYarnAMRMClientAsync(CallbackHandler callbackHandler) {
+   super(0, callbackHandler);
+   }
+
+   @Override
+   public List<? extends Collection<AMRMClient.ContainerRequest>> 
getMatchingRequests(Priority priority, String resourceName, Resource 
capability) {
+   return getMatchingRequestsFunction.apply(Tuple4.of(priority, 
resourceName, capability, handler));
+   }
+
+   @Override
+   public void addContainerRequest(AMRMClient.ContainerRequest req) {
+   addContainerRequestConsumer.accept(req, handler);
+   }
+
+   @Override
+   public void removeContainerRequest(AMRMClient.ContainerRequest req) {
+   removeContainerRequestConsumer.accept(req, handler);
+   }
+
+   @Override
+   public void releaseAssignedContainer(ContainerId containerId) {
+   releaseAssignedContainerConsumer.accept(containerId, handler);
+   }
+
+   @Override
+   public void setHeartbeatInterval(int interval) {
+   setHeartbeatIntervalConsumer.accept(interval);
+   }
+
+   @Override
+   public RegisterApplicationMasterResponse 
registerApplicationMaster(String appHostName, int appHostPort, String 
appTrackingUrl) {
+   return registerApplicationMasterFunction.apply(appHostName, 
appHostPort, appTrackingUrl);
+   }
+
+   @Override
+   public void unregisterApplicationMaster(FinalApplicationStatus 
appStatus, String appMessage, String appTrackingUrl) {
+   unregisterApplicationMasterConsumer.accept(appStatus, 
appMessage, appTrackingUrl);
+   }

[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-16 Thread GitBox
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] 
Make YarnResourceManager starts workers using WorkerResourceSpec requested by 
SlotManager
URL: https://github.com/apache/flink/pull/11353#discussion_r409624390
 
 

 ##
 File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
 ##
 @@ -117,7 +118,9 @@
/** Client to communicate with the Node manager and launch TaskExecutor 
processes. */
private NMClientAsync nodeManagerClient;
 
-   private final WorkerSpecContainerResourceAdapter 
workerSpecContainerResourceAdapter;
+   private WorkerSpecContainerResourceAdapter 
workerSpecContainerResourceAdapter = null;
 
 Review comment:
   Either annotate the field with `@Nullable` and add the corresponding null checks, or 
initialize it with a mock `WorkerSpecContainerResourceAdapter` that fails on 
every call with an exception saying that the `RM` has not been properly 
initialized.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-16 Thread GitBox
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] 
Make YarnResourceManager starts workers using WorkerResourceSpec requested by 
SlotManager
URL: https://github.com/apache/flink/pull/11353#discussion_r409596634
 
 

 ##
 File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/WorkerSpecContainerResourceAdapter.java
 ##
 @@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
+import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils;
+import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Utility class for converting between Flink {@link WorkerResourceSpec} and 
Yarn {@link Resource}.
+ */
+public class WorkerSpecContainerResourceAdapter {
+   private final Logger log = 
LoggerFactory.getLogger(WorkerSpecContainerResourceAdapter.class);
+
+   private final Configuration flinkConfig;
+   private final int minMemMB;
+   private final int minVcore;
+   private final int maxMemMB;
+   private final int maxVcore;
+   private final WorkerSpecContainerResourceAdapter.MatchingStrategy 
matchingStrategy;
+   private final Map<WorkerResourceSpec, Resource> 
workerSpecToContainerResource;
+   private final Map> 
containerResourceToWorkerSpecs;
 
 Review comment:
   Can the value type be a set?
   
   ```suggestion
private final Map> 
containerResourceToWorkerSpecs;
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-16 Thread GitBox
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] 
Make YarnResourceManager starts workers using WorkerResourceSpec requested by 
SlotManager
URL: https://github.com/apache/flink/pull/11353#discussion_r409572279
 
 

 ##
 File path: 
flink-yarn/src/test/java/org/apache/flink/yarn/TestingYarnAMRMClientAsync.java
 ##
 @@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.TriConsumer;
+import org.apache.flink.util.function.TriFunction;
+
+import org.apache.hadoop.conf.Configuration;
+import 
org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+/**
+ * A Yarn {@link AMRMClientAsync} implementation for testing.
+ */
+public class TestingYarnAMRMClientAsync extends 
AMRMClientAsyncImpl {
+
+   private Function, 
List>>
+   getMatchingRequestsFunction = ignored -> 
Collections.emptyList();
+   private BiConsumer 
addContainerRequestConsumer = (ignored1, ignored2) -> {};
+   private BiConsumer 
removeContainerRequestConsumer = (ignored1, ignored2) -> {};
+   private BiConsumer 
releaseAssignedContainerConsumer = (ignored1, ignored2) -> {};
+   private Consumer setHeartbeatIntervalConsumer = (ignored) -> 
{};
+   private TriFunction registerApplicationMasterFunction =
+   (ignored1, ignored2, ignored3) -> 
RegisterApplicationMasterResponse.newInstance(
+   Resource.newInstance(0, 0),
+   Resource.newInstance(Integer.MAX_VALUE, 
Integer.MAX_VALUE),
+   Collections.emptyMap(),
+   null,
+   Collections.emptyList(),
+   null,
+   Collections.emptyList());
+   private TriConsumer 
unregisterApplicationMasterConsumer = (ignored1, ignored2, ignored3) -> {};
+
+   TestingYarnAMRMClientAsync(CallbackHandler callbackHandler) {
+   super(0, callbackHandler);
+   }
+
+   @Override
+   public List> 
getMatchingRequests(Priority priority, String resourceName, Resource 
capability) {
+   return getMatchingRequestsFunction.apply(Tuple4.of(priority, 
resourceName, capability, handler));
+   }
+
+   @Override
+   public void addContainerRequest(AMRMClient.ContainerRequest req) {
+   addContainerRequestConsumer.accept(req, handler);
+   }
+
+   @Override
+   public void removeContainerRequest(AMRMClient.ContainerRequest req) {
+   removeContainerRequestConsumer.accept(req, handler);
+   }
+
+   @Override
+   public void releaseAssignedContainer(ContainerId containerId) {
+   releaseAssignedContainerConsumer.accept(containerId, handler);
+   }
+
+   @Override
+   public void setHeartbeatInterval(int interval) {
+   setHeartbeatIntervalConsumer.accept(interval);
+   }
+
+   @Override
+   public RegisterApplicationMasterResponse 
registerApplicationMaster(String appHostName, int appHostPort, String 
appTrackingUrl) {
+   return registerApplicationMasterFunction.apply(appHostName, 
appHostPort, appTrackingUrl);
+   }
+
+   @Override
+   public void unregisterApplicationMaster(FinalApplicationStatus 
appStatus, String appMessage, String appTrackingUrl) {
+   unregisterApplicationMasterConsumer.accept(appStatus, 
appMessage, appTrackingUrl);
+   }

[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-16 Thread GitBox
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] 
Make YarnResourceManager starts workers using WorkerResourceSpec requested by 
SlotManager
URL: https://github.com/apache/flink/pull/11353#discussion_r409583691
 
 

 ##
 File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/WorkerSpecContainerResourceAdapter.java
 ##
 @@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
+import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils;
+import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Utility class for converting between Flink {@link WorkerResourceSpec} and 
Yarn {@link Resource}.
+ */
+public class WorkerSpecContainerResourceAdapter {
+   private final Logger log = 
LoggerFactory.getLogger(WorkerSpecContainerResourceAdapter.class);
+
+   private final Configuration flinkConfig;
+   private final int minMemMB;
+   private final int minVcore;
+   private final int maxMemMB;
+   private final int maxVcore;
+   private final WorkerSpecContainerResourceAdapter.MatchingStrategy 
matchingStrategy;
+   private final Map 
workerSpecToContainerResource;
+   private final Map> 
containerResourceToWorkerSpecs;
+   private final Map> 
containerMemoryToContainerResource;
+
+   @VisibleForTesting
 
 Review comment:
   I think we don't need the `@VisibleForTesting` annotations in this class 
because we did not increase the visibility of these methods for testing 
purposes.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-16 Thread GitBox
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] 
Make YarnResourceManager starts workers using WorkerResourceSpec requested by 
SlotManager
URL: https://github.com/apache/flink/pull/11353#discussion_r409581744
 
 

 ##
 File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/WorkerSpecContainerResourceAdapter.java
 ##
 @@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
+import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils;
+import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Utility class for converting between Flink {@link WorkerResourceSpec} and 
Yarn {@link Resource}.
+ */
+public class WorkerSpecContainerResourceAdapter {
+   private final Logger log = 
LoggerFactory.getLogger(WorkerSpecContainerResourceAdapter.class);
+
+   private final Configuration flinkConfig;
+   private final int minMemMB;
+   private final int minVcore;
+   private final int maxMemMB;
+   private final int maxVcore;
+   private final WorkerSpecContainerResourceAdapter.MatchingStrategy 
matchingStrategy;
+   private final Map 
workerSpecToContainerResource;
+   private final Map> 
containerResourceToWorkerSpecs;
+   private final Map> 
containerMemoryToContainerResource;
+
+   @VisibleForTesting
+   WorkerSpecContainerResourceAdapter(
+   final Configuration flinkConfig,
+   final int minMemMB,
+   final int minVcore,
+   final int maxMemMB,
+   final int maxVcore,
+   final WorkerSpecContainerResourceAdapter.MatchingStrategy 
matchingStrategy) {
+   this.flinkConfig = Preconditions.checkNotNull(flinkConfig);
+   this.minMemMB = minMemMB;
+   this.minVcore = minVcore;
+   this.maxMemMB = maxMemMB;
+   this.maxVcore = maxVcore;
+   this.matchingStrategy = matchingStrategy;
+   workerSpecToContainerResource = new HashMap<>();
+   containerResourceToWorkerSpecs = new HashMap<>();
+   containerMemoryToContainerResource = new HashMap<>();
+   }
+
+   @VisibleForTesting
+   Optional getContainerResource(final WorkerResourceSpec 
workerResourceSpec) {
+   return 
Optional.ofNullable(workerSpecToContainerResource.computeIfAbsent(
+   Preconditions.checkNotNull(workerResourceSpec),
+   this::createAndMapContainerResource));
+   }
+
+   @VisibleForTesting
+   Set getWorkerSpecs(final Resource 
containerResource) {
+   return 
getEquivalentContainerResource(containerResource).stream()
+   .flatMap(resource -> 
containerResourceToWorkerSpecs.getOrDefault(resource, 
Collections.emptyList()).stream())
+   .collect(Collectors.toSet());
+   }
+
+   @VisibleForTesting
+   Set getEquivalentContainerResource(final Resource 
containerResource) {
+   // Yarn might ignore the requested vcores, depending on its 
configurations.
+   // In such cases, we should also not matching vcores.
+   final Set equivalentContainerResources;
+   switch (matchingStrategy) {
+   case MATCH_VCORE:
+   equivalentContainerResources = 
Collections.singleton(containerResource);
+   break;
+   case IGNORE_VCORE:
+   default:

[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-16 Thread GitBox
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] 
Make YarnResourceManager starts workers using WorkerResourceSpec requested by 
SlotManager
URL: https://github.com/apache/flink/pull/11353#discussion_r409619888
 
 

 ##
 File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/RegisterApplicationMasterResponseReflector.java
 ##
 @@ -53,12 +59,20 @@
requireNonNull(clazz);
 
try {
-   method = 
clazz.getMethod("getContainersFromPreviousAttempts");
+   getContainersFromPreviousAttemptsMethod = 
clazz.getMethod("getContainersFromPreviousAttempts");
} catch (NoSuchMethodException e) {
// that happens in earlier Hadoop versions (pre 2.2)
logger.info("Cannot reconnect to previously allocated 
containers. " +
"This YARN version does not support 
'getContainersFromPreviousAttempts()'");
}
+
+   try {
+   getSchedulerResourceTypesMethod = 
clazz.getMethod("getSchedulerResourceTypes");
+   } catch (NoSuchMethodException e) {
+   // that happens in earlier Hadoop versions (pre 2.6)
+   logger.info("Cannot get scheduler resource types. " +
+   "This YARN version does not support 
'getSchedulerResourceTypes()'");
+   }
 
 Review comment:
   One could think about factoring the lookup of methods and the logging 
statement out into another method to avoid code duplication.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-16 Thread GitBox
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] 
Make YarnResourceManager starts workers using WorkerResourceSpec requested by 
SlotManager
URL: https://github.com/apache/flink/pull/11353#discussion_r409580203
 
 

 ##
 File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/WorkerSpecContainerResourceAdapter.java
 ##
 @@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
+import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils;
+import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Utility class for converting between Flink {@link WorkerResourceSpec} and 
Yarn {@link Resource}.
+ */
+public class WorkerSpecContainerResourceAdapter {
+   private final Logger log = 
LoggerFactory.getLogger(WorkerSpecContainerResourceAdapter.class);
+
+   private final Configuration flinkConfig;
+   private final int minMemMB;
+   private final int minVcore;
+   private final int maxMemMB;
+   private final int maxVcore;
 
 Review comment:
   nit:
   ```suggestion
private final int minMemMB;
private final int maxMemMB;
private final int minVcore;
private final int maxVcore;
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-16 Thread GitBox
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] 
Make YarnResourceManager starts workers using WorkerResourceSpec requested by 
SlotManager
URL: https://github.com/apache/flink/pull/11353#discussion_r409627647
 
 

 ##
 File path: 
flink-yarn/src/test/java/org/apache/flink/yarn/RegisterApplicationMasterResponseReflectorTest.java
 ##
 @@ -88,7 +94,44 @@ public void testGetMethodReflectiveHadoop22() {
final RegisterApplicationMasterResponseReflector 
registerApplicationMasterResponseReflector =
new RegisterApplicationMasterResponseReflector(LOG);
 
-   final Method method = 
registerApplicationMasterResponseReflector.getMethod();
+   final Method method = 
registerApplicationMasterResponseReflector.getGetContainersFromPreviousAttemptsMethod();
+   assertThat(method, notNullValue());
+   }
+
+   @Test
+   public void testCallsGetSchedulerResourceTypesMethodIfPresent() {
+   final RegisterApplicationMasterResponseReflector 
registerApplicationMasterResponseReflector =
+   new RegisterApplicationMasterResponseReflector(LOG, 
HasMethod.class);
+
+   final Optional> schedulerResourceTypeNames =
+   
registerApplicationMasterResponseReflector.getSchedulerResourceTypeNamesUnsafe(new
 HasMethod());
 
 Review comment:
   Instead of calling private methods of the 
`RegisterApplicationMasterResponseReflector` we could also add an `assumeTrue` 
statement based on the Hadoop version. Then we can have two tests for Hadoop >= 
2.6 and < 2.6.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-16 Thread GitBox
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] 
Make YarnResourceManager starts workers using WorkerResourceSpec requested by 
SlotManager
URL: https://github.com/apache/flink/pull/11353#discussion_r409611795
 
 

 ##
 File path: 
flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
 ##
 @@ -243,24 +248,33 @@ protected NMClientAsync 
createAndStartNodeManagerClient(YarnConfiguration yarnCo
 
// domain objects for test purposes
final ResourceProfile resourceProfile1 = 
ResourceProfile.UNKNOWN;
+   final WorkerResourceSpec workerResourceSpec;
+
+   final Resource containerResource;
 
public String taskHost = "host1";
 
final TestingYarnNMClientAsync testingYarnNMClientAsync;
 
final TestingYarnAMRMClientAsync testingYarnAMRMClientAsync;
 
+   int containerIdx = 0;
+
/**
 * Create mock RM dependencies.
 */
Context() throws Exception {
-   this(flinkConfig);
+   this(flinkConfig, null);
}
 
-   Context(Configuration configuration) throws  Exception {
-   final SlotManager slotManager = 
SlotManagerBuilder.newBuilder()
-   
.setDefaultWorkerResourceSpec(YarnWorkerResourceSpecFactory.INSTANCE.createDefaultWorkerResourceSpec(configuration))
-   .build();
+   Context(Configuration configuration, @Nullable SlotManager 
slotManager) throws  Exception {
+
+   workerResourceSpec = 
YarnWorkerResourceSpecFactory.INSTANCE.createDefaultWorkerResourceSpec(configuration);
+   if (slotManager == null) {
+   slotManager = SlotManagerBuilder.newBuilder()
+   
.setDefaultWorkerResourceSpec(workerResourceSpec)
+   .build();
+   }
 
 Review comment:
   Why not move this block into the default constructor and remove the 
`@Nullable` annotation from `slotManager`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-16 Thread GitBox
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] 
Make YarnResourceManager starts workers using WorkerResourceSpec requested by 
SlotManager
URL: https://github.com/apache/flink/pull/11353#discussion_r409608614
 
 

 ##
 File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
 ##
 @@ -552,38 +581,44 @@ private FinalApplicationStatus 
getYarnStatus(ApplicationStatus status) {
 * Request new container if pending containers cannot satisfy pending 
slot requests.
 */
private void requestYarnContainerIfRequired() {
-   int requiredTaskManagers = getNumberRequiredWorkers();
-
-   while (requiredTaskManagers-- > numPendingContainerRequests) {
-   requestYarnContainer();
-   }
+   
getNumberRequiredWorkersPerWorkerResourceSpec().entrySet().stream()
+   .filter(entry -> entry.getValue() > 
getNumPendingWorkersFor(entry.getKey()))
+   .forEach(entry -> requestYarnContainer(entry.getKey()));
 
 Review comment:
   `forEach` is always an indicator that one could do the same with a `while` 
loop. In this case I would suggest to keep it simple and stupid:
   
   ```suggestion
for (Map.Entry 
requiredWorkersPerResourceSpec : 
getNumberRequiredWorkersPerWorkerResourceSpec().entrySet()) {
final WorkerResourceSpec workerResourceSpec = 
requiredWorkersPerResourceSpec.getKey();
while (requiredWorkersPerResourceSpec.getValue() > 
getNumPendingWorkersFor(workerResourceSpec)) {
requestYarnContainer(workerResourceSpec);
}
}
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-16 Thread GitBox
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] 
Make YarnResourceManager starts workers using WorkerResourceSpec requested by 
SlotManager
URL: https://github.com/apache/flink/pull/11353#discussion_r409622180
 
 

 ##
 File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/RegisterApplicationMasterResponseReflector.java
 ##
 @@ -96,7 +110,40 @@
}
 
@VisibleForTesting
-   Method getMethod() {
-   return method;
+   Method getGetContainersFromPreviousAttemptsMethod() {
+   return getContainersFromPreviousAttemptsMethod;
+   }
+
+   /**
+* Get names of resource types that are considered by the Yarn 
scheduler.
+* @param response The response object from the registration at the 
ResourceManager.
+* @return A set of resource type names, or {@link Optional#empty()} if 
the Yarn version does not support this API.
+*/
+   Optional> getSchedulerResourceTypeNames(final 
RegisterApplicationMasterResponse response) {
+   return getSchedulerResourceTypeNamesUnsafe(response);
+   }
+
+   @VisibleForTesting
+   Optional> getSchedulerResourceTypeNamesUnsafe(final Object 
response) {
 
 Review comment:
   ```suggestion
private Optional> getSchedulerResourceTypeNamesUnsafe(final 
Object response) {
   ```
   
   and remove `@VisibleForTesting`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-16 Thread GitBox
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] 
Make YarnResourceManager starts workers using WorkerResourceSpec requested by 
SlotManager
URL: https://github.com/apache/flink/pull/11353#discussion_r409576668
 
 

 ##
 File path: 
flink-yarn/src/test/java/org/apache/flink/yarn/TestingYarnNMClientAsync.java
 ##
 @@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.TriConsumer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
+import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
+
+/**
+ * A Yarn {@link NMClientAsync} implementation for testing.
+ */
+class TestingYarnNMClientAsync extends NMClientAsyncImpl {
+
+   private TriConsumer 
startContainerAsyncConsumer = (ignored1, ignored2, ignored3) -> {};
+   private TriConsumer 
stopContainerAsyncConsumer = (ignored1, ignored2, ignored3) -> {};
+
+   TestingYarnNMClientAsync(final CallbackHandler callbackHandler) {
+   super(callbackHandler);
+   }
+
+   @Override
+   public void startContainerAsync(Container container, 
ContainerLaunchContext containerLaunchContext) {
+   this.startContainerAsyncConsumer.accept(container, 
containerLaunchContext, callbackHandler);
+   }
+
+   @Override
+   public void stopContainerAsync(ContainerId containerId, NodeId nodeId) {
+   this.stopContainerAsyncConsumer.accept(containerId, nodeId, 
callbackHandler);
+   }
+
+   void setStartContainerAsyncConsumer(TriConsumer startContainerAsyncConsumer) {
+   this.startContainerAsyncConsumer = 
Preconditions.checkNotNull(startContainerAsyncConsumer);
+   }
+
+   void setStopContainerAsyncConsumer(TriConsumer stopContainerAsyncConsumer) {
+   this.stopContainerAsyncConsumer = 
Preconditions.checkNotNull(stopContainerAsyncConsumer);
+   }
 
 Review comment:
   Same here with the setters and `volatile` fields.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-16 Thread GitBox
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] 
Make YarnResourceManager starts workers using WorkerResourceSpec requested by 
SlotManager
URL: https://github.com/apache/flink/pull/11353#discussion_r409610652
 
 

 ##
 File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
 ##
 @@ -552,38 +581,44 @@ private FinalApplicationStatus 
getYarnStatus(ApplicationStatus status) {
 * Request new container if pending containers cannot satisfy pending 
slot requests.
 */
private void requestYarnContainerIfRequired() {
-   int requiredTaskManagers = getNumberRequiredWorkers();
-
-   while (requiredTaskManagers-- > numPendingContainerRequests) {
-   requestYarnContainer();
-   }
+   
getNumberRequiredWorkersPerWorkerResourceSpec().entrySet().stream()
+   .filter(entry -> entry.getValue() > 
getNumPendingWorkersFor(entry.getKey()))
+   .forEach(entry -> requestYarnContainer(entry.getKey()));
 
 Review comment:
   I assume that `requestYarnContainer` should never return `false` here, 
right? If this is the case, then let's add a `checkState` to ensure this 
invariant.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-16 Thread GitBox
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] 
Make YarnResourceManager starts workers using WorkerResourceSpec requested by 
SlotManager
URL: https://github.com/apache/flink/pull/11353#discussion_r409589090
 
 

 ##
 File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptionsInternal.java
 ##
 @@ -34,4 +37,27 @@
.stringType()
.noDefaultValue()
.withDescription("**DO NOT USE** The 
location of the log config file, e.g. the path to your log4j.properties for 
log4j.");
+
+   /**
+* **DO NO USE** Whether {@link YarnResourceManager} should match the 
vcores of allocated containers with those requested.
 
 Review comment:
   ```suggestion
 * **DO NOT USE** Whether {@link YarnResourceManager} should match the 
vcores of allocated containers with those requested.
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-16 Thread GitBox
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] 
Make YarnResourceManager starts workers using WorkerResourceSpec requested by 
SlotManager
URL: https://github.com/apache/flink/pull/11353#discussion_r409630377
 
 

 ##
 File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
 ##
 @@ -117,7 +118,9 @@
/** Client to communicate with the Node manager and launch TaskExecutor 
processes. */
private NMClientAsync nodeManagerClient;
 
-   private final WorkerSpecContainerResourceAdapter 
workerSpecContainerResourceAdapter;
+   private WorkerSpecContainerResourceAdapter 
workerSpecContainerResourceAdapter = null;
 
 Review comment:
   An idea for keeping the `WorkerSpecContainerResourceAdapter` final is to pass 
the `WorkerSpecContainerResourceAdapter.MatchingStrategy` to the method 
`getWorkerSpecs`. If you look at the `WorkerSpecContainerResourceAdapter` class, 
you can see that the matching strategy is not really an essential part 
of it. Only the lookup method changes its behaviour based on it. Everything 
else stays the same.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-16 Thread GitBox
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] 
Make YarnResourceManager starts workers using WorkerResourceSpec requested by 
SlotManager
URL: https://github.com/apache/flink/pull/11353#discussion_r409592189
 
 

 ##
 File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/WorkerSpecContainerResourceAdapter.java
 ##
 @@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
+import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils;
+import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Utility class for converting between Flink {@link WorkerResourceSpec} and 
Yarn {@link Resource}.
+ */
+public class WorkerSpecContainerResourceAdapter {
+   private final Logger log = 
LoggerFactory.getLogger(WorkerSpecContainerResourceAdapter.class);
+
+   private final Configuration flinkConfig;
+   private final int minMemMB;
+   private final int minVcore;
+   private final int maxMemMB;
+   private final int maxVcore;
+   private final WorkerSpecContainerResourceAdapter.MatchingStrategy 
matchingStrategy;
+   private final Map 
workerSpecToContainerResource;
+   private final Map> 
containerResourceToWorkerSpecs;
+   private final Map> 
containerMemoryToContainerResource;
+
+   @VisibleForTesting
+   WorkerSpecContainerResourceAdapter(
+   final Configuration flinkConfig,
+   final int minMemMB,
+   final int minVcore,
+   final int maxMemMB,
+   final int maxVcore,
+   final WorkerSpecContainerResourceAdapter.MatchingStrategy 
matchingStrategy) {
+   this.flinkConfig = Preconditions.checkNotNull(flinkConfig);
+   this.minMemMB = minMemMB;
+   this.minVcore = minVcore;
+   this.maxMemMB = maxMemMB;
+   this.maxVcore = maxVcore;
+   this.matchingStrategy = matchingStrategy;
+   workerSpecToContainerResource = new HashMap<>();
+   containerResourceToWorkerSpecs = new HashMap<>();
+   containerMemoryToContainerResource = new HashMap<>();
+   }
+
+   @VisibleForTesting
+   Optional getContainerResource(final WorkerResourceSpec 
workerResourceSpec) {
 
 Review comment:
   Maybe rename it to `tryComputeContainerResource` or similar, because this method is 
not simply a lookup. It is rather a creation call with a caching mechanism.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-07 Thread GitBox
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] 
Make YarnResourceManager starts workers using WorkerResourceSpec requested by 
SlotManager
URL: https://github.com/apache/flink/pull/11353#discussion_r404641088
 
 

 ##
 File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
 ##
 @@ -615,4 +632,85 @@ protected double getCpuCores(final Configuration 
configuration) {
//noinspection NumericCastThatLosesPrecision
return cpuCoresLong;
}
+
+   /**
+* Utility class for converting between Flink {@link 
WorkerResourceSpec} and Yarn {@link Resource}.
+*/
+   @VisibleForTesting
+   static class WorkerSpecContainerResourceAdapter {
+   private final Configuration flinkConfig;
+   private final int minMemMB;
+   private final int minVcore;
+   private final boolean matchVcores;
+   private final Map 
workerSpecToContainerResource;
+   private final Map> 
containerResourceToWorkerSpecs;
+   private final Map> 
containerMemoryToContainerResource;
+
+   @VisibleForTesting
+   WorkerSpecContainerResourceAdapter(
+   final Configuration flinkConfig,
+   final int minMemMB,
+   final int minVcore,
+   final boolean matchVcores) {
+   this.flinkConfig = 
Preconditions.checkNotNull(flinkConfig);
+   this.minMemMB = minMemMB;
+   this.minVcore = minVcore;
+   this.matchVcores = matchVcores;
+   workerSpecToContainerResource = new HashMap<>();
+   containerResourceToWorkerSpecs = new HashMap<>();
+   containerMemoryToContainerResource = new HashMap<>();
+   }
+
+   @VisibleForTesting
+   Resource getContainerResource(final WorkerResourceSpec 
workerResourceSpec) {
+   return workerSpecToContainerResource.computeIfAbsent(
+   Preconditions.checkNotNull(workerResourceSpec),
+   this::createAndMapContainerResource);
+   }
+
+   @VisibleForTesting
+   Collection getWorkerSpecs(final Resource 
containerResource) {
+   return 
getEquivalentContainerResource(containerResource).stream()
+   .flatMap(resource -> 
containerResourceToWorkerSpecs.getOrDefault(resource, 
Collections.emptyList()).stream())
+   .collect(Collectors.toList());
+   }
+
+   @VisibleForTesting
+   Collection getEquivalentContainerResource(final 
Resource containerResource) {
+   // Yarn might ignore the requested vcores, depending on 
its configurations.
+   // In such cases, we should also not matching vcores.
+   return matchVcores ?
+   Collections.singletonList(containerResource) :
+   
containerMemoryToContainerResource.getOrDefault(containerResource.getMemory(), 
Collections.emptyList());
+   }
+
+   private Resource createAndMapContainerResource(final 
WorkerResourceSpec workerResourceSpec) {
+   // TODO: need to unset process/flink memory size from 
configuration if dynamic worker resource is activated
 
 Review comment:
   The current SM will be initialized with the proper default 
`WorkerResourceSpec` derived from Flink's configuration, right? Hence, couldn't we 
already resolve this TODO? Then there would not be any problems with 
enabling support for dynamic worker resources.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-07 Thread GitBox
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] 
Make YarnResourceManager starts workers using WorkerResourceSpec requested by 
SlotManager
URL: https://github.com/apache/flink/pull/11353#discussion_r404639691
 
 

 ##
 File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java
 ##
 @@ -232,57 +229,75 @@ private void recoverWorkerNodesFromPreviousAttempts() 
throws ResourceManagerExce
++currentMaxAttemptId);
}
 
-   private void requestKubernetesPod() {
-   numPendingPodRequests++;
+   private void requestKubernetesPod(WorkerResourceSpec 
workerResourceSpec) {
+   final KubernetesTaskManagerParameters parameters =
+   
createKubernetesTaskManagerParameters(workerResourceSpec);
+
+   podWorkerResources.put(parameters.getPodName(), 
workerResourceSpec);
+   final int pendingWorkerNum = 
pendingWorkerCounter.increaseAndGet(workerResourceSpec);
 
log.info("Requesting new TaskManager pod with <{},{}>. Number 
pending requests {}.",
-   defaultMemoryMB,
-   defaultCpus,
-   numPendingPodRequests);
+   parameters.getTaskManagerMemoryMB(),
+   parameters.getTaskManagerCPU(),
+   pendingWorkerNum);
+   log.info("TaskManager {} will be started with {}.", 
parameters.getPodName(), workerResourceSpec);
+
+   final KubernetesPod taskManagerPod =
+   
KubernetesTaskManagerFactory.createTaskManagerComponent(parameters);
+   kubeClient.createTaskManagerPod(taskManagerPod);
+   }
+
+   private KubernetesTaskManagerParameters 
createKubernetesTaskManagerParameters(WorkerResourceSpec workerResourceSpec) {
+   // TODO: need to unset process/flink memory size from 
configuration if dynamic worker resource is activated
 
 Review comment:
   OK, instead of adding a TODO, I would suggest creating a JIRA ticket 
that is linked as a follow-up. TODOs tend to be forgotten.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-07 Thread GitBox
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] 
Make YarnResourceManager starts workers using WorkerResourceSpec requested by 
SlotManager
URL: https://github.com/apache/flink/pull/11353#discussion_r404634322
 
 

 ##
 File path: 
flink-yarn/src/test/java/org/apache/flink/yarn/TestingYarnAMRMClientAsync.java
 ##
 @@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+/**
+ * A Yarn {@link AMRMClientAsync} implementation for testing.
+ */
+public class TestingYarnAMRMClientAsync extends 
AMRMClientAsyncImpl {
 
 Review comment:
   I think this is the price we have to pay for integrating with another 
system. It is always painful and tedious to set up the tests. In part, this 
always involves reimplementing the other system's behaviour in mocks. I don't 
see a better way at the moment. Hence, +1 for what you've proposed.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-06 Thread GitBox
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] 
Make YarnResourceManager starts workers using WorkerResourceSpec requested by 
SlotManager
URL: https://github.com/apache/flink/pull/11353#discussion_r404160153
 
 

 ##
 File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
 ##
 @@ -412,30 +439,32 @@ private void 
releaseFailedContainerAndRequestNewContainerIfRequired(ContainerId
 
final ResourceID resourceId = new 
ResourceID(containerId.toString());
// release the failed container
-   workerNodeMap.remove(resourceId);
+   YarnWorkerNode yarnWorkerNode = 
workerNodeMap.remove(resourceId);
resourceManagerClient.releaseAssignedContainer(containerId);
// and ask for a new one
-   requestYarnContainerIfRequired();
+   
requestYarnContainerIfRequired(yarnWorkerNode.getContainer().getResource());
}
 
private void returnExcessContainer(Container excessContainer) {
log.info("Returning excess container {}.", 
excessContainer.getId());

resourceManagerClient.releaseAssignedContainer(excessContainer.getId());
}
 
-   private void removeContainerRequest(AMRMClient.ContainerRequest 
pendingContainerRequest) {
-   numPendingContainerRequests--;
-
-   log.info("Removing container request {}. Pending container 
requests {}.", pendingContainerRequest, numPendingContainerRequests);
-
+   private void removeContainerRequest(AMRMClient.ContainerRequest 
pendingContainerRequest, WorkerResourceSpec workerResourceSpec) {
+   log.info("Removing container request {}.", 
pendingContainerRequest);
+   pendingWorkerCounter.decreaseAndGet(workerResourceSpec);
 
 Review comment:
   Not saying that we have to implement it right away. I just want to know how 
one could fix this as a follow-up task.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-06 Thread GitBox
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] 
Make YarnResourceManager starts workers using WorkerResourceSpec requested by 
SlotManager
URL: https://github.com/apache/flink/pull/11353#discussion_r404146599
 
 

 ##
 File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
 ##
 @@ -540,39 +571,41 @@ private FinalApplicationStatus 
getYarnStatus(ApplicationStatus status) {
/**
 * Request new container if pending containers cannot satisfy pending 
slot requests.
 */
-   private void requestYarnContainerIfRequired() {
-   int requiredTaskManagers = getNumberRequiredTaskManagers();
-
-   if (requiredTaskManagers > numPendingContainerRequests) {
-   requestYarnContainer();
-   }
+   private void requestYarnContainerIfRequired(Resource containerResource) 
{
+   getPendingWorkerNums().entrySet().stream()
 
 Review comment:
   Maybe we could rename `getPendingWorkerNums` to `getRequiredWorkers` or 
`getRequiredResources`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-06 Thread GitBox
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] 
Make YarnResourceManager starts workers using WorkerResourceSpec requested by 
SlotManager
URL: https://github.com/apache/flink/pull/11353#discussion_r404144900
 
 

 ##
 File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
 ##
 @@ -412,30 +439,32 @@ private void 
releaseFailedContainerAndRequestNewContainerIfRequired(ContainerId
 
final ResourceID resourceId = new 
ResourceID(containerId.toString());
// release the failed container
-   workerNodeMap.remove(resourceId);
+   YarnWorkerNode yarnWorkerNode = 
workerNodeMap.remove(resourceId);
resourceManagerClient.releaseAssignedContainer(containerId);
// and ask for a new one
-   requestYarnContainerIfRequired();
+   
requestYarnContainerIfRequired(yarnWorkerNode.getContainer().getResource());
}
 
private void returnExcessContainer(Container excessContainer) {
log.info("Returning excess container {}.", 
excessContainer.getId());

resourceManagerClient.releaseAssignedContainer(excessContainer.getId());
}
 
-   private void removeContainerRequest(AMRMClient.ContainerRequest 
pendingContainerRequest) {
-   numPendingContainerRequests--;
-
-   log.info("Removing container request {}. Pending container 
requests {}.", pendingContainerRequest, numPendingContainerRequests);
-
+   private void removeContainerRequest(AMRMClient.ContainerRequest 
pendingContainerRequest, WorkerResourceSpec workerResourceSpec) {
+   log.info("Removing container request {}.", 
pendingContainerRequest);
+   pendingWorkerCounter.decreaseAndGet(workerResourceSpec);
 
 Review comment:
   Hmm, would it be possible to have a periodic cleanup task?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-06 Thread GitBox
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] 
Make YarnResourceManager starts workers using WorkerResourceSpec requested by 
SlotManager
URL: https://github.com/apache/flink/pull/11353#discussion_r404142447
 
 

 ##
 File path: 
flink-yarn/src/test/java/org/apache/flink/yarn/TestingYarnAMRMClientAsync.java
 ##
 @@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+/**
+ * A Yarn {@link AMRMClientAsync} implementation for testing.
+ */
+public class TestingYarnAMRMClientAsync extends 
AMRMClientAsyncImpl {
 
 Review comment:
   I think my problem with this class is that we are partially overriding an 
implementation class here. We don't really know how the overridden methods are 
used internally. Looking at the code, it looks as if they are all forwarding 
the calls to `client`. From this perspective it might look fine, but if you take 
a look at `getAvailableResources`, which has not been overridden, then it also 
calls `client`. Do we know that there is no contract between the overridden 
methods and `getAvailableResources`? What if the `client` returns different 
available resources depending on how many container requests have been added? 
I'm not saying that this is the case, but I want to make clear that overriding 
individual methods of an implementation class can cause failures which are hard 
to predict and even harder to debug.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-06 Thread GitBox
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] 
Make YarnResourceManager starts workers using WorkerResourceSpec requested by 
SlotManager
URL: https://github.com/apache/flink/pull/11353#discussion_r404134516
 
 

 ##
 File path: flink-yarn/src/test/java/org/apache/flink/yarn/TestingContainer.java
 ##
 @@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.Token;
+
+/**
+ * A {@link Container} implementation for testing.
+ */
+class TestingContainer extends Container {
 
 Review comment:
   Using `ContainerPBImpl` should work.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-06 Thread GitBox
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] 
Make YarnResourceManager starts workers using WorkerResourceSpec requested by 
SlotManager
URL: https://github.com/apache/flink/pull/11353#discussion_r404128962
 
 

 ##
 File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptionsInternal.java
 ##
 @@ -34,4 +37,24 @@
.stringType()
.noDefaultValue()
.withDescription("**DO NOT USE** The 
location of the log config file, e.g. the path to your log4j.properties for 
log4j.");
+
+   /**
+* **DO NO USE** Whether {@link YarnResourceManager} should match the 
vcores of allocated containers with those requested.
+*
+* By default, Yarn ignores vcores in the container requests, and 
always allocate 1 vcore for each container.
+* Iff 'yarn.scheduler.capacity.resource-calculator' is set to 
'DominantResourceCalculator' for Yarn, will it
+* allocate container vcores as requested. Unfortunately, this 
configuration option is dedicated for Yarn Scheduler,
+* and is only accessible to applications in Hadoop 2.6+.
+*
+* ATM, it should be fine to not match vcores, because with the 
current {@link SlotManagerImpl} all the TM
+* containers should have the same resources.
+*
+* If later we add another {@link SlotManager} implementation that 
may have TMs with different resources, we can
+* switch this option on only for the new SM, and the new SM can also 
be available on Hadoop 2.6+ only.
+*/
+   public static final ConfigOption MATCH_CONTAINER_VCORES =
+   
key("$internal.yarn.resourcemanager.enable-vcore-matching")
+   .booleanType()
+   .defaultValue(false)
+   .withDescription("**DO NOT USE** 
Whether YarnResourceManager should match the container vcores.");
 
 Review comment:
   Ok, thanks for the clarification. Automatically configuring this flag with 
Hadoop >= 2.6 sounds like a good idea.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-06 Thread GitBox
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] 
Make YarnResourceManager starts workers using WorkerResourceSpec requested by 
SlotManager
URL: https://github.com/apache/flink/pull/11353#discussion_r404124170
 
 

 ##
 File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
 ##
 @@ -615,4 +632,85 @@ protected double getCpuCores(final Configuration 
configuration) {
//noinspection NumericCastThatLosesPrecision
return cpuCoresLong;
}
+
+   /**
+* Utility class for converting between Flink {@link 
WorkerResourceSpec} and Yarn {@link Resource}.
+*/
+   @VisibleForTesting
+   static class WorkerSpecContainerResourceAdapter {
+   private final Configuration flinkConfig;
+   private final int minMemMB;
+   private final int minVcore;
+   private final boolean matchVcores;
+   private final Map 
workerSpecToContainerResource;
+   private final Map> 
containerResourceToWorkerSpecs;
+   private final Map> 
containerMemoryToContainerResource;
+
+   @VisibleForTesting
+   WorkerSpecContainerResourceAdapter(
+   final Configuration flinkConfig,
+   final int minMemMB,
+   final int minVcore,
+   final boolean matchVcores) {
+   this.flinkConfig = 
Preconditions.checkNotNull(flinkConfig);
+   this.minMemMB = minMemMB;
+   this.minVcore = minVcore;
+   this.matchVcores = matchVcores;
+   workerSpecToContainerResource = new HashMap<>();
+   containerResourceToWorkerSpecs = new HashMap<>();
+   containerMemoryToContainerResource = new HashMap<>();
+   }
+
+   @VisibleForTesting
+   Resource getContainerResource(final WorkerResourceSpec 
workerResourceSpec) {
+   return workerSpecToContainerResource.computeIfAbsent(
+   Preconditions.checkNotNull(workerResourceSpec),
+   this::createAndMapContainerResource);
+   }
+
+   @VisibleForTesting
+   Collection getWorkerSpecs(final Resource 
containerResource) {
+   return 
getEquivalentContainerResource(containerResource).stream()
+   .flatMap(resource -> 
containerResourceToWorkerSpecs.getOrDefault(resource, 
Collections.emptyList()).stream())
+   .collect(Collectors.toList());
+   }
+
+   @VisibleForTesting
+   Collection getEquivalentContainerResource(final 
Resource containerResource) {
+   // Yarn might ignore the requested vcores, depending on 
its configurations.
+   // In such cases, we should also not matching vcores.
+   return matchVcores ?
+   Collections.singletonList(containerResource) :
+   
containerMemoryToContainerResource.getOrDefault(containerResource.getMemory(), 
Collections.emptyList());
+   }
+
+   private Resource createAndMapContainerResource(final 
WorkerResourceSpec workerResourceSpec) {
+   // TODO: need to unset process/flink memory size from 
configuration if dynamic worker resource is activated
 
 Review comment:
   In general I agree, unless it is not yet fully supported by the RM 
implementation. If it is supported, then I guess we also don't need this comment. If it 
is not supported, then the `checkState` will help us remember what needs to 
be changed.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-06 Thread GitBox
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] 
Make YarnResourceManager starts workers using WorkerResourceSpec requested by 
SlotManager
URL: https://github.com/apache/flink/pull/11353#discussion_r404123445
 
 

 ##
 File path: 
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesResourceManagerTest.java
 ##
 @@ -321,6 +339,47 @@ public void testGetCpuCoresNumSlots() {
assertThat(resourceManager.getCpuCores(configuration), is(3.0));
}
 
+   @Test
+   public void testStartAndRecoverVariousResourceSpec() {
+   // Start two workers with different resources
+   final WorkerResourceSpec workerResourceSpec1 = new 
WorkerResourceSpec(1.0, 100, 0, 100, 100);
+   final WorkerResourceSpec workerResourceSpec2 = new 
WorkerResourceSpec(1.0, 99, 0, 100, 100);
+   resourceManager.startNewWorker(workerResourceSpec1);
+   resourceManager.startNewWorker(workerResourceSpec2);
+
+   // Verify two pods with both worker resources are started
+   final PodList initialPodList = kubeClient.pods().list();
+   assertEquals(2, initialPodList.getItems().size());
+   final Pod initialPod1 = getPodContainsStrInArgs(initialPodList, 
TaskManagerOptions.TASK_HEAP_MEMORY.key() + "=" + (100L << 20));
+   final Pod initialPod2 = getPodContainsStrInArgs(initialPodList, 
TaskManagerOptions.TASK_HEAP_MEMORY.key() + "=" + (99L << 20));
+
+   // Notify resource manager about pods added.
+   final KubernetesPod initialKubernetesPod1 = new 
KubernetesPod(initialPod1);
+   final KubernetesPod initialKubernetesPod2 = new 
KubernetesPod(initialPod2);
+   resourceManager.onAdded(ImmutableList.of(initialKubernetesPod1, 
initialKubernetesPod2));
+
+   // Terminate pod1.
+   terminatePod(initialPod1);
+   
resourceManager.setCustomPendingWorkerNums(Collections.singletonMap(workerResourceSpec1,
 1));
 
 Review comment:
   This sounds good.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-06 Thread GitBox
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] 
Make YarnResourceManager starts workers using WorkerResourceSpec requested by 
SlotManager
URL: https://github.com/apache/flink/pull/11353#discussion_r404121234
 
 

 ##
 File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java
 ##
 @@ -232,57 +229,75 @@ private void recoverWorkerNodesFromPreviousAttempts() 
throws ResourceManagerExce
++currentMaxAttemptId);
}
 
-   private void requestKubernetesPod() {
-   numPendingPodRequests++;
+   private void requestKubernetesPod(WorkerResourceSpec 
workerResourceSpec) {
+   final KubernetesTaskManagerParameters parameters =
+   
createKubernetesTaskManagerParameters(workerResourceSpec);
+
+   podWorkerResources.put(parameters.getPodName(), 
workerResourceSpec);
+   final int pendingWorkerNum = 
pendingWorkerCounter.increaseAndGet(workerResourceSpec);
 
log.info("Requesting new TaskManager pod with <{},{}>. Number 
pending requests {}.",
-   defaultMemoryMB,
-   defaultCpus,
-   numPendingPodRequests);
+   parameters.getTaskManagerMemoryMB(),
+   parameters.getTaskManagerCPU(),
+   pendingWorkerNum);
+   log.info("TaskManager {} will be started with {}.", 
parameters.getPodName(), workerResourceSpec);
+
+   final KubernetesPod taskManagerPod =
+   
KubernetesTaskManagerFactory.createTaskManagerComponent(parameters);
+   kubeClient.createTaskManagerPod(taskManagerPod);
+   }
+
+   private KubernetesTaskManagerParameters 
createKubernetesTaskManagerParameters(WorkerResourceSpec workerResourceSpec) {
+   // TODO: need to unset process/flink memory size from 
configuration if dynamic worker resource is activated
 
 Review comment:
   So do we have the requirement that what's written in the config and what's 
specified in `workerResourceSpec` must be the same? If so, then I think we 
should add a `checkState` here. This will ensure that we remember what needs to 
be adjusted in order to support dynamic worker resources.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-03 Thread GitBox
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] 
Make YarnResourceManager starts workers using WorkerResourceSpec requested by 
SlotManager
URL: https://github.com/apache/flink/pull/11353#discussion_r403089669
 
 

 ##
 File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java
 ##
 @@ -232,57 +229,75 @@ private void recoverWorkerNodesFromPreviousAttempts() 
throws ResourceManagerExce
++currentMaxAttemptId);
}
 
-   private void requestKubernetesPod() {
-   numPendingPodRequests++;
+   private void requestKubernetesPod(WorkerResourceSpec 
workerResourceSpec) {
+   final KubernetesTaskManagerParameters parameters =
+   
createKubernetesTaskManagerParameters(workerResourceSpec);
+
+   podWorkerResources.put(parameters.getPodName(), 
workerResourceSpec);
+   final int pendingWorkerNum = 
pendingWorkerCounter.increaseAndGet(workerResourceSpec);
 
log.info("Requesting new TaskManager pod with <{},{}>. Number 
pending requests {}.",
-   defaultMemoryMB,
-   defaultCpus,
-   numPendingPodRequests);
+   parameters.getTaskManagerMemoryMB(),
+   parameters.getTaskManagerCPU(),
+   pendingWorkerNum);
+   log.info("TaskManager {} will be started with {}.", 
parameters.getPodName(), workerResourceSpec);
+
+   final KubernetesPod taskManagerPod =
+   
KubernetesTaskManagerFactory.createTaskManagerComponent(parameters);
+   kubeClient.createTaskManagerPod(taskManagerPod);
+   }
+
+   private KubernetesTaskManagerParameters 
createKubernetesTaskManagerParameters(WorkerResourceSpec workerResourceSpec) {
+   // TODO: need to unset process/flink memory size from 
configuration if dynamic worker resource is activated
+   final TaskExecutorProcessSpec taskExecutorProcessSpec =
+   
TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(flinkConfig, 
workerResourceSpec);
 
final String podName = String.format(
TASK_MANAGER_POD_FORMAT,
clusterId,
currentMaxAttemptId,
++currentMaxPodId);
 
+   final ContaineredTaskManagerParameters taskManagerParameters =
+   ContaineredTaskManagerParameters.create(flinkConfig, 
taskExecutorProcessSpec);
+
final String dynamicProperties =

BootstrapTools.getDynamicPropertiesAsString(flinkClientConfig, flinkConfig);
 
-   final KubernetesTaskManagerParameters 
kubernetesTaskManagerParameters = new KubernetesTaskManagerParameters(
+   return new KubernetesTaskManagerParameters(
flinkConfig,
podName,
dynamicProperties,
taskManagerParameters);
-
-   final KubernetesPod taskManagerPod =
-   
KubernetesTaskManagerFactory.createTaskManagerComponent(kubernetesTaskManagerParameters);
-
-   log.info("TaskManager {} will be started with {}.", podName, 
taskExecutorProcessSpec);
-   kubeClient.createTaskManagerPod(taskManagerPod);
}
 
/**
 * Request new pod if pending pods cannot satisfy pending slot requests.
 */
-   private void requestKubernetesPodIfRequired() {
-   final int requiredTaskManagers = 
getNumberRequiredTaskManagers();
+   private void requestKubernetesPodIfRequired(WorkerResourceSpec 
workerResourceSpec) {
+   final int requiredTaskManagers = 
getPendingWorkerNums().get(workerResourceSpec);
+   final int pendingWorkerNum = 
pendingWorkerCounter.getNum(workerResourceSpec);
 
 Review comment:
   I think I would hide `pendingWorkerCounter` behind some methods which the 
base class provides.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-03 Thread GitBox
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] 
Make YarnResourceManager starts workers using WorkerResourceSpec requested by 
SlotManager
URL: https://github.com/apache/flink/pull/11353#discussion_r403128803
 
 

 ##
 File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
 ##
 @@ -540,39 +571,41 @@ private FinalApplicationStatus 
getYarnStatus(ApplicationStatus status) {
/**
 * Request new container if pending containers cannot satisfy pending 
slot requests.
 */
-   private void requestYarnContainerIfRequired() {
-   int requiredTaskManagers = getNumberRequiredTaskManagers();
-
-   if (requiredTaskManagers > numPendingContainerRequests) {
-   requestYarnContainer();
-   }
+   private void requestYarnContainerIfRequired(Resource containerResource) 
{
+   getPendingWorkerNums().entrySet().stream()
+   .filter(entry ->
+   
getContainerResource(entry.getKey()).equals(containerResource) &&
+   entry.getValue() > 
pendingWorkerCounter.getNum(entry.getKey()))
+   .findAny()
 
 Review comment:
   Shouldn't we do this for all matching entries instead of just any one of them (i.e. not `findAny`)?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-03 Thread GitBox
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] 
Make YarnResourceManager starts workers using WorkerResourceSpec requested by 
SlotManager
URL: https://github.com/apache/flink/pull/11353#discussion_r403115740
 
 

 ##
 File path: 
flink-yarn/src/test/java/org/apache/flink/yarn/TestingContainerStatus.java
 ##
 @@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+
+/**
+ * A {@link ContainerStatus} implementation for testing.
+ */
+class TestingContainerStatus extends ContainerStatus {
+
+   private final ContainerId containerId;
+   private final ContainerState containerState;
+   private final String diagnostics;
+   private final int exitStatus;
+
+   TestingContainerStatus(
+   final ContainerId containerId,
+   final ContainerState containerState,
+   final String diagnostics,
+   final int exitStatus) {
+
+   this.containerId = containerId;
+   this.containerState = containerState;
+   this.diagnostics = diagnostics;
+   this.exitStatus = exitStatus;
+   }
+
+   @Override
+   public ContainerId getContainerId() {
+   return containerId;
+   }
+
+   @Override
+   public void setContainerId(ContainerId containerId) {
+
 
 Review comment:
   Shouldn't we fail in case someone calls `setContainerId` here? Otherwise it 
might go unnoticed and result in some strange behaviour/failure.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-03 Thread GitBox
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] 
Make YarnResourceManager starts workers using WorkerResourceSpec requested by 
SlotManager
URL: https://github.com/apache/flink/pull/11353#discussion_r403121941
 
 

 ##
 File path: 
flink-yarn/src/test/java/org/apache/flink/yarn/TestingYarnNMClientAsync.java
 ##
 @@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
+import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
+
+import java.util.function.Consumer;
+
+/**
+ * A Yarn {@link NMClientAsync} implementation for testing.
+ */
+class TestingYarnNMClientAsync extends NMClientAsyncImpl {
+
+   private Consumer> startContainerAsyncConsumer = ignored -> {};
+   private Consumer> 
stopContainerAsyncConsumer = ignored -> {};
 
 Review comment:
   I'd suggest using a `TriConsumer` here.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-03 Thread GitBox
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] 
Make YarnResourceManager starts workers using WorkerResourceSpec requested by 
SlotManager
URL: https://github.com/apache/flink/pull/11353#discussion_r403132471
 
 

 ##
 File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
 ##
 @@ -540,39 +571,41 @@ private FinalApplicationStatus 
getYarnStatus(ApplicationStatus status) {
/**
 * Request new container if pending containers cannot satisfy pending 
slot requests.
 */
-   private void requestYarnContainerIfRequired() {
-   int requiredTaskManagers = getNumberRequiredTaskManagers();
-
-   if (requiredTaskManagers > numPendingContainerRequests) {
-   requestYarnContainer();
-   }
+   private void requestYarnContainerIfRequired(Resource containerResource) 
{
 
 Review comment:
   Shouldn't we iterate over all resources which are needed instead of 
restricting it to `containerResource`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-03 Thread GitBox
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] 
Make YarnResourceManager starts workers using WorkerResourceSpec requested by 
SlotManager
URL: https://github.com/apache/flink/pull/11353#discussion_r403133265
 
 

 ##
 File path: 
flink-yarn/src/test/java/org/apache/flink/yarn/entrypoint/YarnResourceManagerFactoryTest.java
 ##
 @@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.entrypoint;
+
+import org.apache.flink.api.common.resources.CPUResource;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.yarn.configuration.YarnConfigOptions;
+
+import org.junit.Test;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link YarnResourceManagerFactory}.
+ */
+public class YarnResourceManagerFactoryTest {
 
 Review comment:
   ```suggestion
   public class YarnResourceManagerFactoryTest extends TestLogger {
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-03 Thread GitBox
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] 
Make YarnResourceManager starts workers using WorkerResourceSpec requested by 
SlotManager
URL: https://github.com/apache/flink/pull/11353#discussion_r403090988
 
 

 ##
 File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java
 ##
 @@ -232,57 +229,75 @@ private void recoverWorkerNodesFromPreviousAttempts() 
throws ResourceManagerExce
++currentMaxAttemptId);
}
 
-   private void requestKubernetesPod() {
-   numPendingPodRequests++;
+   private void requestKubernetesPod(WorkerResourceSpec 
workerResourceSpec) {
+   final KubernetesTaskManagerParameters parameters =
+   
createKubernetesTaskManagerParameters(workerResourceSpec);
+
+   podWorkerResources.put(parameters.getPodName(), 
workerResourceSpec);
+   final int pendingWorkerNum = 
pendingWorkerCounter.increaseAndGet(workerResourceSpec);
 
log.info("Requesting new TaskManager pod with <{},{}>. Number 
pending requests {}.",
-   defaultMemoryMB,
-   defaultCpus,
-   numPendingPodRequests);
+   parameters.getTaskManagerMemoryMB(),
+   parameters.getTaskManagerCPU(),
+   pendingWorkerNum);
+   log.info("TaskManager {} will be started with {}.", 
parameters.getPodName(), workerResourceSpec);
+
+   final KubernetesPod taskManagerPod =
+   
KubernetesTaskManagerFactory.createTaskManagerComponent(parameters);
+   kubeClient.createTaskManagerPod(taskManagerPod);
+   }
+
+   private KubernetesTaskManagerParameters 
createKubernetesTaskManagerParameters(WorkerResourceSpec workerResourceSpec) {
+   // TODO: need to unset process/flink memory size from 
configuration if dynamic worker resource is activated
+   final TaskExecutorProcessSpec taskExecutorProcessSpec =
+   
TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(flinkConfig, 
workerResourceSpec);
 
final String podName = String.format(
TASK_MANAGER_POD_FORMAT,
clusterId,
currentMaxAttemptId,
++currentMaxPodId);
 
+   final ContaineredTaskManagerParameters taskManagerParameters =
+   ContaineredTaskManagerParameters.create(flinkConfig, 
taskExecutorProcessSpec);
+
final String dynamicProperties =

BootstrapTools.getDynamicPropertiesAsString(flinkClientConfig, flinkConfig);
 
-   final KubernetesTaskManagerParameters 
kubernetesTaskManagerParameters = new KubernetesTaskManagerParameters(
+   return new KubernetesTaskManagerParameters(
flinkConfig,
podName,
dynamicProperties,
taskManagerParameters);
-
-   final KubernetesPod taskManagerPod =
-   
KubernetesTaskManagerFactory.createTaskManagerComponent(kubernetesTaskManagerParameters);
-
-   log.info("TaskManager {} will be started with {}.", podName, 
taskExecutorProcessSpec);
-   kubeClient.createTaskManagerPod(taskManagerPod);
}
 
/**
 * Request new pod if pending pods cannot satisfy pending slot requests.
 */
-   private void requestKubernetesPodIfRequired() {
-   final int requiredTaskManagers = 
getNumberRequiredTaskManagers();
+   private void requestKubernetesPodIfRequired(WorkerResourceSpec 
workerResourceSpec) {
+   final int requiredTaskManagers = 
getPendingWorkerNums().get(workerResourceSpec);
+   final int pendingWorkerNum = 
pendingWorkerCounter.getNum(workerResourceSpec);
 
-   if (requiredTaskManagers > numPendingPodRequests) {
-   requestKubernetesPod();
+   if (requiredTaskManagers > pendingWorkerNum) {
+   requestKubernetesPod(workerResourceSpec);
}
}
 
private void removePodIfTerminated(KubernetesPod pod) {
if (pod.isTerminated()) {
kubeClient.stopPod(pod.getName());
-   final KubernetesWorkerNode kubernetesWorkerNode = 
workerNodes.remove(new ResourceID(pod.getName()));
-   if (kubernetesWorkerNode != null) {
-   requestKubernetesPodIfRequired();
+   final WorkerResourceSpec workerResourceSpec = 
removeWorkerNodeAndResourceSpec(new ResourceID(pod.getName()));
+   if (workerResourceSpec != null) {
+   
requestKubernetesPodIfRequired(workerResourceSpec);
 

[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-03 Thread GitBox
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] 
Make YarnResourceManager starts workers using WorkerResourceSpec requested by 
SlotManager
URL: https://github.com/apache/flink/pull/11353#discussion_r403118223
 
 

 ##
 File path: 
flink-yarn/src/test/java/org/apache/flink/yarn/TestingYarnAMRMClientAsync.java
 ##
 @@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+/**
+ * A Yarn {@link AMRMClientAsync} implementation for testing.
+ */
+public class TestingYarnAMRMClientAsync extends 
AMRMClientAsyncImpl {
 
 Review comment:
   `AMRMClientAsyncImpl` is annotated as unstable. I'm not sure how the 
testing implementation will work across different Yarn versions. Have we tried this 
out?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-03 Thread GitBox
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] 
Make YarnResourceManager starts workers using WorkerResourceSpec requested by 
SlotManager
URL: https://github.com/apache/flink/pull/11353#discussion_r403105830
 
 

 ##
 File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
 ##
 @@ -615,4 +632,85 @@ protected double getCpuCores(final Configuration 
configuration) {
//noinspection NumericCastThatLosesPrecision
return cpuCoresLong;
}
+
+   /**
+* Utility class for converting between Flink {@link 
WorkerResourceSpec} and Yarn {@link Resource}.
+*/
+   @VisibleForTesting
+   static class WorkerSpecContainerResourceAdapter {
+   private final Configuration flinkConfig;
+   private final int minMemMB;
+   private final int minVcore;
+   private final boolean matchVcores;
+   private final Map 
workerSpecToContainerResource;
+   private final Map> 
containerResourceToWorkerSpecs;
 
 Review comment:
   If we are interested in every `WorkerResourceSpec` which ever resulted in 
a given `Resource`, then I would suggest changing the value type to `List`. 
Otherwise one could instantiate this field with a `Set`, which has different 
semantics.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-03 Thread GitBox
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] 
Make YarnResourceManager starts workers using WorkerResourceSpec requested by 
SlotManager
URL: https://github.com/apache/flink/pull/11353#discussion_r403113398
 
 

 ##
 File path: flink-yarn/src/test/java/org/apache/flink/yarn/TestingContainer.java
 ##
 @@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.Token;
+
+/**
+ * A {@link Container} implementation for testing.
+ */
+class TestingContainer extends Container {
 
 Review comment:
   I like the idea of getting rid of Mockito, but how do we ensure that this 
works with all Hadoop versions? Looking at Hadoop 2.10 
(https://github.com/apache/hadoop/blob/release-2.10.0-RC1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Container.java),
 it looks as if the container has gained some more methods.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-03 Thread GitBox
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] 
Make YarnResourceManager starts workers using WorkerResourceSpec requested by 
SlotManager
URL: https://github.com/apache/flink/pull/11353#discussion_r403088133
 
 

 ##
 File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java
 ##
 @@ -232,57 +229,75 @@ private void recoverWorkerNodesFromPreviousAttempts() 
throws ResourceManagerExce
++currentMaxAttemptId);
}
 
-   private void requestKubernetesPod() {
-   numPendingPodRequests++;
+   private void requestKubernetesPod(WorkerResourceSpec 
workerResourceSpec) {
+   final KubernetesTaskManagerParameters parameters =
+   
createKubernetesTaskManagerParameters(workerResourceSpec);
+
+   podWorkerResources.put(parameters.getPodName(), 
workerResourceSpec);
+   final int pendingWorkerNum = 
pendingWorkerCounter.increaseAndGet(workerResourceSpec);
 
log.info("Requesting new TaskManager pod with <{},{}>. Number 
pending requests {}.",
-   defaultMemoryMB,
-   defaultCpus,
-   numPendingPodRequests);
+   parameters.getTaskManagerMemoryMB(),
+   parameters.getTaskManagerCPU(),
+   pendingWorkerNum);
+   log.info("TaskManager {} will be started with {}.", 
parameters.getPodName(), workerResourceSpec);
+
+   final KubernetesPod taskManagerPod =
+   
KubernetesTaskManagerFactory.createTaskManagerComponent(parameters);
+   kubeClient.createTaskManagerPod(taskManagerPod);
+   }
+
+   private KubernetesTaskManagerParameters 
createKubernetesTaskManagerParameters(WorkerResourceSpec workerResourceSpec) {
+   // TODO: need to unset process/flink memory size from 
configuration if dynamic worker resource is activated
 
 Review comment:
   Should we add a state check to ensure that we fail in case we request a 
different size?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-03 Thread GitBox
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] 
Make YarnResourceManager starts workers using WorkerResourceSpec requested by 
SlotManager
URL: https://github.com/apache/flink/pull/11353#discussion_r403078672
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ActiveResourceManagerTest.java
 ##
 @@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import org.junit.Test;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+
+/**
+ * Tests for {@link ActiveResourceManager}.
+ */
+public class ActiveResourceManagerTest {
 
 Review comment:
   ```suggestion
   public class ActiveResourceManagerTest extends TestLogger {
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-03 Thread GitBox
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] 
Make YarnResourceManager starts workers using WorkerResourceSpec requested by 
SlotManager
URL: https://github.com/apache/flink/pull/11353#discussion_r403126015
 
 

 ##
 File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
 ##
 @@ -363,31 +356,64 @@ public void onContainersCompleted(final 
List statuses) {
@Override
public void onContainersAllocated(List containers) {
runAsync(() -> {
-   log.info("Received {} containers with {} pending 
container requests.", containers.size(), numPendingContainerRequests);
-   final Collection 
pendingRequests = getPendingRequests();
-   final Iterator 
pendingRequestsIterator = pendingRequests.iterator();
+   log.info("Received {} containers.", containers.size());
 
-   // number of allocated containers can be larger than 
the number of pending container requests
-   final int numAcceptedContainers = 
Math.min(containers.size(), numPendingContainerRequests);
-   final List requiredContainers = 
containers.subList(0, numAcceptedContainers);
-   final List excessContainers = 
containers.subList(numAcceptedContainers, containers.size());
-
-   for (int i = 0; i < requiredContainers.size(); i++) {
-   
removeContainerRequest(pendingRequestsIterator.next());
-   }
-
-   excessContainers.forEach(this::returnExcessContainer);
-   
requiredContainers.forEach(this::startTaskExecutorInContainer);
+   
groupContainerByResource(containers).forEach(this::onContainersOfResourceAllocated);
 
// if we are waiting for no further containers, we can 
go to the
// regular heartbeat interval
-   if (numPendingContainerRequests <= 0) {
+   if (pendingWorkerCounter.getTotalNum() <= 0) {

resourceManagerClient.setHeartbeatInterval(yarnHeartbeatIntervalMillis);
}
});
}
 
-   private void startTaskExecutorInContainer(Container container) {
+   private Map> 
groupContainerByResource(List containers) {
+   return 
containers.stream().collect(Collectors.groupingBy(Container::getResource));
+   }
+
+   private void onContainersOfResourceAllocated(Resource resource, 
List containers) {
+   final List pendingWorkerResourceSpecs =
+   
workerSpecContainerResourceAdapter.getWorkerSpecs(resource).stream()
 
 Review comment:
   Can it happen that `getWorkerSpecs(resource)` returns a list which contains 
a `WorkerResourceSpec` twice?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-03 Thread GitBox
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] 
Make YarnResourceManager starts workers using WorkerResourceSpec requested by 
SlotManager
URL: https://github.com/apache/flink/pull/11353#discussion_r403099591
 
 

 ##
 File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
 ##
 @@ -615,4 +632,85 @@ protected double getCpuCores(final Configuration 
configuration) {
//noinspection NumericCastThatLosesPrecision
return cpuCoresLong;
}
+
+   /**
+* Utility class for converting between Flink {@link 
WorkerResourceSpec} and Yarn {@link Resource}.
+*/
+   @VisibleForTesting
+   static class WorkerSpecContainerResourceAdapter {
+   private final Configuration flinkConfig;
+   private final int minMemMB;
+   private final int minVcore;
+   private final boolean matchVcores;
+   private final Map 
workerSpecToContainerResource;
+   private final Map> 
containerResourceToWorkerSpecs;
+   private final Map> 
containerMemoryToContainerResource;
+
+   @VisibleForTesting
+   WorkerSpecContainerResourceAdapter(
+   final Configuration flinkConfig,
+   final int minMemMB,
+   final int minVcore,
+   final boolean matchVcores) {
+   this.flinkConfig = 
Preconditions.checkNotNull(flinkConfig);
+   this.minMemMB = minMemMB;
+   this.minVcore = minVcore;
+   this.matchVcores = matchVcores;
+   workerSpecToContainerResource = new HashMap<>();
+   containerResourceToWorkerSpecs = new HashMap<>();
+   containerMemoryToContainerResource = new HashMap<>();
+   }
+
+   @VisibleForTesting
+   Resource getContainerResource(final WorkerResourceSpec 
workerResourceSpec) {
+   return workerSpecToContainerResource.computeIfAbsent(
+   Preconditions.checkNotNull(workerResourceSpec),
+   this::createAndMapContainerResource);
+   }
+
+   @VisibleForTesting
+   Collection getWorkerSpecs(final Resource 
containerResource) {
+   return 
getEquivalentContainerResource(containerResource).stream()
+   .flatMap(resource -> 
containerResourceToWorkerSpecs.getOrDefault(resource, 
Collections.emptyList()).stream())
+   .collect(Collectors.toList());
+   }
+
+   @VisibleForTesting
+   Collection getEquivalentContainerResource(final 
Resource containerResource) {
+   // Yarn might ignore the requested vcores, depending on 
its configurations.
+   // In such cases, we should also not matching vcores.
+   return matchVcores ?
+   Collections.singletonList(containerResource) :
+   
containerMemoryToContainerResource.getOrDefault(containerResource.getMemory(), 
Collections.emptyList());
+   }
+
+   private Resource createAndMapContainerResource(final 
WorkerResourceSpec workerResourceSpec) {
+   // TODO: need to unset process/flink memory size from 
configuration if dynamic worker resource is activated
 
 Review comment:
   I would suggest adding a state check to ensure that we fail once we enable 
dynamic worker resources.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-03 Thread GitBox
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] 
Make YarnResourceManager starts workers using WorkerResourceSpec requested by 
SlotManager
URL: https://github.com/apache/flink/pull/11353#discussion_r403130352
 
 

 ##
 File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
 ##
 @@ -540,39 +571,41 @@ private FinalApplicationStatus 
getYarnStatus(ApplicationStatus status) {
/**
 * Request new container if pending containers cannot satisfy pending 
slot requests.
 */
-   private void requestYarnContainerIfRequired() {
-   int requiredTaskManagers = getNumberRequiredTaskManagers();
-
-   if (requiredTaskManagers > numPendingContainerRequests) {
-   requestYarnContainer();
-   }
+   private void requestYarnContainerIfRequired(Resource containerResource) 
{
+   getPendingWorkerNums().entrySet().stream()
 
 Review comment:
   I have to admit that I find `getPendingWorkerNums()` and 
`pendingWorkerCounter` quite confusing. They sound almost the same, but the 
former means the requirements of the `SlotManager` and the latter means the 
currently pending workers which have been requested by the RM.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-03 Thread GitBox
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] 
Make YarnResourceManager starts workers using WorkerResourceSpec requested by 
SlotManager
URL: https://github.com/apache/flink/pull/11353#discussion_r403104873
 
 

 ##
 File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
 ##
 @@ -615,4 +632,85 @@ protected double getCpuCores(final Configuration 
configuration) {
//noinspection NumericCastThatLosesPrecision
return cpuCoresLong;
}
+
+   /**
+* Utility class for converting between Flink {@link 
WorkerResourceSpec} and Yarn {@link Resource}.
+*/
+   @VisibleForTesting
+   static class WorkerSpecContainerResourceAdapter {
+   private final Configuration flinkConfig;
+   private final int minMemMB;
+   private final int minVcore;
+   private final boolean matchVcores;
+   private final Map 
workerSpecToContainerResource;
+   private final Map> 
containerResourceToWorkerSpecs;
+   private final Map> 
containerMemoryToContainerResource;
+
+   @VisibleForTesting
+   WorkerSpecContainerResourceAdapter(
+   final Configuration flinkConfig,
+   final int minMemMB,
+   final int minVcore,
+   final boolean matchVcores) {
+   this.flinkConfig = 
Preconditions.checkNotNull(flinkConfig);
+   this.minMemMB = minMemMB;
+   this.minVcore = minVcore;
+   this.matchVcores = matchVcores;
+   workerSpecToContainerResource = new HashMap<>();
+   containerResourceToWorkerSpecs = new HashMap<>();
+   containerMemoryToContainerResource = new HashMap<>();
+   }
+
+   @VisibleForTesting
+   Resource getContainerResource(final WorkerResourceSpec 
workerResourceSpec) {
+   return workerSpecToContainerResource.computeIfAbsent(
+   Preconditions.checkNotNull(workerResourceSpec),
+   this::createAndMapContainerResource);
+   }
+
+   @VisibleForTesting
+   Collection getWorkerSpecs(final Resource 
containerResource) {
+   return 
getEquivalentContainerResource(containerResource).stream()
+   .flatMap(resource -> 
containerResourceToWorkerSpecs.getOrDefault(resource, 
Collections.emptyList()).stream())
+   .collect(Collectors.toList());
+   }
+
+   @VisibleForTesting
+   Collection getEquivalentContainerResource(final 
Resource containerResource) {
 
 Review comment:
   If we are only interested in the equivalence class, then I would suggest 
changing the type from `Collection` to `Set`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-03 Thread GitBox
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] 
Make YarnResourceManager starts workers using WorkerResourceSpec requested by 
SlotManager
URL: https://github.com/apache/flink/pull/11353#discussion_r403086625
 
 

 ##
 File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java
 ##
 @@ -183,16 +178,18 @@ public boolean stopWorker(final KubernetesWorkerNode 
worker) {
@Override
public void onAdded(List pods) {
runAsync(() -> {
-   for (KubernetesPod pod : pods) {
-   if (numPendingPodRequests > 0) {
-   numPendingPodRequests--;
+   pods.forEach(pod -> {
+   WorkerResourceSpec workerResourceSpec = 
podWorkerResources.get(pod.getName());
+   final int pendingNum = 
pendingWorkerCounter.getNum(workerResourceSpec);
+   if (pendingNum > 0) {
+   
pendingWorkerCounter.decreaseAndGet(workerResourceSpec);
 
 Review comment:
   I'm wondering whether this logic shouldn't go into the 
`ActiveResourceManager`. I would expect that all `ActiveResourceManager` 
implementations would need to do something similar. Maybe we could introduce 
`notifyNewWorkerStarted(WorkerResourceSpec)`. This could also have the benefit 
that we could hide `pendingWorkerCounter` completely.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-03 Thread GitBox
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] 
Make YarnResourceManager starts workers using WorkerResourceSpec requested by 
SlotManager
URL: https://github.com/apache/flink/pull/11353#discussion_r403097275
 
 

 ##
 File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
 ##
 @@ -615,4 +632,85 @@ protected double getCpuCores(final Configuration 
configuration) {
//noinspection NumericCastThatLosesPrecision
return cpuCoresLong;
}
+
+   /**
+* Utility class for converting between Flink {@link 
WorkerResourceSpec} and Yarn {@link Resource}.
+*/
+   @VisibleForTesting
+   static class WorkerSpecContainerResourceAdapter {
 
 Review comment:
   I think this class is large enough to warrant its own file. This would also 
decrease the size of this source code file a bit.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-03 Thread GitBox
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] 
Make YarnResourceManager starts workers using WorkerResourceSpec requested by 
SlotManager
URL: https://github.com/apache/flink/pull/11353#discussion_r403131968
 
 

 ##
 File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
 ##
 @@ -412,30 +439,32 @@ private void 
releaseFailedContainerAndRequestNewContainerIfRequired(ContainerId
 
final ResourceID resourceId = new 
ResourceID(containerId.toString());
// release the failed container
-   workerNodeMap.remove(resourceId);
+   YarnWorkerNode yarnWorkerNode = 
workerNodeMap.remove(resourceId);
resourceManagerClient.releaseAssignedContainer(containerId);
// and ask for a new one
-   requestYarnContainerIfRequired();
+   
requestYarnContainerIfRequired(yarnWorkerNode.getContainer().getResource());
}
 
private void returnExcessContainer(Container excessContainer) {
log.info("Returning excess container {}.", 
excessContainer.getId());

resourceManagerClient.releaseAssignedContainer(excessContainer.getId());
}
 
-   private void removeContainerRequest(AMRMClient.ContainerRequest 
pendingContainerRequest) {
-   numPendingContainerRequests--;
-
-   log.info("Removing container request {}. Pending container 
requests {}.", pendingContainerRequest, numPendingContainerRequests);
-
+   private void removeContainerRequest(AMRMClient.ContainerRequest 
pendingContainerRequest, WorkerResourceSpec workerResourceSpec) {
+   log.info("Removing container request {}.", 
pendingContainerRequest);
+   pendingWorkerCounter.decreaseAndGet(workerResourceSpec);
 
 Review comment:
   Put differently, when do we clean up `workerSpecContainerResourceAdapter`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-03 Thread GitBox
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] 
Make YarnResourceManager starts workers using WorkerResourceSpec requested by 
SlotManager
URL: https://github.com/apache/flink/pull/11353#discussion_r403092773
 
 

 ##
 File path: 
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesResourceManagerTest.java
 ##
 @@ -176,6 +185,15 @@ MainThreadExecutor getMainThreadExecutorForTesting() {
SlotManager getSlotManager() {
return this.slotManager;
}
+
+   @Override
+   public Map getPendingWorkerNums() {
+   return customPendingWorkerNums != null ? 
customPendingWorkerNums : super.getPendingWorkerNums();
+   }
+
+   public void setCustomPendingWorkerNums(final 
Map customPendingWorkerNums) {
+   this.customPendingWorkerNums = customPendingWorkerNums;
+   }
 
 Review comment:
   I think this is pretty much whitebox testing, as it strongly relies on 
internal implementation details. I would recommend going another way and 
relying either on the public APIs of the component or on encapsulating the 
bookkeeping logic so that it can be tested separately.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-03 Thread GitBox
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] 
Make YarnResourceManager starts workers using WorkerResourceSpec requested by 
SlotManager
URL: https://github.com/apache/flink/pull/11353#discussion_r403123867
 
 

 ##
 File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
 ##
 @@ -363,31 +356,64 @@ public void onContainersCompleted(final 
List statuses) {
@Override
public void onContainersAllocated(List containers) {
runAsync(() -> {
-   log.info("Received {} containers with {} pending 
container requests.", containers.size(), numPendingContainerRequests);
-   final Collection 
pendingRequests = getPendingRequests();
-   final Iterator 
pendingRequestsIterator = pendingRequests.iterator();
+   log.info("Received {} containers.", containers.size());
 
-   // number of allocated containers can be larger than 
the number of pending container requests
-   final int numAcceptedContainers = 
Math.min(containers.size(), numPendingContainerRequests);
-   final List requiredContainers = 
containers.subList(0, numAcceptedContainers);
-   final List excessContainers = 
containers.subList(numAcceptedContainers, containers.size());
-
-   for (int i = 0; i < requiredContainers.size(); i++) {
-   
removeContainerRequest(pendingRequestsIterator.next());
-   }
-
-   excessContainers.forEach(this::returnExcessContainer);
-   
requiredContainers.forEach(this::startTaskExecutorInContainer);
+   
groupContainerByResource(containers).forEach(this::onContainersOfResourceAllocated);
 
 Review comment:
   I'd suggest using a for-each loop.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-03 Thread GitBox
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] 
Make YarnResourceManager starts workers using WorkerResourceSpec requested by 
SlotManager
URL: https://github.com/apache/flink/pull/11353#discussion_r403088473
 
 

 ##
 File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java
 ##
 @@ -232,57 +229,75 @@ private void recoverWorkerNodesFromPreviousAttempts() 
throws ResourceManagerExce
++currentMaxAttemptId);
}
 
-   private void requestKubernetesPod() {
-   numPendingPodRequests++;
+   private void requestKubernetesPod(WorkerResourceSpec 
workerResourceSpec) {
+   final KubernetesTaskManagerParameters parameters =
+   
createKubernetesTaskManagerParameters(workerResourceSpec);
+
+   podWorkerResources.put(parameters.getPodName(), 
workerResourceSpec);
+   final int pendingWorkerNum = 
pendingWorkerCounter.increaseAndGet(workerResourceSpec);
 
log.info("Requesting new TaskManager pod with <{},{}>. Number 
pending requests {}.",
-   defaultMemoryMB,
-   defaultCpus,
-   numPendingPodRequests);
+   parameters.getTaskManagerMemoryMB(),
+   parameters.getTaskManagerCPU(),
+   pendingWorkerNum);
+   log.info("TaskManager {} will be started with {}.", 
parameters.getPodName(), workerResourceSpec);
+
+   final KubernetesPod taskManagerPod =
+   
KubernetesTaskManagerFactory.createTaskManagerComponent(parameters);
+   kubeClient.createTaskManagerPod(taskManagerPod);
+   }
+
+   private KubernetesTaskManagerParameters 
createKubernetesTaskManagerParameters(WorkerResourceSpec workerResourceSpec) {
+   // TODO: need to unset process/flink memory size from 
configuration if dynamic worker resource is activated
 
 Review comment:
   Btw: where do we change the `Configuration`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-03 Thread GitBox
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] 
Make YarnResourceManager starts workers using WorkerResourceSpec requested by 
SlotManager
URL: https://github.com/apache/flink/pull/11353#discussion_r403127546
 
 

 ##
 File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
 ##
 @@ -412,30 +439,32 @@ private void 
releaseFailedContainerAndRequestNewContainerIfRequired(ContainerId
 
final ResourceID resourceId = new 
ResourceID(containerId.toString());
// release the failed container
-   workerNodeMap.remove(resourceId);
+   YarnWorkerNode yarnWorkerNode = 
workerNodeMap.remove(resourceId);
resourceManagerClient.releaseAssignedContainer(containerId);
// and ask for a new one
-   requestYarnContainerIfRequired();
+   
requestYarnContainerIfRequired(yarnWorkerNode.getContainer().getResource());
}
 
private void returnExcessContainer(Container excessContainer) {
log.info("Returning excess container {}.", 
excessContainer.getId());

resourceManagerClient.releaseAssignedContainer(excessContainer.getId());
}
 
-   private void removeContainerRequest(AMRMClient.ContainerRequest 
pendingContainerRequest) {
-   numPendingContainerRequests--;
-
-   log.info("Removing container request {}. Pending container 
requests {}.", pendingContainerRequest, numPendingContainerRequests);
-
+   private void removeContainerRequest(AMRMClient.ContainerRequest 
pendingContainerRequest, WorkerResourceSpec workerResourceSpec) {
+   log.info("Removing container request {}.", 
pendingContainerRequest);
+   pendingWorkerCounter.decreaseAndGet(workerResourceSpec);
 
 Review comment:
   What about removing it from `workerSpecContainerResourceAdapter`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-03 Thread GitBox
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] 
Make YarnResourceManager starts workers using WorkerResourceSpec requested by 
SlotManager
URL: https://github.com/apache/flink/pull/11353#discussion_r403107046
 
 

 ##
 File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptionsInternal.java
 ##
 @@ -34,4 +37,24 @@
.stringType()
.noDefaultValue()
.withDescription("**DO NOT USE** The 
location of the log config file, e.g. the path to your log4j.properties for 
log4j.");
+
+   /**
+* **DO NO USE** Whether {@link YarnResourceManager} should match the 
vcores of allocated containers with those requested.
+*
+* By default, Yarn ignores vcores in the container requests, and 
always allocate 1 vcore for each container.
+* Iff 'yarn.scheduler.capacity.resource-calculator' is set to 
'DominantResourceCalculator' for Yarn, will it
+* allocate container vcores as requested. Unfortunately, this 
configuration option is dedicated for Yarn Scheduler,
+* and is only accessible to applications in Hadoop 2.6+.
+*
+* ATM, it should be fine to not match vcores, because with the 
current {@link SlotManagerImpl} all the TM
+* containers should have the same resources.
+*
+* If later we add another {@link SlotManager} implementation that 
may have TMs with different resources, we can
+* switch this option on only for the new SM, and the new SM can also 
be available on Hadoop 2.6+ only.
+*/
+   public static final ConfigOption MATCH_CONTAINER_VCORES =
+   
key("$internal.yarn.resourcemanager.enable-vcore-matching")
+   .booleanType()
+   .defaultValue(false)
+   .withDescription("**DO NOT USE** 
Whether YarnResourceManager should match the container vcores.");
 
 Review comment:
   Does this mean that one has to configure one's Flink cluster depending on the 
configuration of the Yarn cluster? What happens if one forgets to configure Flink accordingly?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-03 Thread GitBox
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] 
Make YarnResourceManager starts workers using WorkerResourceSpec requested by 
SlotManager
URL: https://github.com/apache/flink/pull/11353#discussion_r403108543
 
 

 ##
 File path: 
flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
 ##
 @@ -584,6 +590,85 @@ public void testGetCpuExceedMaxInt() throws Exception {
}};
}
 
+   @Test
+   public void testWorkerSpecContainerResourceAdapter_MatchVcores() {
+   final int minMemMB = 100;
+   final int minVcore = 10;
+   final YarnResourceManager.WorkerSpecContainerResourceAdapter 
adapter =
+   new 
YarnResourceManager.WorkerSpecContainerResourceAdapter(
+   getConfigProcessSpecEqualsWorkerSpec(), 
minMemMB, minVcore, true);
 
 Review comment:
   It is usually easier to understand if one uses an enum instead of a boolean 
because one can give the different values expressive names (e.g. 
`MATCH_VCORES`, `IGNORE_VCORES`).


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-03 Thread GitBox
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] 
Make YarnResourceManager starts workers using WorkerResourceSpec requested by 
SlotManager
URL: https://github.com/apache/flink/pull/11353#discussion_r403118602
 
 

 ##
 File path: 
flink-yarn/src/test/java/org/apache/flink/yarn/TestingYarnAMRMClientAsync.java
 ##
 @@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+/**
+ * A Yarn {@link AMRMClientAsync} implementation for testing.
+ */
+public class TestingYarnAMRMClientAsync extends 
AMRMClientAsyncImpl {
+
+   private Function, 
List>>
+   getMatchingRequestsFunction = ignored -> 
Collections.emptyList();
+   private Consumer> 
addContainerRequestConsumer = ignored -> {};
+   private Consumer> 
removeContainerRequestConsumer = ignored -> {};
+   private Consumer> 
releaseAssignedContainerConsumer = ignored -> {};
 
 Review comment:
   I'd suggest using a `BiConsumer` here.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-03 Thread GitBox
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] 
Make YarnResourceManager starts workers using WorkerResourceSpec requested by 
SlotManager
URL: https://github.com/apache/flink/pull/11353#discussion_r403132911
 
 

 ##
 File path: 
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/entrypoint/KubernetesResourceManagerFactoryTest.java
 ##
 @@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.entrypoint;
+
+import org.apache.flink.api.common.resources.CPUResource;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+
+import org.junit.Test;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link KubernetesResourceManagerFactory}.
+ */
+public class KubernetesResourceManagerFactoryTest {
 
 Review comment:
   ```suggestion
   public class KubernetesResourceManagerFactoryTest extends TestLogger {
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-03 Thread GitBox
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] 
Make YarnResourceManager starts workers using WorkerResourceSpec requested by 
SlotManager
URL: https://github.com/apache/flink/pull/11353#discussion_r403093997
 
 

 ##
 File path: 
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesResourceManagerTest.java
 ##
 @@ -321,6 +339,47 @@ public void testGetCpuCoresNumSlots() {
assertThat(resourceManager.getCpuCores(configuration), is(3.0));
}
 
+   @Test
+   public void testStartAndRecoverVariousResourceSpec() {
+   // Start two workers with different resources
+   final WorkerResourceSpec workerResourceSpec1 = new 
WorkerResourceSpec(1.0, 100, 0, 100, 100);
+   final WorkerResourceSpec workerResourceSpec2 = new 
WorkerResourceSpec(1.0, 99, 0, 100, 100);
+   resourceManager.startNewWorker(workerResourceSpec1);
+   resourceManager.startNewWorker(workerResourceSpec2);
+
+   // Verify two pods with both worker resources are started
+   final PodList initialPodList = kubeClient.pods().list();
+   assertEquals(2, initialPodList.getItems().size());
+   final Pod initialPod1 = getPodContainsStrInArgs(initialPodList, 
TaskManagerOptions.TASK_HEAP_MEMORY.key() + "=" + (100L << 20));
+   final Pod initialPod2 = getPodContainsStrInArgs(initialPodList, 
TaskManagerOptions.TASK_HEAP_MEMORY.key() + "=" + (99L << 20));
+
+   // Notify resource manager about pods added.
+   final KubernetesPod initialKubernetesPod1 = new 
KubernetesPod(initialPod1);
+   final KubernetesPod initialKubernetesPod2 = new 
KubernetesPod(initialPod2);
+   resourceManager.onAdded(ImmutableList.of(initialKubernetesPod1, 
initialKubernetesPod2));
+
+   // Terminate pod1.
+   terminatePod(initialPod1);
+   
resourceManager.setCustomPendingWorkerNums(Collections.singletonMap(workerResourceSpec1,
 1));
 
 Review comment:
   I would recommend not testing the component like this. It requires detailed 
knowledge of the component's internals and makes the component harder to evolve, because 
this test relies on the fact that the `KubernetesResourceManager` has a map from 
`WorkerResourceSpec` to `Integer`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-03 Thread GitBox
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] 
Make YarnResourceManager starts workers using WorkerResourceSpec requested by 
SlotManager
URL: https://github.com/apache/flink/pull/11353#discussion_r403083789
 
 

 ##
 File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java
 ##
 @@ -183,16 +178,18 @@ public boolean stopWorker(final KubernetesWorkerNode 
worker) {
@Override
public void onAdded(List pods) {
runAsync(() -> {
-   for (KubernetesPod pod : pods) {
-   if (numPendingPodRequests > 0) {
-   numPendingPodRequests--;
+   pods.forEach(pod -> {
 
 Review comment:
   Call me old-fashioned, but I think the for-each loop `for (KubernetesPod 
pod: pods)` is superior to `forEach`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] Make YarnResourceManager starts workers using WorkerResourceSpec requested by SlotManager

2020-04-03 Thread GitBox
tillrohrmann commented on a change in pull request #11353: [FLINK-16438][yarn] 
Make YarnResourceManager starts workers using WorkerResourceSpec requested by 
SlotManager
URL: https://github.com/apache/flink/pull/11353#discussion_r403121671
 
 

 ##
 File path: 
flink-yarn/src/test/java/org/apache/flink/yarn/TestingYarnNMClientAsync.java
 ##
 @@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
+import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
+
+import java.util.function.Consumer;
+
+/**
+ * A Yarn {@link NMClientAsync} implementation for testing.
+ */
+class TestingYarnNMClientAsync extends NMClientAsyncImpl {
 
 Review comment:
   Same here: `NMClientAsyncImpl` seems to be unstable and might change 
depending on the Yarn version used.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services