This is an automated email from the ASF dual-hosted git repository.

devaraj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a4cd75e  YARN-9269. Minor cleanup in FpgaResourceAllocator. 
Contributed by Peter Bacsko.
a4cd75e is described below

commit a4cd75e09c934699ec5e2fa969f1c8d0a14c1d49
Author: Devaraj K <deva...@apache.org>
AuthorDate: Wed Mar 27 10:08:07 2019 -0700

    YARN-9269. Minor cleanup in FpgaResourceAllocator. Contributed by Peter 
Bacsko.
---
 .../resources/fpga/FpgaResourceAllocator.java      | 104 ++++++++++-----------
 .../resources/fpga/FpgaResourceHandlerImpl.java    |   2 +-
 2 files changed, 53 insertions(+), 53 deletions(-)

diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/fpga/FpgaResourceAllocator.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/fpga/FpgaResourceAllocator.java
index b64ffd0..01036db 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/fpga/FpgaResourceAllocator.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/fpga/FpgaResourceAllocator.java
@@ -33,11 +33,15 @@ import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resource
 
 import java.io.IOException;
 import java.io.Serializable;
-import java.util.*;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
 
 import static org.apache.hadoop.yarn.api.records.ResourceInformation.FPGA_URI;
 
-
 /**
  * This FPGA resource allocator tends to be used by different FPGA vendor's 
plugin
  * A "type" parameter is taken into consideration when allocation
@@ -50,20 +54,21 @@ public class FpgaResourceAllocator {
   private List<FpgaDevice> allowedFpgas = new LinkedList<>();
 
   //key is resource type of FPGA, vendor plugin supported ID
-  private LinkedHashMap<String, List<FpgaDevice>> availableFpga = new 
LinkedHashMap<>();
+  private Map<String, List<FpgaDevice>> availableFpgas = new HashMap<>();
 
-  //key is requestor, aka. container ID
-  private LinkedHashMap<String, List<FpgaDevice>> usedFpgaByRequestor = new 
LinkedHashMap<>();
+  //key is the container ID
+  private Map<String, List<FpgaDevice>> containerToFpgaMapping =
+      new HashMap<>();
 
   private Context nmContext;
 
   @VisibleForTesting
-  public HashMap<String, List<FpgaDevice>> getAvailableFpga() {
-    return availableFpga;
+  Map<String, List<FpgaDevice>> getAvailableFpga() {
+    return availableFpgas;
   }
 
   @VisibleForTesting
-  public List<FpgaDevice> getAllowedFpga() {
+  List<FpgaDevice> getAllowedFpga() {
     return allowedFpgas;
   }
 
@@ -72,25 +77,31 @@ public class FpgaResourceAllocator {
   }
 
   @VisibleForTesting
-  public int getAvailableFpgaCount() {
+  int getAvailableFpgaCount() {
     int count = 0;
-    for (List<FpgaDevice> l : availableFpga.values()) {
-      count += l.size();
-    }
+
+    count = availableFpgas.values()
+      .stream()
+      .mapToInt(i -> i.size())
+      .sum();
+
     return count;
   }
 
   @VisibleForTesting
-  public HashMap<String, List<FpgaDevice>> getUsedFpga() {
-    return usedFpgaByRequestor;
+  Map<String, List<FpgaDevice>> getUsedFpga() {
+    return containerToFpgaMapping;
   }
 
   @VisibleForTesting
-  public int getUsedFpgaCount() {
+  int getUsedFpgaCount() {
     int count = 0;
-    for (List<FpgaDevice> l : usedFpgaByRequestor.values()) {
-      count += l.size();
-    }
+
+    count = containerToFpgaMapping.values()
+        .stream()
+        .mapToInt(i -> i.size())
+        .sum();
+
     return count;
   }
 
@@ -252,42 +263,31 @@ public class FpgaResourceAllocator {
     }
   }
 
-  public synchronized void addFpga(String type, List<FpgaDevice> list) {
-    availableFpga.putIfAbsent(type, new LinkedList<>());
+  // called once during initialization
+  public synchronized void addFpgaDevices(String type, List<FpgaDevice> list) {
+    availableFpgas.putIfAbsent(type, new LinkedList<>());
+    List<FpgaDevice> fpgaDevices = new LinkedList<>();
+
     for (FpgaDevice device : list) {
       if (!allowedFpgas.contains(device)) {
-        allowedFpgas.add(device);
-        availableFpga.get(type).add(device);
+        fpgaDevices.add(device);
+        availableFpgas.get(type).add(device);
+      } else {
+        LOG.warn("Duplicate device found: " + device + ". Ignored");
       }
     }
-    LOG.info("Add a list of FPGA Devices: " + list);
+
+    allowedFpgas = ImmutableList.copyOf(fpgaDevices);
+    LOG.info("Added a list of FPGA Devices: " + allowedFpgas);
   }
 
   public synchronized void updateFpga(String requestor,
       FpgaDevice device, String newIPID, String newHash) {
-    List<FpgaDevice> usedFpgas = usedFpgaByRequestor.get(requestor);
-    int index = findMatchedFpga(usedFpgas, device);
-    if (-1 != index) {
-      usedFpgas.get(index).setIPID(newIPID);
-      FpgaDevice fpga = usedFpgas.get(index);
-      fpga.setIPID(newIPID);
-      fpga.setAocxHash(newHash);
-    } else {
-      LOG.warn("Failed to update FPGA due to unknown reason " +
-          "that no record for this allocated device:" + device);
-    }
+    device.setIPID(newIPID);
+    device.setAocxHash(newHash);
     LOG.info("Update IPID to " + newIPID +
-        " for this allocated device:" + device);
-  }
-
-  private synchronized int findMatchedFpga(List<FpgaDevice> devices, 
FpgaDevice item) {
-    int i = 0;
-    for (; i < devices.size(); i++) {
-      if (devices.get(i) == item) {
-        return i;
-      }
-    }
-    return -1;
+        " for this allocated device: " + device);
+    LOG.info("Update IP hash to " + newHash);
   }
 
   /**
@@ -301,7 +301,8 @@ public class FpgaResourceAllocator {
    * */
   public synchronized FpgaAllocation assignFpga(String type, long count,
       Container container, String ipidHash) throws ResourceHandlerException {
-    List<FpgaDevice> currentAvailableFpga = availableFpga.get(type);
+    List<FpgaDevice> currentAvailableFpga = availableFpgas.get(type);
+
     String requestor = container.getContainerId().toString();
     if (null == currentAvailableFpga) {
       throw new ResourceHandlerException("No such type of FPGA resource 
available: " + type);
@@ -341,8 +342,8 @@ public class FpgaResourceAllocator {
         }
 
         // update state store success, update internal used FPGAs
-        usedFpgaByRequestor.putIfAbsent(requestor, new LinkedList<>());
-        usedFpgaByRequestor.get(requestor).addAll(assignedFpgas);
+        containerToFpgaMapping.putIfAbsent(requestor, new LinkedList<>());
+        containerToFpgaMapping.get(requestor).addAll(assignedFpgas);
       }
 
       return new FpgaAllocation(assignedFpgas, currentAvailableFpga);
@@ -390,14 +391,13 @@ public class FpgaResourceAllocator {
   }
 
   public synchronized void cleanupAssignFpgas(String requestor) {
-    List<FpgaDevice> usedFpgas = usedFpgaByRequestor.get(requestor);
+    List<FpgaDevice> usedFpgas = containerToFpgaMapping.get(requestor);
     if (usedFpgas != null) {
       for (FpgaDevice device : usedFpgas) {
         // Add back to availableFpga
-        availableFpga.get(device.getType()).add(device);
+        availableFpgas.get(device.getType()).add(device);
       }
-      usedFpgaByRequestor.remove(requestor);
+      containerToFpgaMapping.remove(requestor);
     }
   }
-
 }
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/fpga/FpgaResourceHandlerImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/fpga/FpgaResourceHandlerImpl.java
index 1a9d608..cd1ea13 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/fpga/FpgaResourceHandlerImpl.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/fpga/FpgaResourceHandlerImpl.java
@@ -101,7 +101,7 @@ public class FpgaResourceHandlerImpl implements 
ResourceHandler {
     // Get avialable devices minor numbers from toolchain or static 
configuration
     List<FpgaResourceAllocator.FpgaDevice> fpgaDeviceList =
         FpgaDiscoverer.getInstance().discover();
-    allocator.addFpga(vendorPlugin.getFpgaType(), fpgaDeviceList);
+    allocator.addFpgaDevices(vendorPlugin.getFpgaType(), fpgaDeviceList);
     this.cGroupsHandler.initializeCGroupController(
         CGroupsHandler.CGroupController.DEVICES);
     return null;


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to