[
https://issues.apache.org/jira/browse/YARN-11781?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17935749#comment-17935749
]
ASF GitHub Bot commented on YARN-11781:
---------------------------------------
zeekling commented on code in PR #7448:
URL: https://github.com/apache/hadoop/pull/7448#discussion_r1996872507
##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/RequestsHandler.java:
##########
@@ -0,0 +1,558 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import javax.script.Bindings;
+import javax.script.Compilable;
+import javax.script.CompiledScript;
+import javax.script.ScriptEngine;
+import javax.script.ScriptEngineManager;
+import javax.script.ScriptException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
+import org.apache.hadoop.yarn.api.records.NodeAttributeOpCode;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.ResourceSizing;
+import org.apache.hadoop.yarn.api.records.SchedulingRequest;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParseException;
+import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.NODE;
+
+/**
+ * RequestsHandler is used to handle requests from applications.
+ * It handles requests at the beginning of CapacityScheduler#allocate,
+ * and manages multiple update items which define which requests
+ * should be chosen and how to update them. It is based on the capacity-scheduler
+ * configuration and can be updated dynamically without restarting the RM.
+ */
+public class RequestsHandler {
+
+ protected static final Logger LOG =
+ LoggerFactory.getLogger(RequestsHandler.class);
+
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+ private static final ScriptEngine SCRIPT_ENGINE =
+ new ScriptEngineManager().getEngineByName("JavaScript");
+
+ private final RMContext rmContext;
+ private final ReentrantReadWriteLock.WriteLock writeLock;
+ private final ReentrantReadWriteLock.ReadLock readLock;
+
+ private boolean enabled = false;
+
+ private List<UpdateItem> updateItems;
+
+ // current updates conf value for comparing
+ private String updatesConfV;
+
+ public RequestsHandler(RMContext rmContext) {
+ this.rmContext = rmContext;
+ ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+ writeLock = lock.writeLock();
+ readLock = lock.readLock();
+ }
+
+ public void initialize(Configuration conf)
+ throws IOException, YarnException {
+ if (SCRIPT_ENGINE == null) {
+ // disabled if script engine is not found
+ LOG.warn("Disabled RequestsHandler since script engine not found");
+ return;
+ }
+ boolean newEnabled =
+ conf.getBoolean(CapacitySchedulerConfiguration.REQUEST_HANDLER_ENABLED,
+ CapacitySchedulerConfiguration.DEFAULT_REQUEST_HANDLER_ENABLED);
+ List<UpdateItem> newUpdateItems = null;
+ String newUpdatesConfV = null;
+ if (newEnabled) {
+ newUpdatesConfV =
+ conf.get(CapacitySchedulerConfiguration.REQUEST_HANDLER_UPDATES);
+ UpdatesConf newUpdatesConf = null;
+ if (newUpdatesConfV != null && !newUpdatesConfV.isEmpty()) {
+ newUpdatesConf =
+ OBJECT_MAPPER.readValue(newUpdatesConfV, UpdatesConf.class);
+ }
+ if (newUpdatesConf != null && newUpdatesConf.getItems() != null &&
+ !newUpdatesConf.getItems().isEmpty()) {
+ newUpdateItems = new ArrayList<>();
+ for (UpdateItemConf updateItemConf : newUpdatesConf.getItems()) {
+ newUpdateItems.add(new UpdateItem(updateItemConf));
+ }
+ }
+ }
+ // update
+ writeLock.lock();
+ try{
+ if (enabled == newEnabled &&
+ StringUtils.equals(newUpdatesConfV, updatesConfV)) {
+ LOG.info("No changes detected in RequestsHandler configuration," +
+ " enabled={}, updatesConf={}", enabled, updatesConfV);
+ return;
+ }
+ enabled = newEnabled;
+ updateItems = newUpdateItems;
+ updatesConfV = newUpdatesConfV;
+ LOG.info("Initialized request updater, enabled={}, updatesConf={}",
+ enabled, updatesConfV);
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ public RequestsHandleResponse handle(FiCaSchedulerApp app,
+ List<ResourceRequest> resourceRequests,
+ List<SchedulingRequest> schedulingRequests) {
+ readLock.lock();
Review Comment:
This is an update operation, so why does it use readLock?
> Implement dynamic requests handling in CapacityScheduler
> --------------------------------------------------------
>
> Key: YARN-11781
> URL: https://issues.apache.org/jira/browse/YARN-11781
> Project: Hadoop YARN
> Issue Type: New Feature
> Components: capacityscheduler
> Reporter: Tao Yang
> Assignee: Tao Yang
> Priority: Major
> Labels: pull-request-available
>
> This feature introduces a dynamic request handling mechanism in the
> CapacityScheduler, allowing for the modification of resource and scheduling
> requests based on configurable rules. This enables more flexible and adaptive
> resource management without requiring a restart of the ResourceManager.
> Benefits:
> * Increased Flexibility: Administrators can define custom rules for
> modifying requests, allowing for more tailored resource allocation strategies.
> * Enhanced Scheduling: The ability to convert ResourceRequest to
> SchedulingRequest opens up new possibilities for advanced scheduling and
> placement constraints.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]