Repository: hadoop
Updated Branches:
  refs/heads/trunk 1f2794b4f -> f0ac18d00


http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0ac18d0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TopKNodeSelector.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TopKNodeSelector.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TopKNodeSelector.java
deleted file mode 100644
index 7e24687..0000000
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TopKNodeSelector.java
+++ /dev/null
@@ -1,223 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.ResourceOption;
-import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
-import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
-import org.apache.hadoop.yarn.server.resourcemanager.ClusterMonitor;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-public class TopKNodeSelector implements ClusterMonitor {
-
-  final static Log LOG = LogFactory.getLog(TopKNodeSelector.class);
-
-  public enum TopKComparator implements Comparator<ClusterNode> {
-    WAIT_TIME,
-    QUEUE_LENGTH;
-
-    @Override
-    public int compare(ClusterNode o1, ClusterNode o2) {
-      if (getQuant(o1) == getQuant(o2)) {
-        return o1.timestamp < o2.timestamp ? +1 : -1;
-      }
-      return getQuant(o1) > getQuant(o2) ? +1 : -1;
-    }
-
-    private int getQuant(ClusterNode c) {
-      return (this == WAIT_TIME) ? c.queueTime : c.waitQueueLength;
-    }
-  }
-
-  static class ClusterNode {
-    int queueTime = -1;
-    int waitQueueLength = 0;
-    double timestamp;
-    final NodeId nodeId;
-
-    public ClusterNode(NodeId nodeId) {
-      this.nodeId = nodeId;
-      updateTimestamp();
-    }
-
-    public ClusterNode setQueueTime(int queueTime) {
-      this.queueTime = queueTime;
-      return this;
-    }
-
-    public ClusterNode setWaitQueueLength(int queueLength) {
-      this.waitQueueLength = queueLength;
-      return this;
-    }
-
-    public ClusterNode updateTimestamp() {
-      this.timestamp = System.currentTimeMillis();
-      return this;
-    }
-  }
-
-  private final int k;
-  private final List<NodeId> topKNodes;
-  private final ScheduledExecutorService scheduledExecutor;
-  private final HashMap<NodeId, ClusterNode> clusterNodes = new HashMap<>();
-  private final Comparator<ClusterNode> comparator;
-
-  Runnable computeTask = new Runnable() {
-    @Override
-    public void run() {
-      synchronized (topKNodes) {
-        topKNodes.clear();
-        topKNodes.addAll(computeTopKNodes());
-      }
-    }
-  };
-
-  @VisibleForTesting
-  TopKNodeSelector(int k, TopKComparator comparator) {
-    this.k = k;
-    this.topKNodes = new ArrayList<>();
-    this.comparator = comparator;
-    this.scheduledExecutor = null;
-  }
-
-  public TopKNodeSelector(int k, long nodeComputationInterval,
-      TopKComparator comparator) {
-    this.k = k;
-    this.topKNodes = new ArrayList<>();
-    this.scheduledExecutor = Executors.newScheduledThreadPool(1);
-    this.comparator = comparator;
-    this.scheduledExecutor.scheduleAtFixedRate(computeTask,
-        nodeComputationInterval, nodeComputationInterval,
-        TimeUnit.MILLISECONDS);
-  }
-
-
-  @Override
-  public void addNode(List<NMContainerStatus> containerStatuses, RMNode
-      rmNode) {
-    LOG.debug("Node added event from: " + rmNode.getNode().getName());
-    // Ignoring this currently : atleast one NODE_UPDATE heartbeat is
-    // required to ensure node eligibility.
-  }
-
-  @Override
-  public void removeNode(RMNode removedRMNode) {
-    LOG.debug("Node delete event for: " + removedRMNode.getNode().getName());
-    synchronized (this.clusterNodes) {
-      if (this.clusterNodes.containsKey(removedRMNode.getNodeID())) {
-        this.clusterNodes.remove(removedRMNode.getNodeID());
-        LOG.debug("Delete ClusterNode: " + removedRMNode.getNodeID());
-      } else {
-        LOG.debug("Node not in list!");
-      }
-    }
-  }
-
-  @Override
-  public void nodeUpdate(RMNode rmNode) {
-    LOG.debug("Node update event from: " + rmNode.getNodeID());
-    QueuedContainersStatus queuedContainersStatus =
-        rmNode.getQueuedContainersStatus();
-    int estimatedQueueWaitTime =
-        queuedContainersStatus.getEstimatedQueueWaitTime();
-    int waitQueueLength = queuedContainersStatus.getWaitQueueLength();
-    // Add nodes to clusterNodes.. if estimatedQueueTime is -1, Ignore node
-    // UNLESS comparator is based on queue length, in which case, we should add
-    synchronized (this.clusterNodes) {
-      ClusterNode currentNode = this.clusterNodes.get(rmNode.getNodeID());
-      if (currentNode == null) {
-        if (estimatedQueueWaitTime != -1
-            || comparator == TopKComparator.QUEUE_LENGTH) {
-          this.clusterNodes.put(rmNode.getNodeID(),
-              new ClusterNode(rmNode.getNodeID())
-                  .setQueueTime(estimatedQueueWaitTime)
-                  .setWaitQueueLength(waitQueueLength));
-          LOG.info("Inserting ClusterNode [" + rmNode.getNodeID() + "]" +
-              "with queue wait time [" + estimatedQueueWaitTime + "] and " +
-              "wait queue length [" + waitQueueLength + "]");
-        } else {
-          LOG.warn("IGNORING ClusterNode [" + rmNode.getNodeID() + "]" +
-              "with queue wait time [" + estimatedQueueWaitTime + "] and " +
-              "wait queue length [" + waitQueueLength + "]");
-        }
-      } else {
-        if (estimatedQueueWaitTime != -1
-            || comparator == TopKComparator.QUEUE_LENGTH) {
-          currentNode
-              .setQueueTime(estimatedQueueWaitTime)
-              .setWaitQueueLength(waitQueueLength)
-              .updateTimestamp();
-          LOG.info("Updating ClusterNode [" + rmNode.getNodeID() + "]" +
-              "with queue wait time [" + estimatedQueueWaitTime + "] and " +
-              "wait queue length [" + waitQueueLength + "]");
-        } else {
-          this.clusterNodes.remove(rmNode.getNodeID());
-          LOG.info("Deleting ClusterNode [" + rmNode.getNodeID() + "]" +
-              "with queue wait time [" + currentNode.queueTime + "] and " +
-              "wait queue length [" + currentNode.waitQueueLength + "]");
-        }
-      }
-    }
-  }
-
-  @Override
-  public void updateNodeResource(RMNode rmNode, ResourceOption resourceOption) 
{
-    LOG.debug("Node resource update event from: " + rmNode.getNodeID());
-    // Ignoring this currently...
-  }
-
-  public List<NodeId> selectNodes() {
-    synchronized (this.topKNodes) {
-      return this.k < this.topKNodes.size() ?
-          new ArrayList<>(this.topKNodes).subList(0, this.k) :
-          new ArrayList<>(this.topKNodes);
-    }
-  }
-
-  private List<NodeId> computeTopKNodes() {
-    synchronized (this.clusterNodes) {
-      ArrayList aList = new ArrayList<>(this.clusterNodes.values());
-      List<NodeId> retList = new ArrayList<>();
-      Object[] nodes = aList.toArray();
-      // Collections.sort would do something similar by calling Arrays.sort
-      // internally but would finally iterate through the input list (aList)
-      // to reset the value of each element.. Since we don't really care about
-      // 'aList', we can use the iteration to create the list of nodeIds which
-      // is what we ultimately care about.
-      Arrays.sort(nodes, (Comparator)comparator);
-      for (int j=0; j < nodes.length; j++) {
-        retList.add(((ClusterNode)nodes[j]).nodeId);
-      }
-      return retList;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0ac18d0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestNodeQueueLoadMonitor.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestNodeQueueLoadMonitor.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestNodeQueueLoadMonitor.java
new file mode 100644
index 0000000..5f63923
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestNodeQueueLoadMonitor.java
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed;
+
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit;
+import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.List;
+
+/**
+ * Unit tests for NodeQueueLoadMonitor.
+ */
+public class TestNodeQueueLoadMonitor {
+
+  static class FakeNodeId extends NodeId {
+    final String host;
+    final int port;
+
+    public FakeNodeId(String host, int port) {
+      this.host = host;
+      this.port = port;
+    }
+
+    @Override
+    public String getHost() {
+      return host;
+    }
+
+    @Override
+    public int getPort() {
+      return port;
+    }
+
+    @Override
+    protected void setHost(String host) {}
+    @Override
+    protected void setPort(int port) {}
+    @Override
+    protected void build() {}
+
+    @Override
+    public String toString() {
+      return host + ":" + port;
+    }
+  }
+
+  @Test
+  public void testWaitTimeSort() {
+    NodeQueueLoadMonitor selector = new NodeQueueLoadMonitor(
+        NodeQueueLoadMonitor.LoadComparator.QUEUE_WAIT_TIME);
+    selector.updateNode(createRMNode("h1", 1, 15, 10));
+    selector.updateNode(createRMNode("h2", 2, 5, 10));
+    selector.updateNode(createRMNode("h3", 3, 10, 10));
+    selector.computeTask.run();
+    List<NodeId> nodeIds = selector.selectNodes();
+    System.out.println("1-> " + nodeIds);
+    Assert.assertEquals("h2:2", nodeIds.get(0).toString());
+    Assert.assertEquals("h3:3", nodeIds.get(1).toString());
+    Assert.assertEquals("h1:1", nodeIds.get(2).toString());
+
+    // Now update node3
+    selector.updateNode(createRMNode("h3", 3, 2, 10));
+    selector.computeTask.run();
+    nodeIds = selector.selectNodes();
+    System.out.println("2-> "+ nodeIds);
+    Assert.assertEquals("h3:3", nodeIds.get(0).toString());
+    Assert.assertEquals("h2:2", nodeIds.get(1).toString());
+    Assert.assertEquals("h1:1", nodeIds.get(2).toString());
+
+    // Now send update with -1 wait time
+    selector.updateNode(createRMNode("h4", 4, -1, 10));
+    selector.computeTask.run();
+    nodeIds = selector.selectNodes();
+    System.out.println("3-> "+ nodeIds);
+    // No change
+    Assert.assertEquals("h3:3", nodeIds.get(0).toString());
+    Assert.assertEquals("h2:2", nodeIds.get(1).toString());
+    Assert.assertEquals("h1:1", nodeIds.get(2).toString());
+  }
+
+  @Test
+  public void testQueueLengthSort() {
+    NodeQueueLoadMonitor selector = new NodeQueueLoadMonitor(
+        NodeQueueLoadMonitor.LoadComparator.QUEUE_LENGTH);
+    selector.updateNode(createRMNode("h1", 1, -1, 15));
+    selector.updateNode(createRMNode("h2", 2, -1, 5));
+    selector.updateNode(createRMNode("h3", 3, -1, 10));
+    selector.computeTask.run();
+    List<NodeId> nodeIds = selector.selectNodes();
+    System.out.println("1-> " + nodeIds);
+    Assert.assertEquals("h2:2", nodeIds.get(0).toString());
+    Assert.assertEquals("h3:3", nodeIds.get(1).toString());
+    Assert.assertEquals("h1:1", nodeIds.get(2).toString());
+
+    // Now update node3
+    selector.updateNode(createRMNode("h3", 3, -1, 2));
+    selector.computeTask.run();
+    nodeIds = selector.selectNodes();
+    System.out.println("2-> "+ nodeIds);
+    Assert.assertEquals("h3:3", nodeIds.get(0).toString());
+    Assert.assertEquals("h2:2", nodeIds.get(1).toString());
+    Assert.assertEquals("h1:1", nodeIds.get(2).toString());
+
+    // Now send update with -1 wait time but valid length
+    selector.updateNode(createRMNode("h4", 4, -1, 20));
+    selector.computeTask.run();
+    nodeIds = selector.selectNodes();
+    System.out.println("3-> "+ nodeIds);
+    // No change
+    Assert.assertEquals("h3:3", nodeIds.get(0).toString());
+    Assert.assertEquals("h2:2", nodeIds.get(1).toString());
+    Assert.assertEquals("h1:1", nodeIds.get(2).toString());
+    Assert.assertEquals("h4:4", nodeIds.get(3).toString());
+  }
+
+  @Test
+  public void testContainerQueuingLimit() {
+    NodeQueueLoadMonitor selector = new NodeQueueLoadMonitor(
+        NodeQueueLoadMonitor.LoadComparator.QUEUE_LENGTH);
+    selector.updateNode(createRMNode("h1", 1, -1, 15));
+    selector.updateNode(createRMNode("h2", 2, -1, 5));
+    selector.updateNode(createRMNode("h3", 3, -1, 10));
+
+    // Test Mean Calculation
+    selector.initThresholdCalculator(0, 6, 100);
+    QueueLimitCalculator calculator = selector.getThresholdCalculator();
+    ContainerQueuingLimit containerQueuingLimit = calculator
+        .createContainerQueuingLimit();
+    Assert.assertEquals(6, containerQueuingLimit.getMaxQueueLength());
+    Assert.assertEquals(-1, containerQueuingLimit.getMaxQueueWaitTimeInMs());
+    selector.computeTask.run();
+    containerQueuingLimit = calculator.createContainerQueuingLimit();
+    Assert.assertEquals(10, containerQueuingLimit.getMaxQueueLength());
+    Assert.assertEquals(-1, containerQueuingLimit.getMaxQueueWaitTimeInMs());
+
+    // Test Limits do not exceed specified max
+    selector.updateNode(createRMNode("h1", 1, -1, 110));
+    selector.updateNode(createRMNode("h2", 2, -1, 120));
+    selector.updateNode(createRMNode("h3", 3, -1, 130));
+    selector.updateNode(createRMNode("h4", 4, -1, 140));
+    selector.updateNode(createRMNode("h5", 5, -1, 150));
+    selector.updateNode(createRMNode("h6", 6, -1, 160));
+    selector.computeTask.run();
+    containerQueuingLimit = calculator.createContainerQueuingLimit();
+    Assert.assertEquals(100, containerQueuingLimit.getMaxQueueLength());
+
+    // Test Limits do not go below specified min
+    selector.updateNode(createRMNode("h1", 1, -1, 1));
+    selector.updateNode(createRMNode("h2", 2, -1, 2));
+    selector.updateNode(createRMNode("h3", 3, -1, 3));
+    selector.updateNode(createRMNode("h4", 4, -1, 4));
+    selector.updateNode(createRMNode("h5", 5, -1, 5));
+    selector.updateNode(createRMNode("h6", 6, -1, 6));
+    selector.computeTask.run();
+    containerQueuingLimit = calculator.createContainerQueuingLimit();
+    Assert.assertEquals(6, containerQueuingLimit.getMaxQueueLength());
+
+  }
+
+  private RMNode createRMNode(String host, int port,
+      int waitTime, int queueLength) {
+    RMNode node1 = Mockito.mock(RMNode.class);
+    NodeId nID1 = new FakeNodeId(host, port);
+    Mockito.when(node1.getNodeID()).thenReturn(nID1);
+    QueuedContainersStatus status1 =
+        Mockito.mock(QueuedContainersStatus.class);
+    Mockito.when(status1.getEstimatedQueueWaitTime())
+        .thenReturn(waitTime);
+    Mockito.when(status1.getWaitQueueLength())
+        .thenReturn(queueLength);
+    Mockito.when(node1.getQueuedContainersStatus()).thenReturn(status1);
+    return node1;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0ac18d0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestTopKNodeSelector.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestTopKNodeSelector.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestTopKNodeSelector.java
deleted file mode 100644
index aec4e86..0000000
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestTopKNodeSelector.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed;
-
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
-import org.junit.Assert;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-import java.util.List;
-
-public class TestTopKNodeSelector {
-
-  static class FakeNodeId extends NodeId {
-    final String host;
-    final int port;
-
-    public FakeNodeId(String host, int port) {
-      this.host = host;
-      this.port = port;
-    }
-
-    @Override
-    public String getHost() {
-      return host;
-    }
-
-    @Override
-    public int getPort() {
-      return port;
-    }
-
-    @Override
-    protected void setHost(String host) {}
-    @Override
-    protected void setPort(int port) {}
-    @Override
-    protected void build() {}
-
-    @Override
-    public String toString() {
-      return host + ":" + port;
-    }
-  }
-
-  @Test
-  public void testQueueTimeSort() {
-    TopKNodeSelector selector = new TopKNodeSelector(5,
-        TopKNodeSelector.TopKComparator.WAIT_TIME);
-    selector.nodeUpdate(createRMNode("h1", 1, 15, 10));
-    selector.nodeUpdate(createRMNode("h2", 2, 5, 10));
-    selector.nodeUpdate(createRMNode("h3", 3, 10, 10));
-    selector.computeTask.run();
-    List<NodeId> nodeIds = selector.selectNodes();
-    System.out.println("1-> " + nodeIds);
-    Assert.assertEquals("h2:2", nodeIds.get(0).toString());
-    Assert.assertEquals("h3:3", nodeIds.get(1).toString());
-    Assert.assertEquals("h1:1", nodeIds.get(2).toString());
-
-    // Now update node3
-    selector.nodeUpdate(createRMNode("h3", 3, 2, 10));
-    selector.computeTask.run();
-    nodeIds = selector.selectNodes();
-    System.out.println("2-> "+ nodeIds);
-    Assert.assertEquals("h3:3", nodeIds.get(0).toString());
-    Assert.assertEquals("h2:2", nodeIds.get(1).toString());
-    Assert.assertEquals("h1:1", nodeIds.get(2).toString());
-
-    // Now send update with -1 wait time
-    selector.nodeUpdate(createRMNode("h4", 4, -1, 10));
-    selector.computeTask.run();
-    nodeIds = selector.selectNodes();
-    System.out.println("3-> "+ nodeIds);
-    // No change
-    Assert.assertEquals("h3:3", nodeIds.get(0).toString());
-    Assert.assertEquals("h2:2", nodeIds.get(1).toString());
-    Assert.assertEquals("h1:1", nodeIds.get(2).toString());
-  }
-
-  @Test
-  public void testQueueLengthSort() {
-    TopKNodeSelector selector = new TopKNodeSelector(5,
-        TopKNodeSelector.TopKComparator.QUEUE_LENGTH);
-    selector.nodeUpdate(createRMNode("h1", 1, -1, 15));
-    selector.nodeUpdate(createRMNode("h2", 2, -1, 5));
-    selector.nodeUpdate(createRMNode("h3", 3, -1, 10));
-    selector.computeTask.run();
-    List<NodeId> nodeIds = selector.selectNodes();
-    System.out.println("1-> " + nodeIds);
-    Assert.assertEquals("h2:2", nodeIds.get(0).toString());
-    Assert.assertEquals("h3:3", nodeIds.get(1).toString());
-    Assert.assertEquals("h1:1", nodeIds.get(2).toString());
-
-    // Now update node3
-    selector.nodeUpdate(createRMNode("h3", 3, -1, 2));
-    selector.computeTask.run();
-    nodeIds = selector.selectNodes();
-    System.out.println("2-> "+ nodeIds);
-    Assert.assertEquals("h3:3", nodeIds.get(0).toString());
-    Assert.assertEquals("h2:2", nodeIds.get(1).toString());
-    Assert.assertEquals("h1:1", nodeIds.get(2).toString());
-
-    // Now send update with -1 wait time but valid length
-    selector.nodeUpdate(createRMNode("h4", 4, -1, 20));
-    selector.computeTask.run();
-    nodeIds = selector.selectNodes();
-    System.out.println("3-> "+ nodeIds);
-    // No change
-    Assert.assertEquals("h3:3", nodeIds.get(0).toString());
-    Assert.assertEquals("h2:2", nodeIds.get(1).toString());
-    Assert.assertEquals("h1:1", nodeIds.get(2).toString());
-    Assert.assertEquals("h4:4", nodeIds.get(3).toString());
-  }
-
-  private RMNode createRMNode(String host, int port,
-      int waitTime, int queueLength) {
-    RMNode node1 = Mockito.mock(RMNode.class);
-    NodeId nID1 = new FakeNodeId(host, port);
-    Mockito.when(node1.getNodeID()).thenReturn(nID1);
-    QueuedContainersStatus status1 =
-        Mockito.mock(QueuedContainersStatus.class);
-    Mockito.when(status1.getEstimatedQueueWaitTime())
-        .thenReturn(waitTime);
-    Mockito.when(status1.getWaitQueueLength())
-        .thenReturn(queueLength);
-    Mockito.when(node1.getQueuedContainersStatus()).thenReturn(status1);
-    return node1;
-  }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to