[GitHub] [flink] KarmaGYZ commented on a change in pull request #13311: [FLINK-18721] Migrate YarnResourceManager to the new YarnResourceManagerDriver

2020-10-09 Thread GitBox


KarmaGYZ commented on a change in pull request #13311:
URL: https://github.com/apache/flink/pull/13311#discussion_r502152387



##
File path: 
flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
##
@@ -385,14 +385,14 @@ public void testStopWorkerAfterRegistration() throws Exception {
final CompletableFuture startContainerAsyncFuture = new CompletableFuture<>();
final CompletableFuture stopContainerAsyncFuture = new CompletableFuture<>();
 
-   testingYarnAMRMClientAsync.setGetMatchingRequestsFunction(ignored ->
+   testingYarnAMRMClientAsyncBuilder.setGetMatchingRequestsFunction(ignored ->

Review comment:
   As far as I understand, we can be sure about that. The `build` method is called when the ResourceManager is started, specifically in `Context#runTest`. Since we now configure the builders and then trigger `Context#runTest` sequentially on the test's main thread, the builders and `containerResource` are always set before `build` runs and will therefore always be seen by the `ResourceManager`. Is my understanding correct?
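A minimal sketch of the ordering argument above. `Client`, `ClientBuilder` and `Context` are hypothetical stand-ins, not the real Flink test classes: because the builder is only turned into a client inside `runTest()`, a function set on the builder earlier on the same thread is the one the built client uses.

```java
import java.util.function.Function;

public final class BuilderOrderingSketch {

    /** Hypothetical stand-in for the testing AMRM client produced by the builder. */
    static final class Client {
        final Function<String, String> getMatchingRequestsFunction;

        Client(Function<String, String> getMatchingRequestsFunction) {
            this.getMatchingRequestsFunction = getMatchingRequestsFunction;
        }
    }

    /** Hypothetical stand-in for a testing client builder with a default behaviour. */
    static final class ClientBuilder {
        private Function<String, String> getMatchingRequestsFunction = ignored -> "default";

        ClientBuilder setGetMatchingRequestsFunction(Function<String, String> function) {
            this.getMatchingRequestsFunction = function;
            return this;
        }

        Client build() {
            // The function is captured only here, i.e. when the "ResourceManager" starts.
            return new Client(getMatchingRequestsFunction);
        }
    }

    /** Hypothetical test context: build() is deferred until runTest(). */
    static final class Context {
        final ClientBuilder clientBuilder = new ClientBuilder();

        Client runTest() {
            return clientBuilder.build();
        }
    }

    public static void main(String[] args) {
        Context context = new Context();
        // 1. Configure the builder on the test's main thread ...
        context.clientBuilder.setGetMatchingRequestsFunction(ignored -> "custom");
        // 2. ... then start; build() sees the customized function.
        Client client = context.runTest();
        System.out.println(client.getMatchingRequestsFunction.apply("request"));
    }
}
```

This only covers the configure-then-start order on one thread; a setter called after `runTest()` would not affect the client that was already built.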

##
File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerDriver.java
##
@@ -0,0 +1,609 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.configuration.TaskManagerOptionsInternal;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.resourcemanager.active.AbstractResourceManagerDriver;
+import org.apache.flink.runtime.resourcemanager.active.ResourceManagerDriver;
+import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
+import org.apache.flink.runtime.webmonitor.history.HistoryServerUtils;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.yarn.configuration.YarnConfigOptions;
+import org.apache.flink.yarn.configuration.YarnConfigOptionsInternal;
+import org.apache.flink.yarn.configuration.YarnResourceManagerConfiguration;
+
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.net.URL;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/**
+ * Implementation of {@link ResourceManagerDriver} for Yarn deployment.
+ */
+public class YarnResourceManagerDriver extends AbstractResourceManagerDriver {

[GitHub] [flink] KarmaGYZ commented on a change in pull request #13311: [FLINK-18721] Migrate YarnResourceManager to the new YarnResourceManagerDriver

2020-10-08 Thread GitBox


KarmaGYZ commented on a change in pull request #13311:
URL: https://github.com/apache/flink/pull/13311#discussion_r502170774



##
File path: 
flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerDriverTest.java
##
@@ -0,0 +1,544 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.api.common.resources.CPUResource;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.resourcemanager.active.ResourceManagerDriver;
+import org.apache.flink.runtime.resourcemanager.active.ResourceManagerDriverTestBase;
+import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
+import org.apache.flink.runtime.util.HadoopUtils;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.yarn.configuration.YarnResourceManagerConfiguration;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableList;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.junit.Assume;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.configuration.GlobalConfiguration.FLINK_CONF_FILENAME;
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_APP_ID;
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_CLIENT_HOME_DIR;
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_CLIENT_SHIP_FILES;
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_FLINK_CLASSPATH;
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_HADOOP_USER_NAME;
+import static org.apache.flink.yarn.YarnConfigKeys.FLINK_DIST_JAR;
+import static org.apache.flink.yarn.YarnConfigKeys.FLINK_YARN_FILES;
+import static org.apache.flink.yarn.YarnResourceManagerDriver.ERROR_MESSAGE_ON_SHUTDOWN_REQUEST;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link YarnResourceManagerDriver}.
+ */
+public class YarnResourceManagerDriverTest extends ResourceManagerDriverTestBase {
+   private static final Resource testingResource = Resource.newInstance(YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB, YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES);
+   private static final Container testingContainer = 

[GitHub] [flink] KarmaGYZ commented on a change in pull request #13311: [FLINK-18721] Migrate YarnResourceManager to the new YarnResourceManagerDriver

2020-10-08 Thread GitBox


KarmaGYZ commented on a change in pull request #13311:
URL: https://github.com/apache/flink/pull/13311#discussion_r502162885



##
File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerDriver.java
##
@@ -0,0 +1,609 @@
+/**
+ * Implementation of {@link ResourceManagerDriver} for Yarn deployment.
+ */
+public class YarnResourceManagerDriver extends AbstractResourceManagerDriver {
+
+   private static final Priority RM_REQUEST_PRIORITY = Priority.newInstance(1);
+
+   /** Environment variable name of the hostname given by the YARN.
+    * In task executor we use the hostnames given by YARN consistently throughout akka */
+   private static final String ENV_FLINK_NODE_ID = "_FLINK_NODE_ID";
+
+   static final String ERROR_MESSAGE_ON_SHUTDOWN_REQUEST = "Received shutdown request from YARN ResourceManager.";
+
+   private final YarnConfiguration yarnConfig;
+
+   /** The process environment variables. */
+   private final YarnResourceManagerConfiguration configuration;
+
+   /** Default heartbeat interval between this resource manager and the YARN ResourceManager. */
+   private final int yarnHeartbeatIntervalMillis;
+
+   /** Client to communicate with the Resource Manager (YARN's master). */
+   private AMRMClientAsync resourceManagerClient;
+
+   /** The heartbeat interval while the resource master is waiting for 

[GitHub] [flink] KarmaGYZ commented on a change in pull request #13311: [FLINK-18721] Migrate YarnResourceManager to the new YarnResourceManagerDriver

2020-10-08 Thread GitBox


KarmaGYZ commented on a change in pull request #13311:
URL: https://github.com/apache/flink/pull/13311#discussion_r502152387



##
File path: 
flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
##
@@ -385,14 +385,14 @@ public void testStopWorkerAfterRegistration() throws Exception {
final CompletableFuture startContainerAsyncFuture = new CompletableFuture<>();
final CompletableFuture stopContainerAsyncFuture = new CompletableFuture<>();
 
-   testingYarnAMRMClientAsync.setGetMatchingRequestsFunction(ignored ->
+   testingYarnAMRMClientAsyncBuilder.setGetMatchingRequestsFunction(ignored ->

Review comment:
   As far as I understand, we can be sure about that. The `build` method is called when the ResourceManager is started, specifically in `Context#runTest`. Since we now configure the builders and then trigger `Context#runTest` sequentially on the test's main thread, the builders and `containerResource` are always set before `build` runs and will therefore always be seen by the `ResourceManager`. Is my understanding correct?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] KarmaGYZ commented on a change in pull request #13311: [FLINK-18721] Migrate YarnResourceManager to the new YarnResourceManagerDriver

2020-09-15 Thread GitBox


KarmaGYZ commented on a change in pull request #13311:
URL: https://github.com/apache/flink/pull/13311#discussion_r488494767



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/CommonProcessMemorySpec.java
##
@@ -101,7 +101,7 @@ public MemorySize getTotalProcessMemorySize() {
public boolean equals(Object obj) {
if (obj == this) {
return true;
-   } else if (obj instanceof CommonProcessMemorySpec ) {
+   } else if (getClass().equals(obj.getClass()) && obj instanceof CommonProcessMemorySpec ) {

Review comment:
   I think the first approach would be good enough.









[GitHub] [flink] KarmaGYZ commented on a change in pull request #13311: [FLINK-18721] Migrate YarnResourceManager to the new YarnResourceManagerDriver

2020-09-15 Thread GitBox


KarmaGYZ commented on a change in pull request #13311:
URL: https://github.com/apache/flink/pull/13311#discussion_r488366730



##
File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerDriver.java
##
@@ -508,52 +371,103 @@ private void removeContainerRequest(AMRMClient.ContainerRequest pendingContainer
return matchingContainerRequests;
}
 
-   @Override
-   public void onShutdownRequest() {
-   onFatalError(new ResourceManagerException(ERROR_MASSAGE_ON_SHUTDOWN_REQUEST));
-   }
+   private ContainerLaunchContext createTaskExecutorLaunchContext(
+   ResourceID containerId,
+   String host,
+   TaskExecutorProcessSpec taskExecutorProcessSpec) throws Exception {
 
-   @Override
-   public void onNodesUpdated(List<NodeReport> list) {
-   // We are not interested in node updates
-   }
+   // init the ContainerLaunchContext
+   final String currDir = configuration.getCurrentDir();
 
-   @Override
-   public void onError(Throwable error) {
-   onFatalError(error);
-   }
+   final ContaineredTaskManagerParameters taskManagerParameters =
+   ContaineredTaskManagerParameters.create(flinkConfig, taskExecutorProcessSpec);
 
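-   // ------------------------------------------------------------------------
-   //  NMClientAsync CallbackHandler methods
-   // ------------------------------------------------------------------------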
-   @Override
-   public void onContainerStarted(ContainerId containerId, Map<String, ByteBuffer> map) {
-   log.debug("Succeeded to call YARN Node Manager to start container {}.", containerId);
-   }
+   log.info("TaskExecutor {} will be started on {} with {}.",
+   containerId.getStringWithMetadata(),
+   host,
+   taskExecutorProcessSpec);
 
-   @Override
-   public void onContainerStatusReceived(ContainerId containerId, ContainerStatus containerStatus) {
-   // We are not interested in getting container status
+   final Configuration taskManagerConfig = BootstrapTools.cloneConfiguration(flinkConfig);
+   taskManagerConfig.set(TaskManagerOptions.TASK_MANAGER_RESOURCE_ID, containerId.getResourceIdString());
+   taskManagerConfig.set(TaskManagerOptionsInternal.TASK_MANAGER_RESOURCE_ID_METADATA, containerId.getMetadata());
+
+   final String taskManagerDynamicProperties =
+   BootstrapTools.getDynamicPropertiesAsString(flinkClientConfig, taskManagerConfig);
+
+   log.debug("TaskManager configuration: {}", taskManagerConfig);
+
+   final ContainerLaunchContext taskExecutorLaunchContext = Utils.createTaskExecutorContext(
+   flinkConfig,
+   yarnConfig,
+   configuration,
+   taskManagerParameters,
+   taskManagerDynamicProperties,
+   currDir,
+   YarnTaskExecutorRunner.class,
+   log);
+
+   taskExecutorLaunchContext.getEnvironment()
+   .put(ENV_FLINK_NODE_ID, host);
+   return taskExecutorLaunchContext;
}
 
-   @Override
-   public void onContainerStopped(ContainerId containerId) {
-   log.debug("Succeeded to call YARN Node Manager to stop container {}.", containerId);
+   @VisibleForTesting
+   Optional getContainerResource(TaskExecutorProcessSpec taskExecutorProcessSpec) {
+   return taskExecutorProcessSpecContainerResourceAdapter.tryComputeContainerResource(taskExecutorProcessSpec);
}
 
-   @Override
-   public void onStartContainerError(ContainerId containerId, Throwable t) {
-   runAsync(() -> releaseFailedContainerAndRequestNewContainerIfRequired(containerId, t));
+   private RegisterApplicationMasterResponse registerApplicationMaster() throws Exception {
+   final int restPort;
+   final String webInterfaceUrl = configuration.getWebInterfaceUrl();
+   final String rpcAddress = configuration.getRpcAddress();
+
+   if (webInterfaceUrl != null) {
+   final int lastColon = webInterfaceUrl.lastIndexOf(':');

Review comment:
   I'm not very familiar with that logic. As far as I can tell, `webInterfaceUrl` originates in `RestServerEndpoint`, so it should always contain a port.
   
   @tillrohrmann Could you help confirm this?
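To illustrate why the "always has a port" assumption matters, here is a small sketch with hypothetical helper names (the quoted `registerApplicationMaster` code is cut off right after `lastIndexOf(':')`, so the fallback value of -1 is an assumption). It contrasts a parse after the last `':'` with `java.net.URI#getPort`, which returns -1 when no port is given.

```java
import java.net.URI;

public final class RestPortSketch {

    /**
     * Parses the digits after the last ':' (the approach the quoted diff starts with).
     * Returns -1 if there is no colon or the suffix is not a plain number.
     */
    static int restPortFromLastColon(String webInterfaceUrl) {
        final int lastColon = webInterfaceUrl.lastIndexOf(':');
        if (lastColon == -1) {
            return -1;
        }
        try {
            return Integer.parseInt(webInterfaceUrl.substring(lastColon + 1));
        } catch (NumberFormatException e) {
            // e.g. "http://localhost": the last ':' belongs to the scheme, not a port
            return -1;
        }
    }

    /** Alternative: let java.net.URI parse it; getPort() is -1 when no port is given. */
    static int restPortFromUri(String webInterfaceUrl) {
        return URI.create(webInterfaceUrl).getPort();
    }

    public static void main(String[] args) {
        for (String url : new String[] {"http://localhost:8081", "http://localhost", "http://localhost:8081/"}) {
            System.out.println(url + " -> " + restPortFromLastColon(url) + " / " + restPortFromUri(url));
        }
    }
}
```

With a trailing path such as `http://localhost:8081/`, the colon-based parse gives up while `URI` still finds 8081; whether `RestServerEndpoint` can ever produce such a URL is exactly what needs confirming.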






[GitHub] [flink] KarmaGYZ commented on a change in pull request #13311: [FLINK-18721] Migrate YarnResourceManager to the new YarnResourceManagerDriver

2020-09-15 Thread GitBox


KarmaGYZ commented on a change in pull request #13311:
URL: https://github.com/apache/flink/pull/13311#discussion_r488412282



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerFactory.java
##
@@ -104,5 +104,5 @@ private Configuration createActiveResourceManagerConfiguration(Configuration ori
resourceManagerMetricGroup);
}
 
-   protected abstract ResourceManagerDriver createResourceManagerDriver(Configuration configuration);
+   protected abstract ResourceManagerDriver createResourceManagerDriver(Configuration configuration, String webInterfaceUrl, String rpcAddress);

Review comment:
   I'm not sure what you mean by introducing `ActiveResourceManagerDriverFactory`; could you give more details? If I understand correctly, that would require `KubernetesResourceManagerFactory`, `YarnResourceManagerFactory`, and probably `MesosResourceManagerFactory` to override the `createResourceManager` method, which would introduce a lot of duplicated logic.
   
   BTW, the Mesos implementation needs `webInterfaceUrl` as well.
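The duplication concern can be illustrated with a template-method sketch. All names below are simplified, hypothetical stand-ins (a `Map` instead of Flink's `Configuration`, a one-method `Driver` instead of `ResourceManagerDriver`). The shared `createResourceManager` logic lives once in the abstract factory, and each deployment only implements the `createResourceManagerDriver` hook, which here receives `webInterfaceUrl` and `rpcAddress` as in the diff above.

```java
import java.util.Collections;
import java.util.Map;

public final class DriverFactorySketch {

    /** Hypothetical stand-in for ResourceManagerDriver. */
    interface Driver {
        String describe();
    }

    /** Shared creation logic is written once; subclasses only provide the driver. */
    abstract static class ActiveResourceManagerFactorySketch {

        final String createResourceManager(
                Map<String, String> configuration, String webInterfaceUrl, String rpcAddress) {
            // Common steps (e.g. deriving the RM configuration, metric groups) would live here once.
            final Driver driver = createResourceManagerDriver(configuration, webInterfaceUrl, rpcAddress);
            return "ActiveResourceManager[" + driver.describe() + "]";
        }

        protected abstract Driver createResourceManagerDriver(
                Map<String, String> configuration, String webInterfaceUrl, String rpcAddress);
    }

    static final class YarnFactorySketch extends ActiveResourceManagerFactorySketch {
        @Override
        protected Driver createResourceManagerDriver(
                Map<String, String> configuration, String webInterfaceUrl, String rpcAddress) {
            return () -> "yarn(web=" + webInterfaceUrl + ", rpc=" + rpcAddress + ")";
        }
    }

    static final class MesosFactorySketch extends ActiveResourceManagerFactorySketch {
        @Override
        protected Driver createResourceManagerDriver(
                Map<String, String> configuration, String webInterfaceUrl, String rpcAddress) {
            // The Mesos driver also needs the web interface URL.
            return () -> "mesos(web=" + webInterfaceUrl + ")";
        }
    }

    public static void main(String[] args) {
        System.out.println(new YarnFactorySketch()
                .createResourceManager(Collections.emptyMap(), "http://host:8081", "host"));
    }
}
```

Passing the extra arguments through the hook keeps each subclass to a single method, whereas overriding `createResourceManager` in every factory would repeat the shared steps.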









[GitHub] [flink] KarmaGYZ commented on a change in pull request #13311: [FLINK-18721] Migrate YarnResourceManager to the new YarnResourceManagerDriver

2020-09-14 Thread GitBox


KarmaGYZ commented on a change in pull request #13311:
URL: https://github.com/apache/flink/pull/13311#discussion_r488391778



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorProcessSpec.java
##
@@ -141,6 +143,24 @@ public MemorySize getManagedMemorySize() {
return getFlinkMemory().getManaged();
}
 
+   @Override
+   public boolean equals(Object obj) {
+   if (obj == this) {
+   return true;
+   } else if (obj instanceof TaskExecutorProcessSpec) {

Review comment:
   Your concern is probably valid. We could add a class-equality check to `CommonProcessMemorySpec#equals`.
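A minimal sketch of the class-equality check being discussed. `BaseSpec` and `ExtendedSpec` are hypothetical stand-ins for `CommonProcessMemorySpec` and a subclass such as `TaskExecutorProcessSpec`. Comparing `getClass()` keeps `equals` symmetric across the hierarchy. The check must be null-safe, because calling `obj.getClass()` on a `null` argument throws a `NullPointerException`, and once the exact class matches, an additional `instanceof` check is redundant.

```java
import java.util.Objects;

public final class MemorySpecEqualsSketch {

    /** Hypothetical stand-in for CommonProcessMemorySpec. */
    static class BaseSpec {
        final long totalBytes;

        BaseSpec(long totalBytes) {
            this.totalBytes = totalBytes;
        }

        @Override
        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            // Null-safe exact-class check: a subclass instance never equals a base instance.
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            return totalBytes == ((BaseSpec) obj).totalBytes;
        }

        @Override
        public int hashCode() {
            return Long.hashCode(totalBytes);
        }
    }

    /** Hypothetical stand-in for a subclass with extra state. */
    static class ExtendedSpec extends BaseSpec {
        final int cpuCores;

        ExtendedSpec(long totalBytes, int cpuCores) {
            super(totalBytes);
            this.cpuCores = cpuCores;
        }

        @Override
        public boolean equals(Object obj) {
            // super.equals() already guarantees obj has exactly this class, so the cast is safe.
            return super.equals(obj) && cpuCores == ((ExtendedSpec) obj).cpuCores;
        }

        @Override
        public int hashCode() {
            return Objects.hash(totalBytes, cpuCores);
        }
    }

    public static void main(String[] args) {
        BaseSpec base = new BaseSpec(1024L);
        ExtendedSpec extended = new ExtendedSpec(1024L, 2);
        System.out.println(base.equals(extended) + " " + extended.equals(base));
    }
}
```

With a plain `instanceof` check in the base class, `base.equals(extended)` would be `true` while `extended.equals(base)` is `false`. The `getClass()` comparison makes both `false`.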









[GitHub] [flink] KarmaGYZ commented on a change in pull request #13311: [FLINK-18721] Migrate YarnResourceManager to the new YarnResourceManagerDriver

2020-09-14 Thread GitBox


KarmaGYZ commented on a change in pull request #13311:
URL: https://github.com/apache/flink/pull/13311#discussion_r488366730



##
File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerDriver.java
##
@@ -508,52 +371,103 @@ private void removeContainerRequest(AMRMClient.ContainerRequest pendingContainer
return matchingContainerRequests;
}
 
-   @Override
-   public void onShutdownRequest() {
-   onFatalError(new ResourceManagerException(ERROR_MASSAGE_ON_SHUTDOWN_REQUEST));
-   }
+   private ContainerLaunchContext createTaskExecutorLaunchContext(
+   ResourceID containerId,
+   String host,
+   TaskExecutorProcessSpec taskExecutorProcessSpec) throws Exception {
 
-   @Override
-   public void onNodesUpdated(List<NodeReport> list) {
-   // We are not interested in node updates
-   }
+   // init the ContainerLaunchContext
+   final String currDir = configuration.getCurrentDir();
 
-   @Override
-   public void onError(Throwable error) {
-   onFatalError(error);
-   }
+   final ContaineredTaskManagerParameters taskManagerParameters =
+   ContaineredTaskManagerParameters.create(flinkConfig, taskExecutorProcessSpec);
 
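-   // ------------------------------------------------------------------------
-   //  NMClientAsync CallbackHandler methods
-   // ------------------------------------------------------------------------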
-   @Override
-   public void onContainerStarted(ContainerId containerId, Map<String, ByteBuffer> map) {
-   log.debug("Succeeded to call YARN Node Manager to start container {}.", containerId);
-   }
+   log.info("TaskExecutor {} will be started on {} with {}.",
+   containerId.getStringWithMetadata(),
+   host,
+   taskExecutorProcessSpec);
 
-   @Override
-   public void onContainerStatusReceived(ContainerId containerId, ContainerStatus containerStatus) {
-   // We are not interested in getting container status
+   final Configuration taskManagerConfig = BootstrapTools.cloneConfiguration(flinkConfig);
+   taskManagerConfig.set(TaskManagerOptions.TASK_MANAGER_RESOURCE_ID, containerId.getResourceIdString());
+   taskManagerConfig.set(TaskManagerOptionsInternal.TASK_MANAGER_RESOURCE_ID_METADATA, containerId.getMetadata());
+
+   final String taskManagerDynamicProperties =
+   BootstrapTools.getDynamicPropertiesAsString(flinkClientConfig, taskManagerConfig);
+
+   log.debug("TaskManager configuration: {}", taskManagerConfig);
+
+   final ContainerLaunchContext taskExecutorLaunchContext = Utils.createTaskExecutorContext(
+   flinkConfig,
+   yarnConfig,
+   configuration,
+   taskManagerParameters,
+   taskManagerDynamicProperties,
+   currDir,
+   YarnTaskExecutorRunner.class,
+   log);
+
+   taskExecutorLaunchContext.getEnvironment()
+   .put(ENV_FLINK_NODE_ID, host);
+   return taskExecutorLaunchContext;
}
 
-   @Override
-   public void onContainerStopped(ContainerId containerId) {
-   log.debug("Succeeded to call YARN Node Manager to stop container {}.", containerId);
+   @VisibleForTesting
+   Optional getContainerResource(TaskExecutorProcessSpec taskExecutorProcessSpec) {
+   return taskExecutorProcessSpecContainerResourceAdapter.tryComputeContainerResource(taskExecutorProcessSpec);
}
 
-   @Override
-   public void onStartContainerError(ContainerId containerId, Throwable t) {
-   runAsync(() -> releaseFailedContainerAndRequestNewContainerIfRequired(containerId, t));
+   private RegisterApplicationMasterResponse registerApplicationMaster() throws Exception {
+   final int restPort;
+   final String webInterfaceUrl = configuration.getWebInterfaceUrl();
+   final String rpcAddress = configuration.getRpcAddress();
+
+   if (webInterfaceUrl != null) {
+   final int lastColon = webInterfaceUrl.lastIndexOf(':');

Review comment:
   I'm not very familiar with that logic. As far as I can tell, `webInterfaceUrl` originates in `RestServerEndpoint`, so it should always contain a port.
   
   @tillrohrmann Could you help confirm this?






[GitHub] [flink] KarmaGYZ commented on a change in pull request #13311: [FLINK-18721] Migrate YarnResourceManager to the new YarnResourceManagerDriver

2020-09-14 Thread GitBox


KarmaGYZ commented on a change in pull request #13311:
URL: https://github.com/apache/flink/pull/13311#discussion_r488363795



##
File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerDriver.java
##
@@ -435,53 +308,43 @@ private void onContainersOfResourceAllocated(Resource resource, List
numAccepted, numExcess, numPending, resource);
}
 
-   @VisibleForTesting
-   static ResourceID getContainerResourceId(Container container) {
-   return new ResourceID(container.getId().toString(), container.getNodeId().toString());
+   private int getNumRequestedNotAllocatedWorkers() {
+   return requestResourceFutures.values().stream().mapToInt(Queue::size).sum();
+   }
+
+   private int getNumRequestedNotAllocatedWorkersFor(TaskExecutorProcessSpec taskExecutorProcessSpec) {
+   return requestResourceFutures.getOrDefault(taskExecutorProcessSpec, new LinkedList<>()).size();
+   }
+
+   private void removeContainerRequest(AMRMClient.ContainerRequest pendingContainerRequest) {
+   log.info("Removing container request {}.", pendingContainerRequest);
+   resourceManagerClient.removeContainerRequest(pendingContainerRequest);
+   }
+
+   private void returnExcessContainer(Container excessContainer) {
+   log.info("Returning excess container {}.", excessContainer.getId());
+   resourceManagerClient.releaseAssignedContainer(excessContainer.getId());
}
 
-   private void startTaskExecutorInContainer(Container container, WorkerResourceSpec workerResourceSpec, ResourceID resourceId) {
-   workerNodeMap.put(resourceId, new YarnWorkerNode(container, resourceId));
+   private void startTaskExecutorInContainer(Container container, TaskExecutorProcessSpec taskExecutorProcessSpec, ResourceID resourceId, CompletableFuture requestResourceFuture) {
+   final YarnWorkerNode yarnWorkerNode = new YarnWorkerNode(container, resourceId);
 
try {
// Context information used to start a TaskExecutor Java process
ContainerLaunchContext taskExecutorLaunchContext = createTaskExecutorLaunchContext(
resourceId,
container.getNodeId().getHost(),
-   TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(flinkConfig, workerResourceSpec));
+   taskExecutorProcessSpec);
 
nodeManagerClient.startContainerAsync(container, taskExecutorLaunchContext);
+   requestResourceFuture.complete(yarnWorkerNode);
} catch (Throwable t) {
-   releaseFailedContainerAndRequestNewContainerIfRequired(container.getId(), t);
+   requestResourceFuture.completeExceptionally(t);
}
}
 
-   private void releaseFailedContainerAndRequestNewContainerIfRequired(ContainerId containerId, Throwable throwable) {
-   validateRunsInMainThread();
-
-   log.error("Could not start TaskManager in container {}.", containerId, throwable);
-
-   final ResourceID resourceId = new ResourceID(containerId.toString());
-   // release the failed container
-   workerNodeMap.remove(resourceId);
-   resourceManagerClient.releaseAssignedContainer(containerId);
-   notifyAllocatedWorkerStopped(resourceId);
-   // and ask for a new one
-   requestYarnContainerIfRequired();
-   }
-
-   private void returnExcessContainer(Container excessContainer) {
-   log.info("Returning excess container {}.", 
excessContainer.getId());
-   
resourceManagerClient.releaseAssignedContainer(excessContainer.getId());
-   }
-
-   private void removeContainerRequest(AMRMClient.ContainerRequest 
pendingContainerRequest, WorkerResourceSpec workerResourceSpec) {
-   log.info("Removing container request {}.", 
pendingContainerRequest);
-   
resourceManagerClient.removeContainerRequest(pendingContainerRequest);
-   }
-
private Collection 
getPendingRequestsAndCheckConsistency(Resource resource, int expectedNum) {
-   final Collection equivalentResources = 
workerSpecContainerResourceAdapter.getEquivalentContainerResource(resource, 
matchingStrategy);
+   final Collection equivalentResources = 
taskExecutorProcessSpecContainerResourceAdapter.getEquivalentContainerResource(resource,
 matchingStrategy);
final List> 
matchingRequests =

Review comment:
   I think we could gain a bit of readability here. Thanks for the remark.
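The hunk above changes `startTaskExecutorInContainer` so that, instead of calling `releaseFailedContainerAndRequestNewContainerIfRequired` on failure, the driver completes `requestResourceFuture` normally when the launch call succeeds and exceptionally when it throws, leaving recovery to whoever holds the future. A minimal, self-contained sketch of that hand-off; the `Launcher` interface is a hypothetical stand-in for `nodeManagerClient.startContainerAsync`, and a `String` container id stands in for `YarnWorkerNode`:

```java
import java.util.concurrent.CompletableFuture;

public class FutureHandOffSketch {

    /** Hypothetical stand-in for launching a worker process in a container. */
    interface Launcher {
        void launch(String containerId) throws Exception;
    }

    /**
     * Launches a worker and reports the outcome through the request future
     * instead of releasing the container and re-requesting one locally.
     */
    static void startWorker(Launcher launcher, String containerId, CompletableFuture<String> requestFuture) {
        try {
            launcher.launch(containerId);
            requestFuture.complete(containerId);
        } catch (Throwable t) {
            requestFuture.completeExceptionally(t);
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> ok = new CompletableFuture<>();
        startWorker(id -> { }, "container_1", ok);
        System.out.println("launched: " + ok.join());

        CompletableFuture<String> failed = new CompletableFuture<>();
        startWorker(id -> {
            throw new IllegalStateException("launch failed");
        }, "container_2", failed);
        // The caller decides how to react, e.g. by requesting a replacement worker.
        failed.whenComplete((worker, error) -> {
            if (error != null) {
                System.out.println("failed: " + error.getMessage());
            }
        });
    }
}
```

The design point is the separation of concerns: the driver only reports what happened, and the component holding the future owns the retry policy.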





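This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org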

[GitHub] [flink] KarmaGYZ commented on a change in pull request #13311: [FLINK-18721] Migrate YarnResourceManager to the new YarnResourceManagerDriver

2020-09-14 Thread GitBox


KarmaGYZ commented on a change in pull request #13311:
URL: https://github.com/apache/flink/pull/13311#discussion_r488362483



##
File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerDriver.java
##
@@ -435,53 +308,43 @@ private void onContainersOfResourceAllocated(Resource resource, List
numAccepted, numExcess, numPending, resource);
}
 
-   @VisibleForTesting
-   static ResourceID getContainerResourceId(Container container) {
-   return new ResourceID(container.getId().toString(), container.getNodeId().toString());
+   private int getNumRequestedNotAllocatedWorkers() {
+   return requestResourceFutures.values().stream().mapToInt(Queue::size).sum();
+   }
+
+   private int getNumRequestedNotAllocatedWorkersFor(TaskExecutorProcessSpec taskExecutorProcessSpec) {
+   return requestResourceFutures.getOrDefault(taskExecutorProcessSpec, new LinkedList<>()).size();

Review comment:
   We do need an empty queue here: the map's values are `Queue`s, and `Collections.emptyList()` returns a `List`, so it does not fit this argument.
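To spell out the type argument: `getNumRequestedNotAllocatedWorkers` sums the values with `Queue::size`, so the map's value type is a `Queue`, and the default passed to `getOrDefault` must be one too. `Collections.emptyList()` returns a `List`, so it is rejected at compile time. A minimal sketch, with `String` keys and `Integer` elements as hypothetical stand-ins for `TaskExecutorProcessSpec` and the request futures:

```java
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;

public class PendingRequestCounter {

    // Pending requests grouped by key; String/Integer are stand-ins for
    // TaskExecutorProcessSpec and the per-request futures.
    private final Map<String, Queue<Integer>> pending = new HashMap<>();

    void addRequest(String spec, int requestId) {
        pending.computeIfAbsent(spec, ignored -> new LinkedList<>()).add(requestId);
    }

    int numPending() {
        return pending.values().stream().mapToInt(Queue::size).sum();
    }

    int numPendingFor(String spec) {
        // The default must be a Queue to match the map's value type;
        // Collections.emptyList() returns a List and would not compile here.
        return pending.getOrDefault(spec, new LinkedList<>()).size();
    }

    public static void main(String[] args) {
        PendingRequestCounter counter = new PendingRequestCounter();
        counter.addRequest("small", 1);
        counter.addRequest("small", 2);
        counter.addRequest("large", 3);
        System.out.println(counter.numPending());           // 3
        System.out.println(counter.numPendingFor("small")); // 2
        System.out.println(counter.numPendingFor("tiny"));  // 0
    }
}
```

Allocating a throwaway `LinkedList` per lookup is cheap; a `get` followed by a null check would avoid it if that ever mattered.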









[GitHub] [flink] KarmaGYZ commented on a change in pull request #13311: [FLINK-18721] Migrate YarnResourceManager to the new YarnResourceManagerDriver

2020-09-14 Thread GitBox


KarmaGYZ commented on a change in pull request #13311:
URL: https://github.com/apache/flink/pull/13311#discussion_r488361230



##
File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerDriver.java
##
@@ -72,354 +62,237 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Queue;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
 
 /**
- * The yarn implementation of the resource manager. Used when the system is started
- * via the resource framework YARN.
+ * Implementation of {@link ResourceManagerDriver} for Kubernetes deployment.
  */
-public class YarnResourceManager extends LegacyActiveResourceManager
-   implements AMRMClientAsync.CallbackHandler, NMClientAsync.CallbackHandler {
+public class YarnResourceManagerDriver extends AbstractResourceManagerDriver {
 
private static final Priority RM_REQUEST_PRIORITY = Priority.newInstance(1);
 
-   /** YARN container map. */
-   private final ConcurrentMap workerNodeMap;
-
/** Environment variable name of the hostname given by the YARN.
 * In task executor we use the hostnames given by YARN consistently throughout akka */
static final String ENV_FLINK_NODE_ID = "_FLINK_NODE_ID";
 
static final String ERROR_MASSAGE_ON_SHUTDOWN_REQUEST = "Received shutdown request from YARN ResourceManager.";
 
-   /** Default heartbeat interval between this resource manager and the YARN ResourceManager. */
-   private final int yarnHeartbeatIntervalMillis;
-
private final YarnConfiguration yarnConfig;
 
-   @Nullable
-   private final String webInterfaceUrl;
+   /** The process environment variables. */
+   private final YarnResourceManagerDriverConfiguration configuration;
 
-   /** The heartbeat interval while the resource master is waiting for containers. */
-   private final int containerRequestHeartbeatIntervalMillis;
+   /** Default heartbeat interval between this resource manager and the YARN ResourceManager. */
+   private final int yarnHeartbeatIntervalMillis;
 
/** Client to communicate with the Resource Manager (YARN's master). */
private AMRMClientAsync resourceManagerClient;
 
+   /** The heartbeat interval while the resource master is waiting for containers. */
+   private final int containerRequestHeartbeatIntervalMillis;
+
/** Client to communicate with the Node manager and launch TaskExecutor processes. */
private NMClientAsync nodeManagerClient;
 
-   private final WorkerSpecContainerResourceAdapter workerSpecContainerResourceAdapter;
+   /** Request resource futures, keyed by container ids. */
+   private final Map>> requestResourceFutures;
+
+   private final TaskExecutorProcessSpecContainerResourceAdapter taskExecutorProcessSpecContainerResourceAdapter;
 
private final RegisterApplicationMasterResponseReflector registerApplicationMasterResponseReflector;
 
-   private WorkerSpecContainerResourceAdapter.MatchingStrategy matchingStrategy;
-
-   public YarnResourceManager(
-   RpcService rpcService,
-   ResourceID resourceId,
-   Configuration flinkConfig,
-   Map env,
-   HighAvailabilityServices highAvailabilityServices,
-   HeartbeatServices heartbeatServices,
-   SlotManager slotManager,
-   ResourceManagerPartitionTrackerFactory clusterPartitionTrackerFactory,
-   JobLeaderIdService jobLeaderIdService,
-   ClusterInformation clusterInformation,
-   FatalErrorHandler fatalErrorHandler,
-   @Nullable String webInterfaceUrl,
-   ResourceManagerMetricGroup resourceManagerMetricGroup) {
-   super(
-   flinkConfig,
-   env,
-   rpcService,
-   resourceId,
-   highAvailabilityServices,
-   heartbeatServices,
-   slotManager,
-   clusterPartitionTrackerFactory,
-   jobLeaderIdService,
-   clusterInformation,
-   fatalErrorHandler,
-   resourceManagerMetricGroup);
+   private TaskExecutorProcessSpecContainerResourceAdapter.MatchingStrategy matchingStrategy;
+
+   private final YarnResourceManagerClientFactory yarnResourceManagerClientFactory;
+
+   
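The new `requestResourceFutures` field keeps, per `TaskExecutorProcessSpec`, a queue of futures for requests YARN has not answered yet, and the imports swap `ConcurrentHashMap`/`ConcurrentMap` for `HashMap`. A minimal sketch of how such per-spec FIFO queues could be drained as containers arrive. It assumes, as that import change suggests but the hunk does not show, that the state is only touched from a single main thread; `String` stands in for both the spec and the allocated container, and all names are hypothetical:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

public class RequestFutureQueues {

    // Not thread-safe: assumes all access happens from one main thread.
    private final Map<String, Queue<CompletableFuture<String>>> requestFutures = new HashMap<>();

    /** Registers a new request for the given spec and returns its future. */
    CompletableFuture<String> requestWorker(String spec) {
        CompletableFuture<String> future = new CompletableFuture<>();
        requestFutures.computeIfAbsent(spec, ignored -> new LinkedList<>()).add(future);
        return future;
    }

    /**
     * Completes the oldest pending requests for the spec with the allocated
     * containers and returns the containers that no request was waiting for.
     */
    List<String> onContainersAllocated(String spec, List<String> containers) {
        Queue<CompletableFuture<String>> pending = requestFutures.getOrDefault(spec, new LinkedList<>());
        List<String> excess = new ArrayList<>();
        for (String container : containers) {
            CompletableFuture<String> future = pending.poll();
            if (future == null) {
                excess.add(container); // e.g. to be returned to YARN
            } else {
                future.complete(container);
            }
        }
        if (pending.isEmpty()) {
            requestFutures.remove(spec);
        }
        return excess;
    }

    public static void main(String[] args) {
        RequestFutureQueues queues = new RequestFutureQueues();
        CompletableFuture<String> first = queues.requestWorker("spec-a");
        CompletableFuture<String> second = queues.requestWorker("spec-a");
        List<String> excess = queues.onContainersAllocated("spec-a", Arrays.asList("c1", "c2", "c3"));
        System.out.println(first.join() + " " + second.join() + " excess=" + excess);
    }
}
```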

[GitHub] [flink] KarmaGYZ commented on a change in pull request #13311: [FLINK-18721] Migrate YarnResourceManager to the new YarnResourceManagerDriver

2020-09-14 Thread GitBox


KarmaGYZ commented on a change in pull request #13311:
URL: https://github.com/apache/flink/pull/13311#discussion_r488360039



##
File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerDriver.java
##

[GitHub] [flink] KarmaGYZ commented on a change in pull request #13311: [FLINK-18721] Migrate YarnResourceManager to the new YarnResourceManagerDriver

2020-09-14 Thread GitBox


KarmaGYZ commented on a change in pull request #13311:
URL: https://github.com/apache/flink/pull/13311#discussion_r488345943



##
File path: 
flink-yarn/src/test/java/org/apache/flink/yarn/TestingYarnNMClientAsync.java
##
@@ -51,30 +65,74 @@ public void stopContainerAsync(ContainerId containerId, NodeId nodeId) {
this.stopContainerAsyncConsumer.accept(containerId, nodeId, callbackHandler);
}
 
-   void setStartContainerAsyncConsumer(TriConsumer startContainerAsyncConsumer) {
-   this.startContainerAsyncConsumer = Preconditions.checkNotNull(startContainerAsyncConsumer);
-   }
-
-   void setStopContainerAsyncConsumer(TriConsumer stopContainerAsyncConsumer) {
-   this.stopContainerAsyncConsumer = Preconditions.checkNotNull(stopContainerAsyncConsumer);
+   static Builder builder() {
+   return new Builder();
}
 
// 
//  Override lifecycle methods to avoid actually starting the service
// 
 
@Override
-   protected void serviceInit(Configuration conf) throws Exception {
-   // noop
+   public void init(Configuration conf) {
+   clientInitRunnable.run();
}
 
@Override
-   protected void serviceStart() throws Exception {
-   // noop
+   public void start() {
+   clientStartRunnable.run();
}
 
@Override
-   protected void serviceStop() throws Exception {
-   // noop
+   public void stop() {
+   clientStopRunnable.run();
+   }
+
+   /**
+* Builder class for {@link TestingYarnAMRMClientAsync}.
+*/
+   public static class Builder {
+   private volatile TriConsumer startContainerAsyncConsumer = (ignored1, ignored2, ignored3) -> {};
+   private volatile TriConsumer stopContainerAsyncConsumer = (ignored1, ignored2, ignored3) -> {};
+   private volatile Runnable clientInitRunnable = () -> {};
+   private volatile Runnable clientStartRunnable = () -> {};
+   private volatile Runnable clientStopRunnable = () -> {};

Review comment:
   Ditto.
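Both testing clients now override the public `init`/`start`/`stop` methods to run injected `clientInitRunnable`/`clientStartRunnable`/`clientStopRunnable` hooks, rather than turning `serviceInit`/`serviceStart`/`serviceStop` into no-ops, so a test can observe or customize lifecycle calls. A self-contained sketch of the idea; `LifecycleClient` is a hypothetical stand-in for the Hadoop client base classes, not their API:

```java
import java.util.concurrent.atomic.AtomicInteger;

public class LifecycleHookSketch {

    /** Hypothetical stand-in for a client with a start/stop lifecycle. */
    static class LifecycleClient {
        public void start() {
            throw new UnsupportedOperationException("would contact a real cluster");
        }

        public void stop() {
            throw new UnsupportedOperationException("would contact a real cluster");
        }
    }

    /** Test double that replaces the lifecycle with injected hooks. */
    static class TestingClient extends LifecycleClient {
        private final Runnable startRunnable;
        private final Runnable stopRunnable;

        TestingClient(Runnable startRunnable, Runnable stopRunnable) {
            this.startRunnable = startRunnable;
            this.stopRunnable = stopRunnable;
        }

        @Override
        public void start() {
            startRunnable.run();
        }

        @Override
        public void stop() {
            stopRunnable.run();
        }
    }

    public static void main(String[] args) {
        AtomicInteger starts = new AtomicInteger();
        AtomicInteger stops = new AtomicInteger();
        LifecycleClient client = new TestingClient(starts::incrementAndGet, stops::incrementAndGet);
        client.start();
        client.stop();
        System.out.println("starts=" + starts.get() + ", stops=" + stops.get());
    }
}
```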

##
File path: 
flink-yarn/src/test/java/org/apache/flink/yarn/TestingYarnAMRMClientAsync.java
##
@@ -45,25 +45,40 @@
  */
 public class TestingYarnAMRMClientAsync extends AMRMClientAsyncImpl {
 
-   private volatile Function, List>>
-   getMatchingRequestsFunction = ignored -> Collections.emptyList();
-   private volatile BiConsumer addContainerRequestConsumer = (ignored1, ignored2) -> {};
-   private volatile BiConsumer removeContainerRequestConsumer = (ignored1, ignored2) -> {};
-   private volatile BiConsumer releaseAssignedContainerConsumer = (ignored1, ignored2) -> {};
-   private volatile Consumer setHeartbeatIntervalConsumer = (ignored) -> {};
-   private volatile TriFunction registerApplicationMasterFunction =
-   (ignored1, ignored2, ignored3) -> RegisterApplicationMasterResponse.newInstance(
-   Resource.newInstance(0, 0),
-   Resource.newInstance(Integer.MAX_VALUE, Integer.MAX_VALUE),
-   Collections.emptyMap(),
-   null,
-   Collections.emptyList(),
-   null,
-   Collections.emptyList());
-   private volatile TriConsumer unregisterApplicationMasterConsumer = (ignored1, ignored2, ignored3) -> {};
-
-   TestingYarnAMRMClientAsync(CallbackHandler callbackHandler) {
+   private volatile Function, List>> getMatchingRequestsFunction;
+   private volatile BiConsumer addContainerRequestConsumer;
+   private volatile BiConsumer removeContainerRequestConsumer;
+   private volatile BiConsumer releaseAssignedContainerConsumer;
+   private volatile Consumer setHeartbeatIntervalConsumer;
+   private volatile TriFunction registerApplicationMasterFunction;
+   private volatile TriConsumer unregisterApplicationMasterConsumer;
+   private volatile Runnable clientInitRunnable;
+   private volatile Runnable clientStartRunnable;
+   private volatile Runnable clientStopRunnable;

Review comment:
   Ditto.

##
File path: 
flink-yarn/src/test/java/org/apache/flink/yarn/TestingYarnAMRMClientAsync.java
##
@@ -101,58 +116,111 @@ public void unregisterApplicationMaster(FinalApplicationStatus appStatus, String
unregisterApplicationMasterConsumer.accept(appStatus, appMessage, appTrackingUrl);
}
 
-   void setGetMatchingRequestsFunction(
-   Function, List>>
-   getMatchingRequestsFunction) {
-   this.getMatchingRequestsFunction = 

[GitHub] [flink] KarmaGYZ commented on a change in pull request #13311: [FLINK-18721] Migrate YarnResourceManager to the new YarnResourceManagerDriver

2020-09-14 Thread GitBox


KarmaGYZ commented on a change in pull request #13311:
URL: https://github.com/apache/flink/pull/13311#discussion_r488345856



##
File path: 
flink-yarn/src/test/java/org/apache/flink/yarn/TestingYarnNMClientAsync.java
##
@@ -34,11 +34,25 @@
  */
 class TestingYarnNMClientAsync extends NMClientAsyncImpl {
 
-   private volatile TriConsumer startContainerAsyncConsumer = (ignored1, ignored2, ignored3) -> {};
-   private volatile TriConsumer stopContainerAsyncConsumer = (ignored1, ignored2, ignored3) -> {};
+   private volatile TriConsumer startContainerAsyncConsumer;
+   private volatile TriConsumer stopContainerAsyncConsumer;
+   private volatile Runnable clientInitRunnable;
+   private volatile Runnable clientStartRunnable;
+   private volatile Runnable clientStopRunnable;

Review comment:
   Before we introduced the `Builder` class, these functions could be set by multiple threads. You're right, we no longer need `volatile`.
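Why `volatile` becomes unnecessary: once the functions are supplied through a `Builder` and copied into the client at construction time, nothing reassigns them afterwards, so they can be plain `final` fields. Under the Java Memory Model, a thread that obtains a reference to an object after its constructor finishes sees the `final` fields as the constructor set them, provided `this` does not escape during construction. A minimal sketch with hypothetical names (`TestingNodeClient`, `setStartContainerConsumer`), not the actual test classes:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public final class BuilderWithFinalFields {

    /** Hypothetical test double whose behavior is fixed at construction time. */
    static final class TestingNodeClient {
        // final instead of volatile: assigned once in the constructor, never changed.
        private final Consumer<String> startContainerConsumer;
        private final Consumer<String> stopContainerConsumer;

        private TestingNodeClient(Builder builder) {
            this.startContainerConsumer = builder.startContainerConsumer;
            this.stopContainerConsumer = builder.stopContainerConsumer;
        }

        void startContainer(String containerId) {
            startContainerConsumer.accept(containerId);
        }

        void stopContainer(String containerId) {
            stopContainerConsumer.accept(containerId);
        }

        static Builder builder() {
            return new Builder();
        }

        /** Builder with no-op defaults, configured before the client is built. */
        static final class Builder {
            private Consumer<String> startContainerConsumer = ignored -> { };
            private Consumer<String> stopContainerConsumer = ignored -> { };

            Builder setStartContainerConsumer(Consumer<String> consumer) {
                this.startContainerConsumer = consumer;
                return this;
            }

            Builder setStopContainerConsumer(Consumer<String> consumer) {
                this.stopContainerConsumer = consumer;
                return this;
            }

            TestingNodeClient build() {
                return new TestingNodeClient(this);
            }
        }
    }

    public static void main(String[] args) {
        List<String> started = new ArrayList<>();
        TestingNodeClient client = TestingNodeClient.builder()
                .setStartContainerConsumer(started::add)
                .build();
        client.startContainer("container_1");
        client.stopContainer("container_1"); // default no-op
        System.out.println(started);
    }
}
```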









[GitHub] [flink] KarmaGYZ commented on a change in pull request #13311: [FLINK-18721] Migrate YarnResourceManager to the new YarnResourceManagerDriver

2020-09-08 Thread GitBox


KarmaGYZ commented on a change in pull request #13311:
URL: https://github.com/apache/flink/pull/13311#discussion_r484792254



##
File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
##
@@ -135,7 +135,7 @@ public YarnResourceManager(
JobLeaderIdService jobLeaderIdService,
ClusterInformation clusterInformation,
FatalErrorHandler fatalErrorHandler,
-   @Nullable String webInterfaceUrl,
+   YarnResourceManagerConfiguration yarnResourceManagerConfiguration,

Review comment:
   I think `ApplicationConstants.Environment.PWD` does not belong in `YarnResourceManagerConfiguration`. We could replace it with `System.getenv()` here.

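A minimal sketch of reading the working directory from the process environment instead of carrying it in `YarnResourceManagerConfiguration`. It assumes the variable name is the literal `"PWD"`, which is what `ApplicationConstants.Environment.PWD.key()` should return in Hadoop YARN, and it takes the environment as a `Map` so that `System.getenv()` can be swapped for a fixed map in tests; `resolveWorkingDirectory` is a hypothetical helper, not Flink code:

```java
import java.util.Collections;
import java.util.Map;

public class WorkingDirectoryLookup {

    // Assumed to match ApplicationConstants.Environment.PWD.key() in Hadoop YARN.
    static final String PWD_ENV_KEY = "PWD";

    /** Returns the working directory from the given environment or fails fast. */
    static String resolveWorkingDirectory(Map<String, String> env) {
        String pwd = env.get(PWD_ENV_KEY);
        if (pwd == null || pwd.isEmpty()) {
            throw new IllegalStateException("Environment variable " + PWD_ENV_KEY + " is not set.");
        }
        return pwd;
    }

    public static void main(String[] args) {
        // In production the environment would come from System.getenv().
        Map<String, String> processEnv = System.getenv();
        System.out.println("PWD set in this process: " + processEnv.containsKey(PWD_ENV_KEY));
        System.out.println(resolveWorkingDirectory(Collections.singletonMap(PWD_ENV_KEY, "/tmp/container_1")));
    }
}
```

Passing the map in keeps the lookup testable without mutating the real process environment, which Java does not allow.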



