Author: challngr Date: Wed Jan 20 17:42:22 2016 New Revision: 1725763 URL: http://svn.apache.org/viewvc?rev=1725763&view=rev Log: UIMA-4742 Clean-up and adjust rules for purging a node for vary-on and for node death, first delivery.
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Machine.java uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/NodePool.java uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Scheduler.java uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Share.java Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Machine.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Machine.java?rev=1725763&r1=1725762&r2=1725763&view=diff ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Machine.java (original) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Machine.java Wed Jan 20 17:42:22 2016 @@ -444,19 +444,19 @@ public class Machine return ret; } - RmQueriedMachine queryOfflineMachine() // UIMA-4234 - { - RmQueriedMachine ret = queryMachine(); - ret.setOffline(); - return ret; - } - - RmQueriedMachine queryUnresponsiveMachine() // UIMA-4234 - { - RmQueriedMachine ret = queryMachine(); - ret.setUnresponsive(); - return ret; - } +// RmQueriedMachine queryOfflineMachine() // UIMA-4234 +// { +// RmQueriedMachine ret = queryMachine(); +// ret.setOffline(); +// return ret; +// } +// +// RmQueriedMachine queryUnresponsiveMachine() // UIMA-4234 +// { +// RmQueriedMachine ret = queryMachine(); +// ret.setUnresponsive(); +// return ret; +// } /** * A machine's investment is the sum of it's share's investments. Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/NodePool.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/NodePool.java?rev=1725763&r1=1725762&r2=1725763&view=diff ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/NodePool.java (original) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/NodePool.java Wed Jan 20 17:42:22 2016 @@ -542,13 +542,6 @@ class NodePool Machine m = allMachines.get(n); if ( m == null ) { - m = unresponsiveMachines.get(n); - } - if ( m == null ) { - m = offlineMachines.get(n); - } - - if ( m == null ) { for ( NodePool np : children.values() ) { m = np.getMachine(n); if ( m != null ) { @@ -972,36 +965,36 @@ class NodePool return np; } - private synchronized void incrementOnlineByOrder(int order) - { - if ( ! onlineMachinesByOrder.containsKey(order) ) { - onlineMachinesByOrder.put(order, 1); - } else { - onlineMachinesByOrder.put(order, onlineMachinesByOrder.get(order) + 1); - } - } - - private synchronized void decrementOnlineByOrder(int order) - { - onlineMachinesByOrder.put(order, onlineMachinesByOrder.get(order) - 1); - } - - synchronized void getLocalOnlineByOrder(int[] ret) // for queries, just me - { - for ( int o: onlineMachinesByOrder.keySet() ) { - ret[o] += onlineMachinesByOrder.get(o); - } - } - - synchronized void getOnlineByOrder(int[] ret) // for queries - { - for ( int o: onlineMachinesByOrder.keySet() ) { - ret[o] += onlineMachinesByOrder.get(o); - } - for ( NodePool child : children.values() ) { - child.getOnlineByOrder(ret); - } - } +// private synchronized void incrementOnlineByOrder(int order) +// { +// if ( ! onlineMachinesByOrder.containsKey(order) ) { +// onlineMachinesByOrder.put(order, 1); +// } else { +// onlineMachinesByOrder.put(order, onlineMachinesByOrder.get(order) + 1); +// } +// } + +// private synchronized void decrementOnlineByOrder(int order) +// { +// onlineMachinesByOrder.put(order, onlineMachinesByOrder.get(order) - 1); +// } + +// synchronized void getLocalOnlineByOrder(int[] ret) // for queries, just me +// { +// for ( int o: onlineMachinesByOrder.keySet() ) { +// ret[o] += onlineMachinesByOrder.get(o); +// } +// } + +// synchronized void getOnlineByOrder(int[] ret) // for queries +// { +// for ( int o: onlineMachinesByOrder.keySet() ) { +// ret[o] += onlineMachinesByOrder.get(o); +// } +// for ( NodePool child : children.values() ) { +// child.getOnlineByOrder(ret); +// } +// } void signalDb(Machine m, RmNodes key, Object value) @@ -1046,65 +1039,81 @@ class NodePool return props; } + void adjustMachinesByOrder(int neworder, Machine m) + { + int oldorder = m.getShareOrder(); + if ( oldorder != neworder ) { // can change. e.g. if it was taken offline for + HashMap<Node, Machine> mlist = machinesByOrder.get(oldorder); + mlist.remove(m.key()); + m.setShareOrder(neworder); // hardware changes. + + mlist = machinesByOrder.get(neworder); + if ( mlist == null ) { + mlist = new HashMap<Node, Machine>(); + machinesByOrder.put(neworder, mlist); + } + mlist.put(m.key(), m); + } + } + /** * Handle a new node update. */ Machine nodeArrives(Node node, int order) { String methodName = "nodeArrives"; + // Note: the caller of this method MUST (aka IS REQUIRED) to insure this this is the + // right nodepool as we do not recurse. updateMaxOrder(order); - - for ( NodePool np : children.values() ) { - if ( np.containsPoolNode(node) ) { - Machine m = np.nodeArrives(node, order); - return m; - } - } - if ( allMachines.containsKey(node) ) { // already known, do nothing - Machine m = allMachines.get(node); - logger.trace(methodName, null, "Node ", m.getId(), " is already known, not adding."); - return m; - } + String n = node.getNodeIdentity().getName(); - if ( offlineMachines.containsKey(node) ) { // if it's offline it can't be restored like this. + // if it's offline it can't be restored like this. + if ( offlineMachines.containsKey(node) ) { Machine m = offlineMachines.get(node); logger.trace(methodName, null, "Node ", m.getId(), " is offline, not activating."); return m; } + // logger.info(methodName, null, "NODEARRIVES", n, "pass offline Machines"); + // if it was dead, then it isn't any more, AND it's mine, so I need to restart it if ( unresponsiveMachines.containsKey(node) ) { // reactive the node - Machine m = unresponsiveMachines.remove(node); - if ( m.getShareOrder() != order ) { // can change. e.g. if it was taken offline for - m.setShareOrder(order); // hardware changes. - } + logger.info(methodName, null, "RECOVER NODE", n); + Machine m = unresponsiveMachines.remove(node); // not unresponsive any more + + // Deal with memory on the machine changing + adjustMachinesByOrder(order, m); + + // Note: The machine must be on all the other lists by definition since it wasn't taken off when it went offline - allMachines.put(node, m); - machinesByName.put(m.getId(), m); - machinesByIp.put(m.getIp(), m); - HashMap<Node, Machine> mlist = machinesByOrder.get(order); - incrementOnlineByOrder(order); - if ( mlist == null ) { - mlist = new HashMap<Node, Machine>(); - machinesByOrder.put(order, mlist); - } - mlist.put(m.key(), m); - - total_shares += order; // UIMA-3939 signalDb(m, RmNodes.Responsive, true); - logger.info(methodName, null, "Nodepool:", id, "Host reactivated ", m.getId(), String.format("shares %2d total %4d:", order, total_shares), m.toString()); + logger.info(methodName, null, "Nodepool:", id, "Host reactivated ", m.getId(), "shares", order, m.toString()); return m; } + // logger.info(methodName, null, "NODEARRIVES", n, "pass unresponsive Machines"); + // ok, it is my problem? If so, then it isn't offline or dead, so it's ok, and we're done + if ( allMachines.containsKey(node) ) { // already known, do nothing + Machine m = allMachines.get(node); + + // Deal with memory on the machine changing + adjustMachinesByOrder(order, m); + + logger.trace(methodName, null, "Node ", m.getId(), " is already known, not adding."); + return m; + } + // logger.info(methodName, null, "NODEARRIVES", n, "pass allMachines"); + + // If we fall through it's a new one. Machine machine = new Machine(node); // brand new machine, make it active Node key = machine.key(); machine.setShareOrder(order); allMachines.put(key, machine); // global list machinesByName.put(machine.getId(), machine); machinesByIp.put(machine.getIp(), machine); - incrementOnlineByOrder(order); + //incrementOnlineByOrder(order); machine.setNodepool(this); total_shares += order; @@ -1125,71 +1134,48 @@ class NodePool props.put(RmNodes.Responsive, true); props.put(RmNodes.Online, true); try { - persistence.createMachine(machine.getId(), props); - } catch (Exception e) { - logger.warn(methodName, null, "Cannot write machine to DB:", machine.getId(), e); - } + persistence.createMachine(machine.getId(), props); + } catch (Exception e) { + logger.warn(methodName, null, "Cannot write machine to DB:", machine.getId(), e); + } return machine; } - void disable(Machine m, HashMap<Node, Machine> disableMap) + void disable(Machine m) { String methodName = "disable"; - if ( allMachines.containsKey(m.key()) ) { - logger.info(methodName, null, "Nodepool:", id, "Host disabled:", m.getId(), "Looking for shares to clear"); - - int order = m.getShareOrder(); - String name = m.getId(); - String ip = m .getIp(); - - HashMap<Share, Share> shares = m.getActiveShares(); - for (Share s : shares.values()) { - IRmJob j = s.getJob(); - - if ( j.getDuccType() == DuccType.Reservation ) { - // UIMA-3614. Only actual reservation is left intact - logger.info(methodName, null, "Nodepool:", id, "Host dead/offline:", m.getId(), "Not purging", j.getDuccType()); - break; - } - - switch ( j.getDuccType() ) { - case Reservation: - // UIMA-3614. Only actual reservation is left intact - logger.info(methodName, null, "Nodepool:", id, "Host dead/offline:", m.getId(), "Not purging", j.getDuccType()); - break; + logger.info(methodName, null, "Nodepool:", id, "Host disabled:", m.getId(), "Looking for shares to clear"); - case Service: - case Pop: - j.markComplete(); // UIMA-4327 Must avoid reallocation, these guys are toast if they get purged. - logger.info(methodName, null, "Nodepool:", id, "Host dead/offline:", m.getId(), "Mark service/pop completed."); - // NO BREAK, must fall through - case Job: - default: - break; - } + int order = m.getShareOrder(); + String name = m.getId(); + String ip = m .getIp(); + + HashMap<Share, Share> shares = m.getActiveShares(); + for (Share s : shares.values()) { + IRmJob j = s.getJob(); + if ( j.getSchedulingPolicy() != Policy.FAIR_SHARE ) { + logger.info(methodName, j.getId(), "Nodepool:", id, "Host dead/offline:", m.getId(), "Not purging NP work", j.getDuccType()); + } else { logger.info(methodName, j.getId(), "Nodepool:", id, "Purge", j.getDuccType(), "on dead/offline:", m.getId()); j.shrinkByOne(s); nPendingByOrder[order]++; - - s.purge(); // This bet tells OR not to wait for confirmation from the agent + + s.purge(); // This bit tells OR not to wait for confirmation from the agent } + } + } - allMachines.remove(m.key()); - decrementOnlineByOrder(order); - total_shares -= order; - disableMap.put(m.key(), m); - - HashMap<Node, Machine> machs = machinesByOrder.get(order); - machs.remove(m.key()); - if ( machs.size() == 0 ) { - machinesByOrder.remove(order); - } - machinesByName.remove(name); - machinesByIp.remove(ip); - logger.info(methodName, null, "Nodepool:", id, "Node leaves:", m.getId(), "total shares:", total_shares); + void nodeLeaves(Machine m) + { + // note, simpler than varyoff because we really don't care about unusual + // conditions since there's nobody to tell + if ( allMachines.containsKey(m.key()) ) { + disable(m); + unresponsiveMachines.put(m.key(), m); + signalDb(m, RmNodes.Responsive, false); } else { for ( NodePool np : children.values() ) { np.nodeLeaves(m); @@ -1197,61 +1183,78 @@ class NodePool } } - void nodeLeaves(Machine m) - { - disable(m, unresponsiveMachines); - signalDb(m, RmNodes.Responsive, false); - } - // UIMA-4142 // helper for CLI things that refer to things by name only. do we know about anything by this // name? see resolve() in Scheduler.java. boolean hasNode(String n) { - if ( machinesByName.containsKey(n) ) return true; + return machinesByName.containsKey(n); + } - // If not we have to search the offline machines and the unresponsive machines which are - // keyed differently. This is really ugly but hard to fix at this point, so cope. - for ( Node node : offlineMachines.keySet() ) { - if ( node.getNodeIdentity().getName().equals(n) ) return true; - } - for ( Node node : unresponsiveMachines.keySet() ) { - if ( node.getNodeIdentity().getName().equals(n) ) return true; + NodePool findNodepoolByNodename(String n) + { + if ( hasNode(n) ) { + return this; + } else { + for ( NodePool np : children.values() ) { + NodePool ret = np.findNodepoolByNodename(n); + if ( ret != null ) { + return ret; + } + } } - return false; + return null; } - String varyoff(String node) + private String doVaryOff(String node) { + // caller must insure node is known to "me" Machine m = machinesByName.get(node); - if ( m == null ) { - // ok, maybe it's already offline or maybe dead - // relatively rare, cleaner to search than to make yet another index + if (offlineMachines.containsKey(m.key()) ) { + return "VaryOff: Nodepool " + id + " - Already offline: " + node; + } - for ( Machine mm : offlineMachines.values() ) { - if ( mm.getId().equals(node) ) { - return "VaryOff: Nodepool " + id + " - Already offline: " + node; - } - } + if ( unresponsiveMachines.containsKey(m.key()) ) { + // lets be friendly and tell caller it's also unresponsive + offlineMachines.put(m.key(), m); + signalDb(m, RmNodes.Online, false); + return "VaryOff: Nodepool " + id + " - Unresponsive machine, marked offline: " + node; + } - Iterator<Machine> iter = unresponsiveMachines.values().iterator(); - while ( iter.hasNext() ) { - Machine mm = iter.next(); - if ( mm.getId().equals(node) ) { - Node key = mm.key(); - iter.remove(); - offlineMachines.put(key, mm); - signalDb(m, RmNodes.Online, false); - return "VaryOff: Nodepool " + id + " - Unresponsive machine, marked offline: " + node; - } - } + offlineMachines.put(m.key(), m); + disable(m); + signalDb(m, RmNodes.Online, false); + return "VaryOff: " + node + " - OK."; + } + String varyoff(String node) + { + // note, vaguely trickier than 'nodeLeaves' because we need to catch the + // potential user confusions and reflect them back. + NodePool np = findNodepoolByNodename(node); + if ( np == null ) { return "VaryOff: Nodepool " + id + " - Cannot find machine: " + node; } - disable(m, offlineMachines); - signalDb(m, RmNodes.Online, false); - return "VaryOff: " + node + " - OK."; + // note we only call this if we know for sure the node can be found and associated with a NP + return np.doVaryOff(node); // must direct to the correct context + } + + private String doVaryOn(String node) + { + + // caller must insure node is known to "me" + Machine m = machinesByName.get(node); + Node key = m.key(); + + if ( ! offlineMachines.containsKey(key) ) { + return "VaryOn: Nodepool " + id + " - Already online: " + m.getId(); + } + + offlineMachines.remove(key); + signalDb(m, RmNodes.Online, true); + + return "VaryOn: Nodepool " + id + " - Machine marked online: " + node; } /** @@ -1260,30 +1263,21 @@ class NodePool */ String varyon(String node) { - if ( machinesByName.containsKey(node) ) { - return "VaryOn: Nodepool " + id + " - Already online: " + node; + NodePool np = findNodepoolByNodename(node); + if ( np == null ) { + return "VaryOff: Nodepool " + id + " - Cannot find machine: " + node; } - Iterator<Machine> iter = offlineMachines.values().iterator(); - while ( iter.hasNext() ) { - Machine mm = iter.next(); - if ( mm.getId().equals(node) ) { - iter.remove(); - signalDb(mm, RmNodes.Online, true); - return "VaryOn: Nodepool " + id + " - Machine marked online: " + node; - } - } + return np.doVaryOn(node); // must pass to the right nodepool, can't do it "here" + } - iter = unresponsiveMachines.values().iterator(); - while ( iter.hasNext() ) { - Machine mm = iter.next(); - if ( mm.getId().equals(node) ) { - signalDb(mm, RmNodes.Online, true); - return "VaryOn: Nodepool " + id + " - Machine is online but not responsive: " + node; - } - } - - return "VaryOn: Nodepool " + id + " - Cannot find machine: " + node; + boolean isSchedulable(Machine m) + { + if ( m.isBlacklisted() ) return false; + if ( unresponsiveMachines.containsKey(m.key()) ) return false; + if ( offlineMachines.containsKey(m.key()) ) return false; + + return true; } /** @@ -1773,7 +1767,7 @@ class NodePool ml.addAll(machs.values()); for ( Machine m : ml ) { // look for space - if ( m.isBlacklisted() ) continue; // nope + if ( !isSchedulable(m) ) continue; // nope if ( (!allowVertical) && (m.hasVerticalConflict(j)) ) continue; // UIMA-4712 int g = Math.min(needed, m.countFreeShares(order)); // adjust by the order supported on the machine for ( int ndx= 0; ndx < g; ndx++ ) { Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Scheduler.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Scheduler.java?rev=1725763&r1=1725762&r2=1725763&view=diff ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Scheduler.java (original) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Scheduler.java Wed Jan 20 17:42:22 2016 @@ -902,9 +902,11 @@ public class Scheduler // tracking the OR hang problem - are topics being delivered? logger.info("nodeArrives", null, "Total arrivals:", total_arrivals); - handleIllNodes(); - handleDeadNodes(); - resetNodepools(); + synchronized(this) { + handleIllNodes(); + handleDeadNodes(); + resetNodepools(); + } // TODO: Can we combine these two into one? SchedulingUpdate upd = new SchedulingUpdate(); // state from internal scheduler @@ -1166,23 +1168,20 @@ public class Scheduler // The first block insures the node is in the scheduler's records as soon as possible total_arrivals++; // report these in the main schedule loop - // the amount of memory available for shares, adjusted with configured overhead - NodePool np = getNodepoolByName(node.getNodeIdentity()); + NodePool np = getNodepoolByName(node.getNodeIdentity()); // finds np assigned in ducc.nodes; if none, returns the default np Machine m = np.getMachine(node); int share_order = 0; - - if ( m == null ) { - // allNodes.put(node, node); - long allocatable_mem = node.getNodeMetrics().getNodeMemory().getMemFree() - share_free_dram; - if ( dramOverride > 0 ) { - allocatable_mem = dramOverride; - } - share_order = (int) (allocatable_mem / np.getShareQuantum()); // conservative - rounds down (this will always cast ok) - } else { - share_order = m.getShareOrder(); - } - + + // let's always recalculate this in case it changes for whatever bizarre reason (reboot, or pinned process gone, or whatever) + long allocatable_mem = node.getNodeMetrics().getNodeMemory().getMemFree() - share_free_dram; + if ( dramOverride > 0 ) { + allocatable_mem = dramOverride; + } + share_order = (int) (allocatable_mem / np.getShareQuantum()); // conservative - rounds down (this will always cast ok) + // NOTE: we cannot set the order into the machine yet, in case it has changed, because NodePool needs to adjust based + // on current and new + max_order = Math.max(share_order, max_order); m = np.nodeArrives(node, share_order); // announce to the nodepools m.heartbeatArrives(); @@ -1307,7 +1306,7 @@ public class Scheduler freeMachines[i] += np.countFreeMachines(i); // (these are local, as we want) } - np.getLocalOnlineByOrder(onlineMachines); + //np.getLocalOnlineByOrder(onlineMachines); ret.setOnlineMachines(onlineMachines); ret.setFreeMachines(freeMachines); Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Share.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Share.java?rev=1725763&r1=1725762&r2=1725763&view=diff ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Share.java (original) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Share.java Wed Jan 20 17:42:22 2016 @@ -294,7 +294,7 @@ public class Share } catch (Exception e) { logger.warn(methodName, job.getId(), "Cannot update share statistics in database for share", id, e); } - logger.info(methodName, jobid, "UPDATE:", investment, state, getInitializationTime(), pid); + return true; }