[ https://issues.apache.org/jira/browse/YARN-5829?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15941502#comment-15941502 ]

ASF GitHub Bot commented on YARN-5829:
--------------------------------------

Github user szegedim commented on a diff in the pull request:

    https://github.com/apache/hadoop/pull/201#discussion_r108025337
  
    --- Diff: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSSchedulerNode.java ---
    @@ -0,0 +1,376 @@
    +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
    +
    +import org.apache.hadoop.yarn.api.records.*;
    +import 
org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
    +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
    +import org.apache.hadoop.yarn.util.resource.Resources;
    +import org.junit.Test;
    +import org.mockito.invocation.InvocationOnMock;
    +import org.mockito.stubbing.Answer;
    +
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.Map;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertNotEquals;
    +import static org.junit.Assert.assertTrue;
    +import static org.mockito.Mockito.mock;
    +import static org.mockito.Mockito.when;
    +
    +/**
    + * Test scheduler node, especially preemption reservations.
    + */
    +public class TestFSSchedulerNode {
    +  private long containerNum = 0;
    +  private ArrayList<RMContainer> containers = new ArrayList<>();
    +
    +  private RMNode createNode() {
    +    RMNode node = mock(RMNode.class);
    +    when(node.getTotalCapability()).thenReturn(Resource.newInstance(8192, 
8));
    +    when(node.getHostName()).thenReturn("host.domain.com");
    +    return node;
    +  }
    +
    +  private RMContainer createDefaultContainer() {
    +    return createContainer(Resource.newInstance(1024, 1), null);
    +  }
    +
    +  private RMContainer createContainer(
    +      Resource request, ApplicationAttemptId appAttemptId) {
    +    RMContainer container = mock(RMContainer.class);
    +    Container containerInner = mock(Container.class);
    +    ContainerId id = mock(ContainerId.class);
    +    when(id.getContainerId()).thenReturn(containerNum);
    +    when(containerInner.getResource()).
    +        thenReturn(Resources.clone(request));
    +    when(containerInner.getId()).thenReturn(id);
    +    when(containerInner.getExecutionType()).
    +        thenReturn(ExecutionType.GUARANTEED);
    +    when(container.getApplicationAttemptId()).thenReturn(appAttemptId);
    +    when(container.getContainerId()).thenReturn(id);
    +    when(container.getContainer()).thenReturn(containerInner);
    +    
when(container.getExecutionType()).thenReturn(ExecutionType.GUARANTEED);
    +    when(container.getAllocatedResource()).
    +        thenReturn(Resources.clone(request));
    +    containers.add(container);
    +    containerNum++;
    +    return container;
    +  }
    +
    +  private void saturateCluster(FSSchedulerNode schedulerNode) {
    +    while (!Resources.isNone(schedulerNode.getUnallocatedResource())) {
    +      createDefaultContainer();
    +      schedulerNode.allocateContainer(containers.get((int)containerNum - 
1));
    +      schedulerNode.containerStarted(containers.get((int)containerNum - 1).
    +          getContainerId());
    +    }
    +  }
    +
    +  private FSAppAttempt createStarvingApp(FSSchedulerNode schedulerNode,
    +                                         Resource request) {
    +    FSAppAttempt starvingApp = mock(FSAppAttempt.class);
    +    ApplicationAttemptId appAttemptId =
    +        mock(ApplicationAttemptId.class);
    +    when(starvingApp.getApplicationAttemptId()).thenReturn(appAttemptId);
    +    when(starvingApp.assignContainer(schedulerNode)).thenAnswer(
    +        new Answer<Resource>() {
    +          @Override
    +          public Resource answer(InvocationOnMock invocationOnMock)
    +              throws Throwable {
    +            Resource response = Resource.newInstance(0, 0);
    +            while (!Resources.isNone(request) &&
    +                !Resources.isNone(schedulerNode.getUnallocatedResource())) 
{
    +              RMContainer container = createContainer(request, 
appAttemptId);
    +              schedulerNode.allocateContainer(container);
    +              Resources.addTo(response, container.getAllocatedResource());
    +              Resources.subtractFrom(request,
    +                  container.getAllocatedResource());
    +            }
    +            return response;
    +          }
    +        });
    +    when(starvingApp.getPendingDemand()).thenReturn(request);
    +    return starvingApp;
    +  }
    +
    +  private void finalValidation(FSSchedulerNode schedulerNode) {
    +    assertEquals("Everything should have been released",
    +        Resources.none(), schedulerNode.getAllocatedResource());
    +    assertTrue("No containers should be reserved for preemption",
    +        schedulerNode.containersForPreemption.isEmpty());
    +    assertTrue("No resources should be reserved for preemptees",
    +        schedulerNode.resourcesPreemptedForApp.isEmpty());
    +    assertEquals(
    +        "No amount of resource should be reserved for preemptees",
    +        Resources.none(),
    +        schedulerNode.getTotalReserved());
    +  }
    +
    +  private void allocateContainers(FSSchedulerNode schedulerNode) {
    +    FairScheduler.assignPreemptedContainers(schedulerNode);
    +  }
    +
    +  /**
    +   * Allocate and release a single container.
    +   */
    +  @Test
    +  public void testSimpleAllocation() {
    +    RMNode node = createNode();
    +    FSSchedulerNode schedulerNode = new FSSchedulerNode(node, false);
    +
    +    createDefaultContainer();
    +    assertEquals("Nothing should have been allocated, yet",
    +        Resources.none(), schedulerNode.getAllocatedResource());
    +    schedulerNode.allocateContainer(containers.get(0));
    +    assertEquals("Container should be allocated",
    +        containers.get(0).getContainer().getResource(),
    +        schedulerNode.getAllocatedResource());
    +    schedulerNode.releaseContainer(containers.get(0).getContainerId(), 
true);
    +    assertEquals("Everything should have been released",
    +        Resources.none(), schedulerNode.getAllocatedResource());
    +
    +    // Check that releasing an already released container is tolerated
    +    schedulerNode.releaseContainer(containers.get(0).getContainerId(), 
true);
    +    finalValidation(schedulerNode);
    +  }
    +
    +  /**
    +   * Allocate and release three containers with launch.
    +   */
    +  @Test
    +  public void testMultipleAllocations() {
    +    RMNode node = createNode();
    +    FSSchedulerNode schedulerNode = new FSSchedulerNode(node, false);
    +
    +    createDefaultContainer();
    +    createDefaultContainer();
    +    createDefaultContainer();
    +    assertEquals("Nothing should have been allocated, yet",
    +        Resources.none(), schedulerNode.getAllocatedResource());
    +    schedulerNode.allocateContainer(containers.get(0));
    +    schedulerNode.containerStarted(containers.get(0).getContainerId());
    +    schedulerNode.allocateContainer(containers.get(1));
    +    schedulerNode.containerStarted(containers.get(1).getContainerId());
    +    schedulerNode.allocateContainer(containers.get(2));
    +    assertEquals("Container should be allocated",
    +        Resources.multiply(containers.get(0).getContainer().getResource(), 
3.0),
    +        schedulerNode.getAllocatedResource());
    +    schedulerNode.releaseContainer(containers.get(1).getContainerId(), 
true);
    +    schedulerNode.releaseContainer(containers.get(2).getContainerId(), 
true);
    +    schedulerNode.releaseContainer(containers.get(0).getContainerId(), 
true);
    +    finalValidation(schedulerNode);
    +  }
    +
    +  /**
    +   * Preempt a single container on behalf of a starving application.
    +   */
    +  @Test
    +  public void testSimplePreemption() {
    +    RMNode node = createNode();
    +    FSSchedulerNode schedulerNode = new FSSchedulerNode(node, false);
    +
    +    // Launch containers and saturate the cluster
    +    saturateCluster(schedulerNode);
    +    assertEquals("Container should be allocated",
    +        Resources.multiply(containers.get(0).getContainer().getResource(),
    +            containerNum),
    +        schedulerNode.getAllocatedResource());
    +
    +    // Request preemption
    +    FSAppAttempt starvingApp = createStarvingApp(schedulerNode,
    +        Resource.newInstance(1024, 1));
    +    schedulerNode.addContainersForPreemption(
    +        Collections.singletonList(containers.get(0)), starvingApp);
    +    assertEquals(
    +        "No resource amount should be reserved for preemptees",
    +        containers.get(0).getAllocatedResource(),
    +        schedulerNode.getTotalReserved());
    +
    +    // Preemption occurs
    +    schedulerNode.releaseContainer(containers.get(0).getContainerId(), 
true);
    +    allocateContainers(schedulerNode);
    +    assertEquals("Container should be allocated",
    +        schedulerNode.getTotalResource(),
    +        schedulerNode.getAllocatedResource());
    +
    +    // Release all containers
    +    for (int i = 1; i < containerNum; ++i) {
    +      schedulerNode.releaseContainer(containers.get(i).getContainerId(), 
true);
    +    }
    +    finalValidation(schedulerNode);
    +  }
    +
    +  /**
    +   * Allocate and release three containers requested by two apps.
    +   */
    +  @Test
    +  public void testComplexPreemption() {
    +    RMNode node = createNode();
    +    FSSchedulerNode schedulerNode = new FSSchedulerNode(node, false);
    +
    +    // Launch containers and saturate the cluster
    +    saturateCluster(schedulerNode);
    +    assertEquals("Container should be allocated",
    +        Resources.multiply(containers.get(0).getContainer().getResource(),
    +            containerNum),
    +        schedulerNode.getAllocatedResource());
    +
    +    // Preempt a container
    +    FSAppAttempt starvingApp1 = createStarvingApp(schedulerNode,
    +        Resource.newInstance(2048, 2));
    +    FSAppAttempt starvingApp2 = createStarvingApp(schedulerNode,
    +        Resource.newInstance(1024, 1));
    +
    +    // Preemption thread kicks in
    +    schedulerNode.addContainersForPreemption(
    +        Collections.singletonList(containers.get(0)), starvingApp1);
    +    schedulerNode.addContainersForPreemption(
    +        Collections.singletonList(containers.get(1)), starvingApp1);
    +    schedulerNode.addContainersForPreemption(
    +        Collections.singletonList(containers.get(2)), starvingApp2);
    +
    +    // Preemption happens
    +    schedulerNode.releaseContainer(containers.get(0).getContainerId(), 
true);
    +    schedulerNode.releaseContainer(containers.get(2).getContainerId(), 
true);
    +    schedulerNode.releaseContainer(containers.get(1).getContainerId(), 
true);
    +
    +    allocateContainers(schedulerNode);
    +    assertEquals("Container should be allocated",
    +        schedulerNode.getTotalResource(),
    +        schedulerNode.getAllocatedResource());
    +
    +    // Release all containers
    +    for (int i = 3; i < containerNum; ++i) {
    +      schedulerNode.releaseContainer(containers.get(i).getContainerId(), 
true);
    --- End diff --
    
    Same response :-)


> FS preemption should reserve a node before considering containers on it for 
> preemption
> --------------------------------------------------------------------------------------
>
>                 Key: YARN-5829
>                 URL: https://issues.apache.org/jira/browse/YARN-5829
>             Project: Hadoop YARN
>          Issue Type: Sub-task
>          Components: fairscheduler
>            Reporter: Karthik Kambatla
>            Assignee: Miklos Szegedi
>
> FS preemption evaluates nodes for preemption, and subsequently preempts 
> the identified containers. If the chosen node is not reserved for the starved 
> application, any other application could be allocated the freed resources on 
> that node. Reserving the node for the starved application before preempting 
> containers would help avoid this.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

---------------------------------------------------------------------
To unsubscribe, e-mail: yarn-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: yarn-issues-h...@hadoop.apache.org

Reply via email to