[GitHub] [flink] KarmaGYZ commented on a change in pull request #13311: [FLINK-18721] Migrate YarnResourceManager to the new YarnResourceManagerDriver
KarmaGYZ commented on a change in pull request #13311: URL: https://github.com/apache/flink/pull/13311#discussion_r502152387 ## File path: flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java ## @@ -385,14 +385,14 @@ public void testStopWorkerAfterRegistration() throws Exception { final CompletableFuture startContainerAsyncFuture = new CompletableFuture<>(); final CompletableFuture stopContainerAsyncFuture = new CompletableFuture<>(); - testingYarnAMRMClientAsync.setGetMatchingRequestsFunction(ignored -> + testingYarnAMRMClientAsyncBuilder.setGetMatchingRequestsFunction(ignored -> Review comment: From my understanding, we are sure about that. The `build` function is triggered when the ResourceManager is started, to be specific, in `Context#runTest`. As we now set the builders and trigger `Context#runTest` sequentially in the testing main thread, we could ensure the builders and `containerResource` will always be seen by `ResourceManager`. Do I understand it correctly? ## File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerDriver.java ## @@ -0,0 +1,609 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.yarn; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.configuration.TaskManagerOptionsInternal; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.clusterframework.BootstrapTools; +import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; +import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.resourcemanager.active.AbstractResourceManagerDriver; +import org.apache.flink.runtime.resourcemanager.active.ResourceManagerDriver; +import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException; +import org.apache.flink.runtime.webmonitor.history.HistoryServerUtils; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.Preconditions; +import org.apache.flink.yarn.configuration.YarnConfigOptions; +import org.apache.flink.yarn.configuration.YarnConfigOptionsInternal; +import org.apache.flink.yarn.configuration.YarnResourceManagerConfiguration; + +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.NodeReport; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; 
+import org.apache.hadoop.yarn.client.api.AMRMClient; +import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; +import org.apache.hadoop.yarn.client.api.async.NMClientAsync; +import org.apache.hadoop.yarn.conf.YarnConfiguration; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.net.URL; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +/** + * Implementation of {@link ResourceManagerDriver} for Yarn deployment. + */ +public class YarnResourceManagerDriver extends AbstractResourceManagerDriver {
[GitHub] [flink] KarmaGYZ commented on a change in pull request #13311: [FLINK-18721] Migrate YarnResourceManager to the new YarnResourceManagerDriver
KarmaGYZ commented on a change in pull request #13311: URL: https://github.com/apache/flink/pull/13311#discussion_r502170774 ## File path: flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerDriverTest.java ## @@ -0,0 +1,544 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.yarn; + +import org.apache.flink.api.common.resources.CPUResource; +import org.apache.flink.configuration.MemorySize; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.clusterframework.BootstrapTools; +import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.resourcemanager.active.ResourceManagerDriver; +import org.apache.flink.runtime.resourcemanager.active.ResourceManagerDriverTestBase; +import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException; +import org.apache.flink.runtime.util.HadoopUtils; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.yarn.configuration.YarnResourceManagerConfiguration; + +import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableList; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.api.ApplicationConstants; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.LocalResourceType; +import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; +import org.apache.hadoop.yarn.client.api.async.NMClientAsync; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import 
org.junit.Assume; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.flink.configuration.GlobalConfiguration.FLINK_CONF_FILENAME; +import static org.apache.flink.yarn.YarnConfigKeys.ENV_APP_ID; +import static org.apache.flink.yarn.YarnConfigKeys.ENV_CLIENT_HOME_DIR; +import static org.apache.flink.yarn.YarnConfigKeys.ENV_CLIENT_SHIP_FILES; +import static org.apache.flink.yarn.YarnConfigKeys.ENV_FLINK_CLASSPATH; +import static org.apache.flink.yarn.YarnConfigKeys.ENV_HADOOP_USER_NAME; +import static org.apache.flink.yarn.YarnConfigKeys.FLINK_DIST_JAR; +import static org.apache.flink.yarn.YarnConfigKeys.FLINK_YARN_FILES; +import static org.apache.flink.yarn.YarnResourceManagerDriver.ERROR_MESSAGE_ON_SHUTDOWN_REQUEST; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +/** + * Tests for {@link YarnResourceManagerDriver}. + */ +public class YarnResourceManagerDriverTest extends ResourceManagerDriverTestBase { + private static final Resource testingResource = Resource.newInstance(YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB, YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES); + private static final Container testingContainer =
[GitHub] [flink] KarmaGYZ commented on a change in pull request #13311: [FLINK-18721] Migrate YarnResourceManager to the new YarnResourceManagerDriver
KarmaGYZ commented on a change in pull request #13311: URL: https://github.com/apache/flink/pull/13311#discussion_r502162885 ## File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerDriver.java ## @@ -0,0 +1,609 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.yarn; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.configuration.TaskManagerOptionsInternal; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.clusterframework.BootstrapTools; +import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; +import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.resourcemanager.active.AbstractResourceManagerDriver; +import org.apache.flink.runtime.resourcemanager.active.ResourceManagerDriver; +import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException; +import org.apache.flink.runtime.webmonitor.history.HistoryServerUtils; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.Preconditions; +import org.apache.flink.yarn.configuration.YarnConfigOptions; +import org.apache.flink.yarn.configuration.YarnConfigOptionsInternal; +import org.apache.flink.yarn.configuration.YarnResourceManagerConfiguration; + +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.NodeReport; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; 
+import org.apache.hadoop.yarn.client.api.AMRMClient; +import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; +import org.apache.hadoop.yarn.client.api.async.NMClientAsync; +import org.apache.hadoop.yarn.conf.YarnConfiguration; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.net.URL; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +/** + * Implementation of {@link ResourceManagerDriver} for Yarn deployment. + */ +public class YarnResourceManagerDriver extends AbstractResourceManagerDriver { + + private static final Priority RM_REQUEST_PRIORITY = Priority.newInstance(1); + + /** Environment variable name of the hostname given by the YARN. +* In task executor we use the hostnames given by YARN consistently throughout akka */ + private static final String ENV_FLINK_NODE_ID = "_FLINK_NODE_ID"; + + static final String ERROR_MESSAGE_ON_SHUTDOWN_REQUEST = "Received shutdown request from YARN ResourceManager."; + + private final YarnConfiguration yarnConfig; + + /** The process environment variables. */ + private final YarnResourceManagerConfiguration configuration; + + /** Default heartbeat interval between this resource manager and the YARN ResourceManager. */ + private final int yarnHeartbeatIntervalMillis; + + /** Client to communicate with the Resource Manager (YARN's master). */ + private AMRMClientAsync resourceManagerClient; + + /** The heartbeat interval while the resource master is waiting for
[GitHub] [flink] KarmaGYZ commented on a change in pull request #13311: [FLINK-18721] Migrate YarnResourceManager to the new YarnResourceManagerDriver
KarmaGYZ commented on a change in pull request #13311: URL: https://github.com/apache/flink/pull/13311#discussion_r502152387 ## File path: flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java ## @@ -385,14 +385,14 @@ public void testStopWorkerAfterRegistration() throws Exception { final CompletableFuture startContainerAsyncFuture = new CompletableFuture<>(); final CompletableFuture stopContainerAsyncFuture = new CompletableFuture<>(); - testingYarnAMRMClientAsync.setGetMatchingRequestsFunction(ignored -> + testingYarnAMRMClientAsyncBuilder.setGetMatchingRequestsFunction(ignored -> Review comment: From my understanding, yes, we can be sure about that. The `build` function is triggered when the ResourceManager is started, specifically in `Context#runTest`. Since we now set the builders and then trigger `Context#runTest` sequentially in the test's main thread, the builders and `containerResource` will always be visible to the `ResourceManager`. Do I understand it correctly? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
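The ordering argument in this comment can be sketched as follows. This is a minimal illustration under stated assumptions, not the PR's test code: `TestingClientSketch`, its `Builder`, and `BuilderOrderingDemo.runTest` are hypothetical stand-ins for the testing client, its builder, and `Context#runTest`.

```java
import java.util.function.Function;

/** Hypothetical, simplified stand-in for a testing client; not the Flink test class. */
class TestingClientSketch {
    private final Function<String, Integer> getMatchingRequestsFunction;

    TestingClientSketch(Function<String, Integer> getMatchingRequestsFunction) {
        this.getMatchingRequestsFunction = getMatchingRequestsFunction;
    }

    int getMatchingRequests(String resource) {
        return getMatchingRequestsFunction.apply(resource);
    }

    /** Collects test hooks; nothing is built until build() is called. */
    static class Builder {
        private Function<String, Integer> getMatchingRequestsFunction = ignored -> 0;

        Builder setGetMatchingRequestsFunction(Function<String, Integer> function) {
            this.getMatchingRequestsFunction = function;
            return this;
        }

        TestingClientSketch build() {
            return new TestingClientSketch(getMatchingRequestsFunction);
        }
    }
}

class BuilderOrderingDemo {
    /** Plays the role of Context#runTest (an assumption): the client is built only here. */
    static int runTest(TestingClientSketch.Builder builder) {
        final TestingClientSketch client = builder.build();
        return client.getMatchingRequests("any");
    }

    public static void main(String[] args) {
        final TestingClientSketch.Builder builder = new TestingClientSketch.Builder();
        builder.setGetMatchingRequestsFunction(ignored -> 3);
        // Same thread, program order: the setter call above happens-before build().
        System.out.println(runTest(builder));
    }
}
```

Because the setter and `build()` run in program order on one thread, the built client always observes the configured function; a setter called after `runTest` would have no effect on the already built client.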
[GitHub] [flink] KarmaGYZ commented on a change in pull request #13311: [FLINK-18721] Migrate YarnResourceManager to the new YarnResourceManagerDriver
KarmaGYZ commented on a change in pull request #13311: URL: https://github.com/apache/flink/pull/13311#discussion_r488494767 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/CommonProcessMemorySpec.java ## @@ -101,7 +101,7 @@ public MemorySize getTotalProcessMemorySize() { public boolean equals(Object obj) { if (obj == this) { return true; - } else if (obj instanceof CommonProcessMemorySpec ) { + } else if (getClass().equals(obj.getClass()) && obj instanceof CommonProcessMemorySpec ) { Review comment: I think the first approach would be good enough.
[GitHub] [flink] KarmaGYZ commented on a change in pull request #13311: [FLINK-18721] Migrate YarnResourceManager to the new YarnResourceManagerDriver
KarmaGYZ commented on a change in pull request #13311: URL: https://github.com/apache/flink/pull/13311#discussion_r488366730 ## File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerDriver.java ## @@ -508,52 +371,103 @@ private void removeContainerRequest(AMRMClient.ContainerRequest pendingContainer return matchingContainerRequests; } - @Override - public void onShutdownRequest() { - onFatalError(new ResourceManagerException(ERROR_MASSAGE_ON_SHUTDOWN_REQUEST)); - } + private ContainerLaunchContext createTaskExecutorLaunchContext( + ResourceID containerId, + String host, + TaskExecutorProcessSpec taskExecutorProcessSpec) throws Exception { - @Override - public void onNodesUpdated(List list) { - // We are not interested in node updates - } + // init the ContainerLaunchContext + final String currDir = configuration.getCurrentDir(); - @Override - public void onError(Throwable error) { - onFatalError(error); - } + final ContaineredTaskManagerParameters taskManagerParameters = + ContaineredTaskManagerParameters.create(flinkConfig, taskExecutorProcessSpec); - // - // NMClientAsync CallbackHandler methods - // - @Override - public void onContainerStarted(ContainerId containerId, Map map) { - log.debug("Succeeded to call YARN Node Manager to start container {}.", containerId); - } + log.info("TaskExecutor {} will be started on {} with {}.", + containerId.getStringWithMetadata(), + host, + taskExecutorProcessSpec); - @Override - public void onContainerStatusReceived(ContainerId containerId, ContainerStatus containerStatus) { - // We are not interested in getting container status + final Configuration taskManagerConfig = BootstrapTools.cloneConfiguration(flinkConfig); + taskManagerConfig.set(TaskManagerOptions.TASK_MANAGER_RESOURCE_ID, containerId.getResourceIdString()); + taskManagerConfig.set(TaskManagerOptionsInternal.TASK_MANAGER_RESOURCE_ID_METADATA, containerId.getMetadata()); + + final String taskManagerDynamicProperties = + 
BootstrapTools.getDynamicPropertiesAsString(flinkClientConfig, taskManagerConfig); + + log.debug("TaskManager configuration: {}", taskManagerConfig); + + final ContainerLaunchContext taskExecutorLaunchContext = Utils.createTaskExecutorContext( + flinkConfig, + yarnConfig, + configuration, + taskManagerParameters, + taskManagerDynamicProperties, + currDir, + YarnTaskExecutorRunner.class, + log); + + taskExecutorLaunchContext.getEnvironment() + .put(ENV_FLINK_NODE_ID, host); + return taskExecutorLaunchContext; } - @Override - public void onContainerStopped(ContainerId containerId) { - log.debug("Succeeded to call YARN Node Manager to stop container {}.", containerId); + @VisibleForTesting + Optional getContainerResource(TaskExecutorProcessSpec taskExecutorProcessSpec) { + return taskExecutorProcessSpecContainerResourceAdapter.tryComputeContainerResource(taskExecutorProcessSpec); } - @Override - public void onStartContainerError(ContainerId containerId, Throwable t) { - runAsync(() -> releaseFailedContainerAndRequestNewContainerIfRequired(containerId, t)); + private RegisterApplicationMasterResponse registerApplicationMaster() throws Exception { + final int restPort; + final String webInterfaceUrl = configuration.getWebInterfaceUrl(); + final String rpcAddress = configuration.getRpcAddress(); + + if (webInterfaceUrl != null) { + final int lastColon = webInterfaceUrl.lastIndexOf(':'); Review comment: I'm not very familiar with that logic. As far as I can tell, `webInterfaceUrl` is originally derived in `RestServerEndpoint`, so it should always contain a port. @tillrohrmann Could you help confirm this?
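The question of whether `webInterfaceUrl` always carries a port matters because of how `lastIndexOf(':')` behaves when it does not. A defensive variant could look like the sketch below. `RestPortParser` and the `-1` fallback are assumptions made for illustration; the quoted diff is cut off before it shows what the PR does with `lastColon`.

```java
/** Hypothetical helper; the class name and the -1 fallback are assumptions, not the PR's code. */
class RestPortParser {

    /** Returns the port that follows the last ':' in the URL, or -1 if no port can be parsed. */
    static int parseRestPort(String webInterfaceUrl) {
        if (webInterfaceUrl == null) {
            return -1;
        }
        final int lastColon = webInterfaceUrl.lastIndexOf(':');
        if (lastColon == -1 || lastColon == webInterfaceUrl.length() - 1) {
            // No colon at all, or nothing after the colon.
            return -1;
        }
        try {
            return Integer.parseInt(webInterfaceUrl.substring(lastColon + 1));
        } catch (NumberFormatException e) {
            // E.g. "http://localhost": the last ':' belongs to the scheme, not to a port.
            return -1;
        }
    }

    public static void main(String[] args) {
        System.out.println(parseRestPort("http://localhost:8081"));
        System.out.println(parseRestPort("http://localhost"));
    }
}
```

For a URL without a port, the last colon is the one in the scheme separator, so the text after it is not numeric. Guarding the parse keeps that case from surfacing as an unrelated `NumberFormatException`.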
[GitHub] [flink] KarmaGYZ commented on a change in pull request #13311: [FLINK-18721] Migrate YarnResourceManager to the new YarnResourceManagerDriver
KarmaGYZ commented on a change in pull request #13311: URL: https://github.com/apache/flink/pull/13311#discussion_r488412282 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerFactory.java ## @@ -104,5 +104,5 @@ private Configuration createActiveResourceManagerConfiguration(Configuration ori resourceManagerMetricGroup); } - protected abstract ResourceManagerDriver createResourceManagerDriver(Configuration configuration); + protected abstract ResourceManagerDriver createResourceManagerDriver(Configuration configuration, String webInterfaceUrl, String rpcAddress); Review comment: I'm not sure what you mean by introducing `ActiveResourceManagerDriverFactory`; could you give more details? IIUC, in that case `KubernetesResourceManagerFactory`, `YarnResourceManagerFactory`, and probably `MesosResourceManagerFactory` would each need to override the `createResourceManager` method, which would introduce a lot of duplicated logic. BTW, the Mesos implementation needs `webInterfaceUrl` as well.
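The design being defended here is a template method: the shared construction logic stays in the abstract factory, and only the driver-creation hook varies per deployment. The sketch below illustrates that shape with heavily simplified, hypothetical types (`Driver`, `ActiveFactorySketch`, `YarnFactorySketch`). None of them are Flink's real classes or signatures.

```java
/** Hypothetical, simplified stand-in for a driver; not Flink's ResourceManagerDriver. */
interface Driver {
    String describe();
}

/** Hypothetical base factory: the shared construction logic is written once here. */
abstract class ActiveFactorySketch {

    final String createResourceManager(String webInterfaceUrl, String rpcAddress) {
        // Subclasses do not override this method, so nothing here is duplicated.
        final Driver driver = createDriver(webInterfaceUrl, rpcAddress);
        return "ResourceManager[" + driver.describe() + "]";
    }

    /** The only deployment-specific hook; it receives the extra arguments directly. */
    protected abstract Driver createDriver(String webInterfaceUrl, String rpcAddress);
}

/** Hypothetical deployment-specific factory that implements just the hook. */
class YarnFactorySketch extends ActiveFactorySketch {
    @Override
    protected Driver createDriver(String webInterfaceUrl, String rpcAddress) {
        return () -> "yarn driver, web=" + webInterfaceUrl + ", rpc=" + rpcAddress;
    }
}

class FactorySketchDemo {
    public static void main(String[] args) {
        System.out.println(new YarnFactorySketch().createResourceManager("http://host:8081", "host"));
    }
}
```

Passing `webInterfaceUrl` and `rpcAddress` through the hook keeps each concrete factory down to one small method, which is the duplication concern raised in the comment.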
[GitHub] [flink] KarmaGYZ commented on a change in pull request #13311: [FLINK-18721] Migrate YarnResourceManager to the new YarnResourceManagerDriver
KarmaGYZ commented on a change in pull request #13311: URL: https://github.com/apache/flink/pull/13311#discussion_r488391778 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorProcessSpec.java ## @@ -141,6 +143,24 @@ public MemorySize getManagedMemorySize() { return getFlinkMemory().getManaged(); } + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } else if (obj instanceof TaskExecutorProcessSpec) { Review comment: I think your concern is probably valid. We could add a class-equal check to `CommonProcessMemorySpec#equals`.
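A class-equal check of the kind proposed here can be sketched as below. `BaseSpec` and `DerivedSpec` are hypothetical stand-ins, not `CommonProcessMemorySpec` or `TaskExecutorProcessSpec`, and the explicit null guard is an addition in this sketch: calling `obj.getClass()` without it would throw a `NullPointerException` for `equals(null)`.

```java
import java.util.Objects;

/** Hypothetical stand-in for a memory spec; not the Flink class. */
class BaseSpec {
    private final long totalBytes;

    BaseSpec(long totalBytes) {
        this.totalBytes = totalBytes;
    }

    @Override
    public boolean equals(Object obj) {
        if (obj == this) {
            return true;
        }
        // Null-safe exact-class comparison: a subclass instance never equals a BaseSpec.
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        return totalBytes == ((BaseSpec) obj).totalBytes;
    }

    @Override
    public int hashCode() {
        return Objects.hash(totalBytes);
    }
}

/** Hypothetical subclass that adds no state of its own. */
class DerivedSpec extends BaseSpec {
    DerivedSpec(long totalBytes) {
        super(totalBytes);
    }
}

class EqualsClassCheckDemo {
    public static void main(String[] args) {
        System.out.println(new BaseSpec(1024).equals(new BaseSpec(1024)));    // same class, same value
        System.out.println(new BaseSpec(1024).equals(new DerivedSpec(1024))); // different classes
        System.out.println(new DerivedSpec(1024).equals(new BaseSpec(1024))); // symmetric result
    }
}
```

With a plain `instanceof` check in the base class, the second comparison would return `true`, so two specs of different types sharing the same memory values would compare equal. The exact-class check rules that out in both directions.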
[GitHub] [flink] KarmaGYZ commented on a change in pull request #13311: [FLINK-18721] Migrate YarnResourceManager to the new YarnResourceManagerDriver
KarmaGYZ commented on a change in pull request #13311: URL: https://github.com/apache/flink/pull/13311#discussion_r488363795 ## File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerDriver.java ## @@ -435,53 +308,43 @@ private void onContainersOfResourceAllocated(Resource resource, List numAccepted, numExcess, numPending, resource); } - @VisibleForTesting - static ResourceID getContainerResourceId(Container container) { - return new ResourceID(container.getId().toString(), container.getNodeId().toString()); + private int getNumRequestedNotAllocatedWorkers() { + return requestResourceFutures.values().stream().mapToInt(Queue::size).sum(); + } + + private int getNumRequestedNotAllocatedWorkersFor(TaskExecutorProcessSpec taskExecutorProcessSpec) { + return requestResourceFutures.getOrDefault(taskExecutorProcessSpec, new LinkedList<>()).size(); + } + + private void removeContainerRequest(AMRMClient.ContainerRequest pendingContainerRequest) { + log.info("Removing container request {}.", pendingContainerRequest); + resourceManagerClient.removeContainerRequest(pendingContainerRequest); + } + + private void returnExcessContainer(Container excessContainer) { + log.info("Returning excess container {}.", excessContainer.getId()); + resourceManagerClient.releaseAssignedContainer(excessContainer.getId()); } - private void startTaskExecutorInContainer(Container container, WorkerResourceSpec workerResourceSpec, ResourceID resourceId) { - workerNodeMap.put(resourceId, new YarnWorkerNode(container, resourceId)); + private void startTaskExecutorInContainer(Container container, TaskExecutorProcessSpec taskExecutorProcessSpec, ResourceID resourceId, CompletableFuture requestResourceFuture) { + final YarnWorkerNode yarnWorkerNode = new YarnWorkerNode(container, resourceId); try { // Context information used to start a TaskExecutor Java process ContainerLaunchContext taskExecutorLaunchContext = createTaskExecutorLaunchContext( resourceId, 
container.getNodeId().getHost(), - TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(flinkConfig, workerResourceSpec)); + taskExecutorProcessSpec); nodeManagerClient.startContainerAsync(container, taskExecutorLaunchContext); + requestResourceFuture.complete(yarnWorkerNode); } catch (Throwable t) { - releaseFailedContainerAndRequestNewContainerIfRequired(container.getId(), t); + requestResourceFuture.completeExceptionally(t); } } - private void releaseFailedContainerAndRequestNewContainerIfRequired(ContainerId containerId, Throwable throwable) { - validateRunsInMainThread(); - - log.error("Could not start TaskManager in container {}.", containerId, throwable); - - final ResourceID resourceId = new ResourceID(containerId.toString()); - // release the failed container - workerNodeMap.remove(resourceId); - resourceManagerClient.releaseAssignedContainer(containerId); - notifyAllocatedWorkerStopped(resourceId); - // and ask for a new one - requestYarnContainerIfRequired(); - } - - private void returnExcessContainer(Container excessContainer) { - log.info("Returning excess container {}.", excessContainer.getId()); - resourceManagerClient.releaseAssignedContainer(excessContainer.getId()); - } - - private void removeContainerRequest(AMRMClient.ContainerRequest pendingContainerRequest, WorkerResourceSpec workerResourceSpec) { - log.info("Removing container request {}.", pendingContainerRequest); - resourceManagerClient.removeContainerRequest(pendingContainerRequest); - } - private Collection getPendingRequestsAndCheckConsistency(Resource resource, int expectedNum) { - final Collection equivalentResources = workerSpecContainerResourceAdapter.getEquivalentContainerResource(resource, matchingStrategy); + final Collection equivalentResources = taskExecutorProcessSpecContainerResourceAdapter.getEquivalentContainerResource(resource, matchingStrategy); final List> matchingRequests = Review comment: I think we could gain a bit of readability here. Thanks for the remark.
[GitHub] [flink] KarmaGYZ commented on a change in pull request #13311: [FLINK-18721] Migrate YarnResourceManager to the new YarnResourceManagerDriver
KarmaGYZ commented on a change in pull request #13311: URL: https://github.com/apache/flink/pull/13311#discussion_r488362483 ## File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerDriver.java ## @@ -435,53 +308,43 @@ private void onContainersOfResourceAllocated(Resource resource, List numAccepted, numExcess, numPending, resource); } - @VisibleForTesting - static ResourceID getContainerResourceId(Container container) { - return new ResourceID(container.getId().toString(), container.getNodeId().toString()); + private int getNumRequestedNotAllocatedWorkers() { + return requestResourceFutures.values().stream().mapToInt(Queue::size).sum(); + } + + private int getNumRequestedNotAllocatedWorkersFor(TaskExecutorProcessSpec taskExecutorProcessSpec) { + return requestResourceFutures.getOrDefault(taskExecutorProcessSpec, new LinkedList<>()).size(); Review comment: We do need an empty queue here; `Collections.emptyList()` does not fit this argument, because the default value must be a `Queue`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
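To illustrate the remark about the default value, here is a minimal sketch, not the driver's actual code. It assumes the map's values are queues (as the `Queue::size` call in the diff suggests); `PendingRequestCounter`, its method names, and the use of `String`/`Integer` in place of `TaskExecutorProcessSpec` and the pending futures are all hypothetical stand-ins.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Hypothetical stand-in for the driver's bookkeeping of pending requests.
// String replaces TaskExecutorProcessSpec, Integer replaces the pending future.
class PendingRequestCounter {
    private final Map<String, Queue<Integer>> pending = new HashMap<>();

    void add(String spec, int requestId) {
        pending.computeIfAbsent(spec, ignored -> new ArrayDeque<>()).add(requestId);
    }

    // This variant would not compile, because Collections.emptyList() yields a
    // List, which is not assignable to Queue<Integer>:
    //   return pending.getOrDefault(spec, Collections.emptyList()).size();

    // Variant in the spirit of the diff: an empty queue as the default value.
    int countWithDefaultQueue(String spec) {
        return pending.getOrDefault(spec, new ArrayDeque<>()).size();
    }

    // Alternative that avoids allocating a throwaway queue on every miss.
    int countWithoutAllocation(String spec) {
        Queue<Integer> queue = pending.get(spec);
        return queue == null ? 0 : queue.size();
    }

    public static void main(String[] args) {
        PendingRequestCounter counter = new PendingRequestCounter();
        counter.add("spec-a", 1);
        counter.add("spec-a", 2);
        System.out.println(counter.countWithDefaultQueue("spec-a"));
        System.out.println(counter.countWithoutAllocation("spec-b"));
    }
}
```

Both counting variants return the same result; which one reads better is a matter of taste, and the second one is only a suggestion, not something proposed in the review.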
[GitHub] [flink] KarmaGYZ commented on a change in pull request #13311: [FLINK-18721] Migrate YarnResourceManager to the new YarnResourceManagerDriver
KarmaGYZ commented on a change in pull request #13311: URL: https://github.com/apache/flink/pull/13311#discussion_r488361230 ## File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerDriver.java ## @@ -72,354 +62,237 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Queue; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; /** - * The yarn implementation of the resource manager. Used when the system is started - * via the resource framework YARN. + * Implementation of {@link ResourceManagerDriver} for Kubernetes deployment. */ -public class YarnResourceManager extends LegacyActiveResourceManager - implements AMRMClientAsync.CallbackHandler, NMClientAsync.CallbackHandler { +public class YarnResourceManagerDriver extends AbstractResourceManagerDriver { private static final Priority RM_REQUEST_PRIORITY = Priority.newInstance(1); - /** YARN container map. */ - private final ConcurrentMap workerNodeMap; - /** Environment variable name of the hostname given by the YARN. * In task executor we use the hostnames given by YARN consistently throughout akka */ static final String ENV_FLINK_NODE_ID = "_FLINK_NODE_ID"; static final String ERROR_MASSAGE_ON_SHUTDOWN_REQUEST = "Received shutdown request from YARN ResourceManager."; - /** Default heartbeat interval between this resource manager and the YARN ResourceManager. */ - private final int yarnHeartbeatIntervalMillis; - private final YarnConfiguration yarnConfig; - @Nullable - private final String webInterfaceUrl; + /** The process environment variables. 
*/ + private final YarnResourceManagerDriverConfiguration configuration; - /** The heartbeat interval while the resource master is waiting for containers. */ - private final int containerRequestHeartbeatIntervalMillis; + /** Default heartbeat interval between this resource manager and the YARN ResourceManager. */ + private final int yarnHeartbeatIntervalMillis; /** Client to communicate with the Resource Manager (YARN's master). */ private AMRMClientAsync resourceManagerClient; + /** The heartbeat interval while the resource master is waiting for containers. */ + private final int containerRequestHeartbeatIntervalMillis; + /** Client to communicate with the Node manager and launch TaskExecutor processes. */ private NMClientAsync nodeManagerClient; - private final WorkerSpecContainerResourceAdapter workerSpecContainerResourceAdapter; + /** Request resource futures, keyed by container ids. */ + private final Map>> requestResourceFutures; + + private final TaskExecutorProcessSpecContainerResourceAdapter taskExecutorProcessSpecContainerResourceAdapter; private final RegisterApplicationMasterResponseReflector registerApplicationMasterResponseReflector; - private WorkerSpecContainerResourceAdapter.MatchingStrategy matchingStrategy; - - public YarnResourceManager( - RpcService rpcService, - ResourceID resourceId, - Configuration flinkConfig, - Map env, - HighAvailabilityServices highAvailabilityServices, - HeartbeatServices heartbeatServices, - SlotManager slotManager, - ResourceManagerPartitionTrackerFactory clusterPartitionTrackerFactory, - JobLeaderIdService jobLeaderIdService, - ClusterInformation clusterInformation, - FatalErrorHandler fatalErrorHandler, - @Nullable String webInterfaceUrl, - ResourceManagerMetricGroup resourceManagerMetricGroup) { - super( - flinkConfig, - env, - rpcService, - resourceId, - highAvailabilityServices, - heartbeatServices, - slotManager, - clusterPartitionTrackerFactory, - jobLeaderIdService, - clusterInformation, - fatalErrorHandler, - 
resourceManagerMetricGroup); + private TaskExecutorProcessSpecContainerResourceAdapter.MatchingStrategy matchingStrategy; + + private final YarnResourceManagerClientFactory yarnResourceManagerClientFactory; + +
[GitHub] [flink] KarmaGYZ commented on a change in pull request #13311: [FLINK-18721] Migrate YarnResourceManager to the new YarnResourceManagerDriver
KarmaGYZ commented on a change in pull request #13311: URL: https://github.com/apache/flink/pull/13311#discussion_r488360039 ## File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerDriver.java ## @@ -72,354 +62,237 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Queue; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; /** - * The yarn implementation of the resource manager. Used when the system is started - * via the resource framework YARN. + * Implementation of {@link ResourceManagerDriver} for Kubernetes deployment. */ -public class YarnResourceManager extends LegacyActiveResourceManager - implements AMRMClientAsync.CallbackHandler, NMClientAsync.CallbackHandler { +public class YarnResourceManagerDriver extends AbstractResourceManagerDriver { private static final Priority RM_REQUEST_PRIORITY = Priority.newInstance(1); - /** YARN container map. */ - private final ConcurrentMap workerNodeMap; - /** Environment variable name of the hostname given by the YARN. * In task executor we use the hostnames given by YARN consistently throughout akka */ static final String ENV_FLINK_NODE_ID = "_FLINK_NODE_ID"; static final String ERROR_MASSAGE_ON_SHUTDOWN_REQUEST = "Received shutdown request from YARN ResourceManager."; - /** Default heartbeat interval between this resource manager and the YARN ResourceManager. */ - private final int yarnHeartbeatIntervalMillis; - private final YarnConfiguration yarnConfig; - @Nullable - private final String webInterfaceUrl; + /** The process environment variables. 
*/ + private final YarnResourceManagerDriverConfiguration configuration; - /** The heartbeat interval while the resource master is waiting for containers. */ - private final int containerRequestHeartbeatIntervalMillis; + /** Default heartbeat interval between this resource manager and the YARN ResourceManager. */ + private final int yarnHeartbeatIntervalMillis; /** Client to communicate with the Resource Manager (YARN's master). */ private AMRMClientAsync resourceManagerClient; + /** The heartbeat interval while the resource master is waiting for containers. */ + private final int containerRequestHeartbeatIntervalMillis; + /** Client to communicate with the Node manager and launch TaskExecutor processes. */ private NMClientAsync nodeManagerClient; - private final WorkerSpecContainerResourceAdapter workerSpecContainerResourceAdapter; + /** Request resource futures, keyed by container ids. */ + private final Map>> requestResourceFutures; + + private final TaskExecutorProcessSpecContainerResourceAdapter taskExecutorProcessSpecContainerResourceAdapter; private final RegisterApplicationMasterResponseReflector registerApplicationMasterResponseReflector; - private WorkerSpecContainerResourceAdapter.MatchingStrategy matchingStrategy; - - public YarnResourceManager( - RpcService rpcService, - ResourceID resourceId, - Configuration flinkConfig, - Map env, - HighAvailabilityServices highAvailabilityServices, - HeartbeatServices heartbeatServices, - SlotManager slotManager, - ResourceManagerPartitionTrackerFactory clusterPartitionTrackerFactory, - JobLeaderIdService jobLeaderIdService, - ClusterInformation clusterInformation, - FatalErrorHandler fatalErrorHandler, - @Nullable String webInterfaceUrl, - ResourceManagerMetricGroup resourceManagerMetricGroup) { - super( - flinkConfig, - env, - rpcService, - resourceId, - highAvailabilityServices, - heartbeatServices, - slotManager, - clusterPartitionTrackerFactory, - jobLeaderIdService, - clusterInformation, - fatalErrorHandler, - 
resourceManagerMetricGroup); + private TaskExecutorProcessSpecContainerResourceAdapter.MatchingStrategy matchingStrategy; + + private final YarnResourceManagerClientFactory yarnResourceManagerClientFactory; + +
[GitHub] [flink] KarmaGYZ commented on a change in pull request #13311: [FLINK-18721] Migrate YarnResourceManager to the new YarnResourceManagerDriver
KarmaGYZ commented on a change in pull request #13311: URL: https://github.com/apache/flink/pull/13311#discussion_r488345943 ## File path: flink-yarn/src/test/java/org/apache/flink/yarn/TestingYarnNMClientAsync.java ## @@ -51,30 +65,74 @@ public void stopContainerAsync(ContainerId containerId, NodeId nodeId) { this.stopContainerAsyncConsumer.accept(containerId, nodeId, callbackHandler); } - void setStartContainerAsyncConsumer(TriConsumer startContainerAsyncConsumer) { - this.startContainerAsyncConsumer = Preconditions.checkNotNull(startContainerAsyncConsumer); - } - - void setStopContainerAsyncConsumer(TriConsumer stopContainerAsyncConsumer) { - this.stopContainerAsyncConsumer = Preconditions.checkNotNull(stopContainerAsyncConsumer); + static Builder builder() { + return new Builder(); } // // Override lifecycle methods to avoid actually starting the service // @Override - protected void serviceInit(Configuration conf) throws Exception { - // noop + public void init(Configuration conf) { + clientInitRunnable.run(); } @Override - protected void serviceStart() throws Exception { - // noop + public void start() { + clientStartRunnable.run(); } @Override - protected void serviceStop() throws Exception { - // noop + public void stop() { + clientStopRunnable.run(); + } + + /** +* Builder class for {@link TestingYarnAMRMClientAsync}. +*/ + public static class Builder { + private volatile TriConsumer startContainerAsyncConsumer = (ignored1, ignored2, ignored3) -> {}; + private volatile TriConsumer stopContainerAsyncConsumer = (ignored1, ignored2, ignored3) -> {}; + private volatile Runnable clientInitRunnable = () -> {}; + private volatile Runnable clientStartRunnable = () -> {}; + private volatile Runnable clientStopRunnable = () -> {}; Review comment: Ditto. 
## File path: flink-yarn/src/test/java/org/apache/flink/yarn/TestingYarnAMRMClientAsync.java ## @@ -45,25 +45,40 @@ */ public class TestingYarnAMRMClientAsync extends AMRMClientAsyncImpl { - private volatile Function, List>> - getMatchingRequestsFunction = ignored -> Collections.emptyList(); - private volatile BiConsumer addContainerRequestConsumer = (ignored1, ignored2) -> {}; - private volatile BiConsumer removeContainerRequestConsumer = (ignored1, ignored2) -> {}; - private volatile BiConsumer releaseAssignedContainerConsumer = (ignored1, ignored2) -> {}; - private volatile Consumer setHeartbeatIntervalConsumer = (ignored) -> {}; - private volatile TriFunction registerApplicationMasterFunction = - (ignored1, ignored2, ignored3) -> RegisterApplicationMasterResponse.newInstance( - Resource.newInstance(0, 0), - Resource.newInstance(Integer.MAX_VALUE, Integer.MAX_VALUE), - Collections.emptyMap(), - null, - Collections.emptyList(), - null, - Collections.emptyList()); - private volatile TriConsumer unregisterApplicationMasterConsumer = (ignored1, ignored2, ignored3) -> {}; - - TestingYarnAMRMClientAsync(CallbackHandler callbackHandler) { + private volatile Function, List>> getMatchingRequestsFunction; + private volatile BiConsumer addContainerRequestConsumer; + private volatile BiConsumer removeContainerRequestConsumer; + private volatile BiConsumer releaseAssignedContainerConsumer; + private volatile Consumer setHeartbeatIntervalConsumer; + private volatile TriFunction registerApplicationMasterFunction; + private volatile TriConsumer unregisterApplicationMasterConsumer; + private volatile Runnable clientInitRunnable; + private volatile Runnable clientStartRunnable; + private volatile Runnable clientStopRunnable; Review comment: Ditto. 
## File path: flink-yarn/src/test/java/org/apache/flink/yarn/TestingYarnAMRMClientAsync.java ## @@ -101,58 +116,111 @@ public void unregisterApplicationMaster(FinalApplicationStatus appStatus, String unregisterApplicationMasterConsumer.accept(appStatus, appMessage, appTrackingUrl); } - void setGetMatchingRequestsFunction( - Function, List>> - getMatchingRequestsFunction) { - this.getMatchingRequestsFunction =
[GitHub] [flink] KarmaGYZ commented on a change in pull request #13311: [FLINK-18721] Migrate YarnResourceManager to the new YarnResourceManagerDriver
KarmaGYZ commented on a change in pull request #13311: URL: https://github.com/apache/flink/pull/13311#discussion_r488345856 ## File path: flink-yarn/src/test/java/org/apache/flink/yarn/TestingYarnNMClientAsync.java ## @@ -34,11 +34,25 @@ */ class TestingYarnNMClientAsync extends NMClientAsyncImpl { - private volatile TriConsumer startContainerAsyncConsumer = (ignored1, ignored2, ignored3) -> {}; - private volatile TriConsumer stopContainerAsyncConsumer = (ignored1, ignored2, ignored3) -> {}; + private volatile TriConsumer startContainerAsyncConsumer; + private volatile TriConsumer stopContainerAsyncConsumer; + private volatile Runnable clientInitRunnable; + private volatile Runnable clientStartRunnable; + private volatile Runnable clientStopRunnable; Review comment: Before we introduced the `Builder` class, these functions could be set by multiple threads. You're right, we do not need `volatile` now.
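The point about `volatile` can be sketched as follows. This is an illustrative example under stated assumptions, not the PR's test class: `SketchTestingClient` and its setters are hypothetical names, and the sketch assumes the builder is configured by a single thread before `build()` is called. Because the built object only has `final` fields, any thread that later obtains a reference to it sees the values assigned in the constructor, so neither the builder's fields nor the client's fields need `volatile`.

```java
import java.util.Objects;

// Hypothetical testing client: callbacks are fixed at construction time.
class SketchTestingClient {
    // final fields: initialized once in the constructor, never reassigned.
    private final Runnable clientStartRunnable;
    private final Runnable clientStopRunnable;

    private SketchTestingClient(Runnable clientStartRunnable, Runnable clientStopRunnable) {
        this.clientStartRunnable = clientStartRunnable;
        this.clientStopRunnable = clientStopRunnable;
    }

    void start() {
        clientStartRunnable.run();
    }

    void stop() {
        clientStopRunnable.run();
    }

    static Builder builder() {
        return new Builder();
    }

    // The builder is assumed to be used by one thread only, so plain fields suffice.
    static class Builder {
        private Runnable clientStartRunnable = () -> {};
        private Runnable clientStopRunnable = () -> {};

        Builder setClientStartRunnable(Runnable runnable) {
            this.clientStartRunnable = Objects.requireNonNull(runnable);
            return this;
        }

        Builder setClientStopRunnable(Runnable runnable) {
            this.clientStopRunnable = Objects.requireNonNull(runnable);
            return this;
        }

        SketchTestingClient build() {
            return new SketchTestingClient(clientStartRunnable, clientStopRunnable);
        }
    }
}
```

A test would configure the builder, call `build()`, and only then hand the client to the component under test, for example `SketchTestingClient.builder().setClientStartRunnable(() -> started.complete(null)).build()` with a hypothetical `started` future.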
[GitHub] [flink] KarmaGYZ commented on a change in pull request #13311: [FLINK-18721] Migrate YarnResourceManager to the new YarnResourceManagerDriver
KarmaGYZ commented on a change in pull request #13311: URL: https://github.com/apache/flink/pull/13311#discussion_r484792254 ## File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ## @@ -135,7 +135,7 @@ public YarnResourceManager( JobLeaderIdService jobLeaderIdService, ClusterInformation clusterInformation, FatalErrorHandler fatalErrorHandler, - @Nullable String webInterfaceUrl, + YarnResourceManagerConfiguration yarnResourceManagerConfiguration, Review comment: I think `ApplicationConstants.Environment.PWD` should not belong to `YarnResourceManagerConfiguration`. We could read it via `System.getenv()` here instead.
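A minimal sketch of reading the working directory from the process environment rather than from a configuration object. The class `WorkingDirLookup` and its method are hypothetical, and the literal `"PWD"` is assumed to be the variable name behind `ApplicationConstants.Environment.PWD`; the sketch avoids the Hadoop dependency so that it stays self-contained. Taking the environment as a `Map` parameter keeps the lookup testable without touching the real process environment.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical helper; not part of the PR.
class WorkingDirLookup {
    // Assumption: this is the key that ApplicationConstants.Environment.PWD resolves to.
    static final String PWD_KEY = "PWD";

    // Returns the working directory from the given environment, if it is set.
    static Optional<String> currentDir(Map<String, String> env) {
        return Optional.ofNullable(env.get(PWD_KEY));
    }

    public static void main(String[] args) {
        // System.getenv() returns an unmodifiable view of the process environment.
        System.out.println(currentDir(System.getenv()).orElse("<PWD not set>"));
    }
}
```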