[
https://issues.apache.org/jira/browse/YARN-1197?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13846389#comment-13846389
]
Wangda Tan commented on YARN-1197:
----------------------------------
Copying text from the scheduler design doc here for easier discussion. Please
feel free to let me know your comments!
*Basic Requirements*
We need to support handling resource-increase requests from the AM and
resource-decrease notifications from the NM.
* Such resource changes should be reflected in FiCaSchedulerNode/App, LeafQueue,
and ParentQueue (e.g. usedResource, reservedResource, etc.)
* If an increase request cannot be satisfied immediately, it will be reserved
in the node/app (node/app means FiCaSchedulerApp/Node, same below), as before.
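As a minimal illustration of the first requirement, a change in a container's
size must be applied at every bookkeeping level. The sketch below uses
hypothetical names and reduces Resource objects to a single memory counter:

```java
/** Hypothetical sketch: a container resource change must be reflected in the
 *  usedResource bookkeeping of the node, the app, and every queue level.
 *  Real code updates Resource objects in FiCaSchedulerApp/Node and CSQueues. */
class UsedResourceChain {
    int nodeUsed, appUsed, leafQueueUsed, parentQueueUsed;

    /** Applies a signed memory delta (positive = increase, negative =
     *  decrease) at each level, keeping all levels consistent. */
    void applyDelta(int deltaMemory) {
        nodeUsed += deltaMemory;
        appUsed += deltaMemory;
        leafQueueUsed += deltaMemory;
        parentQueueUsed += deltaMemory;
    }
}
```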
*Advanced Requirements*
* We need to gracefully handle race conditions:
** Only acquired/running containers can be increased.
** Container decreases only take effect on acquired/running containers.
(If a container is finished/killed, etc., all of its resources are already
released, so we don't need to decrease it.)
** The user may submit a new increase request for a container that already has
a pending increase request. We need to replace the pending request with the
new one.
** When the requested container resource is less than or equal to the existing
container resource:
*** If there is no pending increase request for this container, the request is
ignored.
*** Otherwise, the request is ignored and the pending increase request is
canceled.
** When a pending increase request exists and a decrease notification for the
same container arrives, the container will be decreased and the pending
increase request will be canceled.
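The pending-request rules above can be sketched as a small tracker. All names
here are hypothetical, container ids are plain strings, and resources are
reduced to memory integers; the real scheduler tracks
ContainerResourceIncreaseRequest objects per node/app:

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, simplified tracker for pending container-increase requests,
 *  encoding the race-condition rules listed above. */
class PendingIncreaseTracker {
    private final Map<String, Integer> pending = new HashMap<>();

    /** A new request replaces any pending request on the same container;
     *  an ask no larger than the current size is ignored and also cancels
     *  any pending increase request. */
    void onIncreaseRequest(String containerId, int askMemory, int currentMemory) {
        if (askMemory <= currentMemory) {
            pending.remove(containerId); // ignore the ask, cancel pending
            return;
        }
        pending.put(containerId, askMemory); // replace any older pending ask
    }

    /** A decrease on a container cancels its pending increase request. */
    void onDecrease(String containerId) {
        pending.remove(containerId);
    }

    Integer getPending(String containerId) {
        return pending.get(containerId);
    }
}
```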
*Requirements not clear*
* Do we need a time-out parameter for a reserved resource-increase request, to
avoid it occupying node resources for too long? (Do we have such a parameter
for reserving a “normal” container in the CS?)
* How do we decide whether an increase request or a normal container request is
satisfied first? (Currently, I simply make the CS satisfy increase requests
first.) Should this be a configurable parameter?
*Current Implementation*
*1) Decrease Container*
I start with container decrease because it is easier to understand.
Decreased containers are handled in nodeUpdate() of the CapacityScheduler.
When the CS receives decreased containers from the NM, it processes them one by
one with the following steps:
* Check that the container is in the RUNNING state (because this is reported by
the NM, its state will be either RUNNING or COMPLETED); skip it if not.
* Remove any pending increase request on the same container-id.
* Decrease/update the container resource in
FiCaSchedulerApp/AppSchedulingInfo/FiCaSchedulerNode/LeafQueue/ParentQueue and
other related metrics.
* Update the resource in the Container.
* Return the decreased container to the AM by calling setDecreasedContainer on
the AllocateResponse.
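The decrease path above can be sketched as follows. All types here are
simplified, hypothetical stand-ins for the real NM report and scheduler
objects; the metric/token updates are only noted in comments:

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of the container-decrease steps listed above. */
class DecreaseSketch {
    enum State { RUNNING, COMPLETED }

    static class DecreasedContainer {
        final String id; final State state; final int newMemory;
        DecreasedContainer(String id, State state, int newMemory) {
            this.id = id; this.state = state; this.newMemory = newMemory;
        }
    }

    /** Processes NM-reported decreases one by one: skip non-running
     *  containers, then (in the real scheduler) cancel pending increases,
     *  update app/node/queue metrics, shrink the container, and queue it
     *  for return to the AM in the next AllocateResponse. */
    static List<String> processDecreases(List<DecreasedContainer> reported) {
        List<String> toReturnToAM = new ArrayList<>();
        for (DecreasedContainer dc : reported) {
            if (dc.state != State.RUNNING) {
                continue; // completed containers already released everything
            }
            // real code: removeIncreaseRequest(nodeId, containerId)
            // real code: update FiCaSchedulerApp/Node and queue metrics
            toReturnToAM.add(dc.id); // returned to the AM as decreased
        }
        return toReturnToAM;
    }
}
```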
*2) Increase Container*
Increasing a container is much more complex than decreasing one.
*Steps to add container increase request, (pseudo code)*
In CapacityScheduler.allocate(...)
{code}
foreach (increase_request):
  if (state != ACQUIRED) and (state != RUNNING):
    continue;
  // Remove the old request on the same container-id if it exists
  if increase_request_exist(increase_request.getContainerId()):
    remove(increase_request);
  // The requested target resource should be larger than the existing resource
  if increase_request.ask_resource <= existing_resource(increase_request.getContainerId()):
    continue;
  // Add it to the application
  getApplication(increase_request.getContainerId()).add(increase_request)
{code}
*Steps to handle container increase request,*
2.1) In CapacityScheduler.nodeUpdate(...):
{code}
if node.is_reserved():
  if reserved-increase-request:
    LeafQueue.assignReservedIncreaseRequest(...)
  elif reserved-normal-container:
    ...
else:
  ParentQueue.assignContainers(...)
  // this will finally call
  // LeafQueue.assignContainers(...)
{code}
2.2) In CapacityScheduler.nodeUpdate(...):
{code}
if request-is-fit-in-resource:
  allocate-resource
  update container token
  add to AllocateResponse
  return allocated-resource
else:
  return None
{code}
2.3) In LeafQueue.assignContainers(...):
{code}
foreach (application):
  // do increase allocation first
  foreach (increase_request):
    // check if we can allocate it
    // within queue/user limits, etc.
    // return None if not satisfied
    if request-is-fit-in-resource:
      allocate-resource
      update container token
      add to AllocateResponse
    else:
      reserve in app/node
      return reserved-resource
  // do normal allocation
  ...
{code}
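The allocate-or-reserve decision in steps 2.2/2.3 can be sketched concretely.
The names below are hypothetical, resources are reduced to memory integers,
and queue/user-limit checks are elided:

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch of the increase-first loop in
 *  LeafQueue.assignContainers: each pending increase delta is either
 *  allocated (if it fits in the node's available memory) or reserved. */
class IncreaseAssignmentSketch {
    /** Returns container-id -> "ALLOCATED" or "RESERVED"; stops at the
     *  first reservation, mirroring the "return reserved-resource" step. */
    static Map<String, String> assignIncreases(
            Map<String, Integer> pendingDeltas, int availableMemory) {
        Map<String, String> results = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> e : pendingDeltas.entrySet()) {
            if (e.getValue() <= availableMemory) {
                availableMemory -= e.getValue(); // allocate the delta
                results.put(e.getKey(), "ALLOCATED");
            } else {
                results.put(e.getKey(), "RESERVED"); // reserve in app/node
                break; // the scheduler returns after reserving
            }
        }
        return results;
    }
}
```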
*API changes in CapacityScheduler*
1) YarnScheduler
{code}
public Allocation allocate(ApplicationAttemptId applicationAttemptId,
List<ResourceRequest> ask, List<ContainerId> release,
List<String> blacklistAdditions, List<String> blacklistRemovals,
+ List<ContainerResourceIncreaseRequest> increaseRequests)
{code}
2) CSQueue
{code}
+ public void cancelIncreaseRequestReservation(Resource clusterResource,
+ ContainerResourceIncreaseRequest changeRequest, Resource required);
+
+ public void decreaseContainerResource(FiCaSchedulerApp application,
+ Resource clusterResource, Resource released);
{code}
3) FiCaSchedulerApp
{code}
+ synchronized public List<ContainerResourceIncreaseRequest>
+ getResourceIncreaseRequest(NodeId nodeId)
+
+ synchronized public ContainerResourceIncreaseRequest
+ getResourceIncreaseRequest(NodeId nodeId, ContainerId containerId);
+
+ synchronized public void removeIncreaseRequest(NodeId nodeId,
+ ContainerId containerId);
+
+ synchronized public void decreaseContainerResource(Resource released);
{code}
4) FiCaSchedulerNode
{code}
+ public synchronized void increaseResource(Resource resource);
+ public synchronized void decreaseContainerResource(Resource resource);
+ public synchronized void reserveIncreaseResource(
+ ContainerResourceIncreaseRequest increaseRequest);
+ public synchronized void unreserveIncreaseResource(ContainerId containerId)
{code}
5) AppSchedulingInfo
{code}
+ synchronized public void addIncreaseRequests(NodeId nodeId,
+ ContainerResourceIncreaseRequest increaseRequest, Resource required)
+ synchronized public void decreaseContainerResource(Resource released)
+ synchronized public List<ContainerResourceIncreaseRequest>
+ getResourceIncreaseRequests(NodeId nodeId)
+ synchronized public ContainerResourceIncreaseRequest
+ getResourceIncreaseRequests(NodeId nodeId, ContainerId containerId)
+ synchronized public void allocateIncreaseRequest(Resource required)
+ synchronized public void removeIncreaseRequest(NodeId nodeId,
+ ContainerId containerId)
{code}
> Support changing resources of an allocated container
> ----------------------------------------------------
>
> Key: YARN-1197
> URL: https://issues.apache.org/jira/browse/YARN-1197
> Project: Hadoop YARN
> Issue Type: Task
> Components: api, nodemanager, resourcemanager
> Affects Versions: 2.1.0-beta
> Reporter: Wangda Tan
> Assignee: Wangda Tan
> Attachments: mapreduce-project.patch.ver.1,
> tools-project.patch.ver.1, yarn-1197-scheduler-v1.pdf, yarn-1197-v2.pdf,
> yarn-1197-v3.pdf, yarn-1197-v4.pdf, yarn-1197-v5.pdf, yarn-1197.pdf,
> yarn-api-protocol.patch.ver.1, yarn-pb-impl.patch.ver.1,
> yarn-server-common.patch.ver.1, yarn-server-nodemanager.patch.ver.1,
> yarn-server-resourcemanager.patch.ver.1
>
>
> The current YARN resource management logic assumes the resource allocated to
> a container is fixed during its lifetime. When users want to change the
> resource of an allocated container, the only way is to release it and
> allocate a new container of the expected size.
> Allowing run-time changes to the resources of an allocated container will
> give us better control of resource usage on the application side.