Author: challngr Date: Fri Nov 20 21:48:11 2015 New Revision: 1715427 URL: http://svn.apache.org/viewvc?rev=1715427&view=rev Log: UIMA-4577 Support rmshares table.
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/rm/IDbShare.java Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/rm/IRmPersistence.java uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/rm/NullRmStatePersistence.java uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbCreate.java uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbHandle.java uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbManager.java uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/RmStatePersistence.java uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/JobManagerConverter.java uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/ResourceManagerComponent.java uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/ISchedulerMain.java uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Machine.java uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Scheduler.java uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Share.java Added: uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/rm/IDbShare.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/rm/IDbShare.java?rev=1715427&view=auto ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/rm/IDbShare.java (added) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/rm/IDbShare.java Fri Nov 20 21:48:11 2015 @@ -0,0 +1,45 @@ +package org.apache.uima.ducc.common.persistence.rm; + +import org.apache.uima.ducc.common.Node; +import org.apache.uima.ducc.common.NodeIdentity; +import org.apache.uima.ducc.common.utils.id.DuccId; + +public interface IDbShare { + + + public abstract int getNodepoolDepth(); + + public abstract String getNodepoolId(); + + public abstract DuccId getId(); + + // UIMA-4142 + public abstract boolean isBlacklisted(); + + // UIMA-4142 + public abstract DuccId getBlJobId(); + + public abstract NodeIdentity getNodeIdentity(); + + public abstract Node getNode(); + + /** + * The order of the share itself. + */ + public abstract int getShareOrder(); + + /** + * Returns only initialization time. Eventually getInvestment() may take other things into + * consideration so we separate these two (even though currently they do the same thing.) + */ + public abstract long getInitializationTime(); + + public abstract void setInitializationTime(long millis); + + public abstract void setFixed(); + + public abstract boolean isPurged(); + public abstract boolean isEvicted(); + public abstract boolean isFixed(); + +} Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/rm/IRmPersistence.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/rm/IRmPersistence.java?rev=1715427&r1=1715426&r2=1715427&view=diff ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/rm/IRmPersistence.java (original) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/rm/IRmPersistence.java Fri Nov 20 21:48:11 2015 @@ -91,8 +91,9 @@ public interface IRmPersistence * @param id The node name * @param jobid The duccid of the job owning the new shoare * @param shareid The DuccId of the new share. + * @param quantum The scheduling quantum in GB used for this share. */ - public void addAssignment(String id, DuccId jobid, DuccId shareid); + public void addAssignment(String id, DuccId jobid, IDbShare share, int quantum, String jobtype) throws Exception; /** * Remove a share from the machine. @@ -100,7 +101,54 @@ public interface IRmPersistence * @param jobid The duccid of the job owning the new shoare * @param shareid The DuccId of the new share. */ - public void removeAssignment(String id, DuccId jobid, DuccId shareid); + public void removeAssignment(String id, DuccId jobid, IDbShare share) throws Exception; + + /** + * Update a share definition to show it is non-preemptable. + * @param node The node where the share resides. + * @param shareId The (RM-assigned) DuccId of the share. + * @param jobId The OR-assigned DuccId of the job. + * @param val True if it is non-preemptable, false otherwise. + * + * NOTE: The triple (node, shareid, jobid) form the primary key to find the share in the DB + */ + public void setFixed(String node, DuccId shareId, DuccId jobId, boolean val) throws Exception; + /** + * Update a share definition to show it is non-preemptable. + * @param node The node where the share resides. + * @param shareId The (RM-assigned) DuccId of the share. + * @param jobId The OR-assigned DuccId of the job. + * @param val True if it is non-preemptable. RM will never set this false. + * + * NOTE: The triple (node, shareid, jobid) form the primary key to find the share in the DB + */ + + public void setPurged(String node, DuccId shareId, DuccId jobId, boolean val) throws Exception; + + /** + * Update a share definition to show it is evicted aka preempted by RM. + * @param node The node where the share resides. + * @param shareId The (RM-assigned) DuccId of the share. + * @param jobId The OR-assigned DuccId of the job. + * @param val True if it is evicted. Once evicted it can not be un-evicted so RM never sets this false. + * + * NOTE: The triple (node, shareid, jobid) form the primary key to find the share in the DB + */ + public void setEvicted(String node, DuccId shareId, DuccId jobId, boolean val) throws Exception; + + /** + * Update current information about the share from the current OR publication. + * @param node The node where the share resides. + * @param shareId The (RM-assigned) DuccId of the share. + * @param jobId The OR-assigned DuccId of the job. + * @param investment The "investment", i.e. amount of CPU expended by the process in the share so far. + * @param state The OR-assigned state of the process in the share. + * @param init_time The time the process spent in its initialization phase. + * @param pid The *ix process id of the process in the share. + * + * NOTE: The triple (node, shareid, jobid) form the primary key to find the share in the DB + */ + public void updateShare(String node, DuccId shareid, DuccId jobid, long investment, String state, long init_time, long pid) throws Exception; /** * Fetch a machine by its id. @@ -210,4 +258,95 @@ public interface IRmPersistence public String columnName() { return pname(); } public boolean isIndex() { return false; } } + + enum RmShares + implements IDbProperty + { + TABLE_NAME { + public String pname() { return "rmshares"; } + public Type type() { return Type.String; } + public boolean isPrivate() { return true;} + public boolean isMeta() { return true;} + }, + // share uniqueness is given by the rm duccid + jobid. We add in the node to act as cassandra's cluster id + // because the principal query at the moment is to find shares on a node. + + // the way cassandra works: 'node' will become the cluster key, and ducc_dbid, job_id the row key + Node { + public String pname() { return "node"; } + public Type type() { return Type.String; } + public boolean isPrimaryKey() { return true; } + }, + DuccDbid { + public String pname() { return "ducc_dbid"; } + public Type type() { return Type.Long; } + public boolean isPrimaryKey() { return true; } + }, + + JobId { + public String pname() { return "job_id"; } + public Type type() { return Type.Long; } + public boolean isPrimaryKey() { return true; } + }, + + Uuid { + public String pname() { return "uuid"; } + public Type type() { return Type.UUID; } + }, + ShareOrder { + public String pname() { return "share_order"; } + public Type type() { return Type.Integer; } + }, + Quantum { + public String pname() { return "quantum"; } + public Type type() { return Type.Integer; } + }, + InitTime { + public String pname() { return "init_time"; } + public Type type() { return Type.Long; } + }, + Evicted { + public String pname() { return "evicted"; } + public Type type() { return Type.Boolean; } + }, + Purged { + public String pname() { return "purged"; } + public Type type() { return Type.Boolean; } + }, + Fixed { + public String pname() { return "fixed"; } + public Type type() { return Type.Boolean; } + }, + Blacklisted { + public String pname() { return "blacklisted"; } + public Type type() { return Type.Boolean; } + }, + State { + public String pname() { return "state"; } + public Type type() { return Type.String; } + }, + Pid { + public String pname() { return "pid"; } + public Type type() { return Type.Long; } + }, + JobType { + public String pname() { return "jobtype"; } + public Type type() { return Type.String; } + }, + Investment { + public String pname() { return "investment"; } + public Type type() { return Type.Long; } + }, + ; + + public boolean isPrimaryKey() { return false; } + public boolean isPrivate() { return false; } + public boolean isMeta() { return false; } + public String columnName() { return pname(); } + public boolean isIndex() { return false; } + + } + } + + Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/rm/NullRmStatePersistence.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/rm/NullRmStatePersistence.java?rev=1715427&r1=1715426&r2=1715427&view=diff ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/rm/NullRmStatePersistence.java (original) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/rm/NullRmStatePersistence.java Fri Nov 20 21:48:11 2015 @@ -38,8 +38,12 @@ public class NullRmStatePersistence impl public void setNodeProperty(String id, RmNodes key, Object value) { } public void setNodeProperties(String id, Object... props) {} public void createMachine(String id, Map<RmNodes, Object> props) { } - public void addAssignment(String id, DuccId jobid, DuccId shareid) {} - public void removeAssignment(String id, DuccId jobid, DuccId shareid) {} + public void addAssignment(String id, DuccId jobid, IDbShare shareid, int quantum, String type) {} + public void removeAssignment(String id, DuccId jobid, IDbShare shareid) {} + public void setEvicted(String node, DuccId shareId, DuccId jobId, boolean val) {} + public void setFixed(String node, DuccId shareId, DuccId jobId, boolean val) {} + public void setPurged(String node, DuccId shareId, DuccId jobId, boolean val) {} + public void updateShare(String node, DuccId shareid, DuccId jobid, long investment, String state, long init_time, long pid) {} public Properties getMachine(String id) { return null; } public Map<String, Map<String, Object>> getAllMachines() { return new HashMap<String, Map<String, Object>>(); } } Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbCreate.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbCreate.java?rev=1715427&r1=1715426&r2=1715427&view=diff ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbCreate.java (original) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbCreate.java Fri Nov 20 21:48:11 2015 @@ -133,10 +133,10 @@ public class DbCreate return false; } Metadata metadata = cluster.getMetadata(); - doLog(methodName, "Connected to cluster: %s\n", metadata.getClusterName()); + logger.info(methodName, null, "Connected to cluster:", metadata.getClusterName()); for ( Host host : metadata.getAllHosts() ) { - doLog(methodName, "Datatacenter: %s; Host: %s; Rack: %s\n", host.getDatacenter(), host.getAddress(), host.getRack()); + logger.info(methodName, null, "Datatacenter:", host.getDatacenter(), "Host:", host.getAddress(), "Rack:", host.getRack()); } session = cluster.connect(); return true; Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbHandle.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbHandle.java?rev=1715427&r1=1715426&r2=1715427&view=diff ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbHandle.java (original) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbHandle.java Fri Nov 20 21:48:11 2015 @@ -65,6 +65,21 @@ public class DbHandle return manager.execute(s); } + ResultSet execute(PreparedStatement ps, Object ... fields) + throws Exception + { + String methodName = "execute"; + long now = System.currentTimeMillis(); + + try { + BoundStatement boundStatement = new BoundStatement(ps); + BoundStatement bound = boundStatement.bind(fields); + return execute(bound); + } finally { + logger.info(methodName, null, "Time to execute prepared statement:", ps.getQueryString(), System.currentTimeMillis() - now); + } + } + /** * Delete the object of the indicated type and duccid. We optionally commit in case we want to * do more things that have to work under the same transaction so we can rollback if needed.= Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbManager.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbManager.java?rev=1715427&r1=1715426&r2=1715427&view=diff ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbManager.java (original) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbManager.java Fri Nov 20 21:48:11 2015 @@ -100,10 +100,10 @@ public class DbManager .build(); Metadata metadata = cluster.getMetadata(); - logger.info(methodName, null, "Connected to cluster: %s\n", metadata.getClusterName()); + logger.info(methodName, null, "Connected to cluster:", metadata.getClusterName()); for ( Host host : metadata.getAllHosts() ) { - logger.info(methodName, null, "Datatacenter: %s; Host: %s; Rack: %s\n", host.getDatacenter(), host.getAddress(), host.getRack()); + logger.info(methodName, null, "Datatacenter:", host.getDatacenter(), "Host:", host.getAddress(), "Rack:", host.getRack()); } } Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/RmStatePersistence.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/RmStatePersistence.java?rev=1715427&r1=1715426&r2=1715427&view=diff ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/RmStatePersistence.java (original) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/RmStatePersistence.java Fri Nov 20 21:48:11 2015 @@ -25,10 +25,12 @@ import java.util.List; import java.util.Map; import java.util.Properties; +import org.apache.uima.ducc.common.persistence.rm.IDbShare; import org.apache.uima.ducc.common.persistence.rm.IRmPersistence; import org.apache.uima.ducc.common.utils.DuccLogger; import org.apache.uima.ducc.common.utils.id.DuccId; +import com.datastax.driver.core.PreparedStatement; import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.Row; import com.datastax.driver.core.SimpleStatement; @@ -44,7 +46,15 @@ public class RmStatePersistence DbManager dbManager = null; DuccLogger logger = null; - static final String RM_NODE_TABLE = RmNodes.TABLE_NAME.pname(); + static final String RM_NODE_TABLE = RmNodes.TABLE_NAME.pname(); + static final String RM_SHARE_TABLE = RmShares.TABLE_NAME.pname(); + + PreparedStatement shareAddPrepare = null; + PreparedStatement shareDelPrepare = null; + PreparedStatement updateFixedPrepare = null; + PreparedStatement updatePurgedPrepare = null; + PreparedStatement updateEvictedPrepare = null; + PreparedStatement updateSharePrepare = null; public RmStatePersistence() { @@ -79,11 +89,21 @@ public class RmStatePersistence this.logger = logger; String stateUrl = System.getProperty("ducc.state.database.url"); init(stateUrl); + DbHandle h = dbManager.open(); + + // For creating a new share + // These are upserts - sometimes the shares are updated before they're actually added to the DB. + shareAddPrepare = h.prepare("UPDATE " + RM_SHARE_TABLE + " SET uuid=?, share_order=?, blacklisted=?, evicted=?, fixed=?, purged=?, quantum=?, jobtype=? WHERE node=? AND ducc_dbid=? and job_id=?"); + shareDelPrepare = h.prepare("DELETE FROM " + RM_SHARE_TABLE + " WHERE node = ? and ducc_dbid = ? and job_id = ?;"); + updateFixedPrepare = h.prepare("UPDATE " + RM_SHARE_TABLE + " SET fixed = ? WHERE node = ? AND ducc_dbid = ? and job_id = ?"); + updatePurgedPrepare = h.prepare("UPDATE " + RM_SHARE_TABLE + " SET purged = ? WHERE node = ? AND ducc_dbid = ? and job_id = ?"); + updateEvictedPrepare = h.prepare("UPDATE " + RM_SHARE_TABLE + " SET evicted = ? WHERE node = ? AND ducc_dbid = ? and job_id = ?"); + updateSharePrepare = h.prepare("UPDATE " + RM_SHARE_TABLE + " SET investment = ?, state = ?, init_time = ?, pid = ? WHERE node = ? AND ducc_dbid = ? and job_id = ?"); } public void close() { - dbManager.shutdown(); + if ( dbManager != null ) dbManager.shutdown(); dbManager = null; } @@ -95,6 +115,7 @@ public class RmStatePersistence try { h = dbManager.open(); h.execute("TRUNCATE " + RM_NODE_TABLE); + h.execute("TRUNCATE " + RM_SHARE_TABLE); } catch ( Exception e ) { logger.error(methodName, null, "Cannot clear the database.", e); } @@ -113,6 +134,16 @@ public class RmStatePersistence for ( String s : indexes ) { ret.add(new SimpleStatement(s)); } + + buf = new StringBuffer("CREATE TABLE IF NOT EXISTS " + RM_SHARE_TABLE + " ("); + buf.append(DbUtil.mkSchema(RmShares.values())); + buf.append(")"); + ret.add(new SimpleStatement(buf.toString())); + indexes = DbUtil.mkIndices(RmShares.values(), RM_SHARE_TABLE); + for ( String s : indexes ) { + ret.add(new SimpleStatement(s)); + } + return ret; } @@ -167,12 +198,46 @@ public class RmStatePersistence } - public void addAssignment(String node, DuccId jobid, DuccId shareid) + public void addAssignment(String node, DuccId jobid, IDbShare s, int quantum, String type) + throws Exception { + DbHandle h = dbManager.open(); + h.saveObject(shareAddPrepare, s.getId().getUUID(), s.getShareOrder(), s.isBlacklisted(), s.isEvicted(), s.isFixed(), s.isPurged(), quantum, type, node, s.getId().getFriendly(), jobid.getFriendly() ); } - public void removeAssignment(String node, DuccId jobid, DuccId shareid) + public void removeAssignment(String node, DuccId jobid, IDbShare s) + throws Exception + { + DbHandle h = dbManager.open(); + h.execute(shareDelPrepare, node, s.getId().getFriendly(), jobid.getFriendly()); + } + + public void setFixed(String node, DuccId shareId, DuccId jobId, boolean val) + throws Exception + { + DbHandle h = dbManager.open(); + h.execute(updateFixedPrepare, val, node, shareId.getFriendly(), jobId.getFriendly()); + } + + public void setPurged(String node, DuccId shareId, DuccId jobId, boolean val) + throws Exception + { + DbHandle h = dbManager.open(); + h.execute(updatePurgedPrepare, val, node, shareId.getFriendly(), jobId.getFriendly()); + } + + public void setEvicted(String node, DuccId shareId, DuccId jobId, boolean val) + throws Exception + { + DbHandle h = dbManager.open(); + h.execute(updateEvictedPrepare, val, node, shareId.getFriendly(), jobId.getFriendly()); + } + + public void updateShare(String node, DuccId shareid, DuccId jobid, long investment, String state, long init_time, long pid) + throws Exception { + DbHandle h = dbManager.open(); + h.execute(updateSharePrepare, investment, state, init_time, pid, node, shareid.getFriendly(), jobid.getFriendly()); } public Properties getMachine(String m) Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/JobManagerConverter.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/JobManagerConverter.java?rev=1715427&r1=1715426&r2=1715427&view=diff ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/JobManagerConverter.java (original) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/JobManagerConverter.java Fri Nov 20 21:48:11 2015 @@ -719,6 +719,7 @@ public class JobManagerConverter // we can crash or at least complain loudly on mismatch. Share s = scheduler.getShare(p.getDuccId()); + long mem = p.getResidentMemory(); long investment = p.getWiMillisInvestment(); ProcessState state = p.getProcessState(); @@ -957,9 +958,20 @@ public class JobManagerConverter break; case Service: case Pop: + // This is really an AP and OR sets the state to running immediately although it isn't yet, so the + // information is incomplete. We always have to reconcile. + if ( ((IDuccWorkService)l).getServiceDeploymentType() == ServiceDeploymentType.other ) { + logger.info(methodName, l.getDuccId(), "[P] State: ", r.getStateObject(), "->", l.getStateObject()); + reconcileProcesses(l.getDuccId(), l, r); + } else if ( r.getStateObject() != l.getStateObject() ) { + // Service state does come int correctly + logger.info(methodName, l.getDuccId(), "[S] State: ", r.getStateObject(), "->", l.getStateObject()); + reconcileProcesses(l.getDuccId(), l, r); + } + break; case Reservation: if ( r.getStateObject() != l.getStateObject() ) { - logger.info(methodName, l.getDuccId(), "[SPR] State: ", r.getStateObject(), "->", l.getStateObject()); + logger.info(methodName, l.getDuccId(), "[R] State: ", r.getStateObject(), "->", l.getStateObject()); } // for the moment, these guys have nothing to reconcile. break; Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/ResourceManagerComponent.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/ResourceManagerComponent.java?rev=1715427&r1=1715426&r2=1715427&view=diff ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/ResourceManagerComponent.java (original) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/ResourceManagerComponent.java Fri Nov 20 21:48:11 2015 @@ -127,6 +127,7 @@ public class ResourceManagerComponent } } + class RmAdminEventProcessor implements Processor { final AbstractDuccComponent delegate; @@ -258,6 +259,15 @@ public class ResourceManagerComponent return null; } + public void stop() + throws Exception + { + String methodName = "stop"; + logger.info(methodName, null, "Stopping RM database connection"); + scheduler.stop(); + super.stop(); + } + public void setTransportConfiguration(DuccEventDispatcher eventDispatcher, String endpoint) { this.eventDispatcher = eventDispatcher; Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/ISchedulerMain.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/ISchedulerMain.java?rev=1715427&r1=1715426&r2=1715427&view=diff ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/ISchedulerMain.java (original) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/ISchedulerMain.java Fri Nov 20 21:48:11 2015 @@ -77,6 +77,7 @@ public interface ISchedulerMain // once both initialized() and ready() occur, the RM scaffolding will enable scheduling by calling start void start(); + void stop(); RmAdminReply varyoff(String[] nodes); RmAdminReply varyon(String[] nodes); Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Machine.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Machine.java?rev=1715427&r1=1715426&r2=1715427&view=diff ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Machine.java (original) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Machine.java Fri Nov 20 21:48:11 2015 @@ -320,10 +320,10 @@ public class Machine try { // Not transactional. If this turns into a problem we'll have to find a way persistence.setNodeProperties(id, RmNodes.Assignments, activeShares.size(), RmNodes.SharesLeft, shares_left); - persistence.addAssignment(id, s.getJob().getId(), s.getId()); // update jobs on machine and specific shares + persistence.addAssignment(id, s.getJob().getId(), s, getQuantum(), s.getJob().getShortType()); // update jobs on machine and specific shares logger.info(methodName, null, "Time to assign share in db", System.currentTimeMillis() - now); } catch (Exception e) { - logger.warn(methodName, null, "Cannot save state; shares_left", shares_left); + logger.warn(methodName, null, "Cannot save state; shares_left", shares_left, e); } } @@ -339,7 +339,7 @@ public class Machine try { // Not transactional. If this turns into a problem we'll have to find a way persistence.setNodeProperties(id, RmNodes.Assignments, activeShares.size(), RmNodes.SharesLeft, shares_left); - persistence.removeAssignment(id, s.getJob().getId(), s.getId()); // update jobs on machine and specific shares + persistence.removeAssignment(id, s.getJob().getId(), s); // update jobs on machine and specific shares logger.info(methodName, null, "Time to remove share in db", System.currentTimeMillis() - now); } catch (Exception e) { logger.warn(methodName, null, "Cannot save state; shares_left", shares_left); Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Scheduler.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Scheduler.java?rev=1715427&r1=1715426&r2=1715427&view=diff ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Scheduler.java (original) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Scheduler.java Fri Nov 20 21:48:11 2015 @@ -132,6 +132,7 @@ public class Scheduler boolean stability = false; private static DuccIdFactory idFactory; + IRmPersistence persistence = null; // static boolean expandByDoubling = true; // static int initializationCap = 2; // Max allocation until we know initialization works in @@ -264,7 +265,7 @@ public class Scheduler logger.info(methodName, null, " RM Version : ", ""+ rmversion_major + "." + rmversion_minor + "." + rmversion_ptf); - IRmPersistence persistence = RmPersistenceFactory.getInstance(this.getClass().getName(), "RM"); + persistence = RmPersistenceFactory.getInstance(this.getClass().getName(), "RM"); persistence.clear(); initialized = true; } @@ -801,6 +802,11 @@ public class Scheduler stability = true; } + public void stop() + { + persistence.close(); + } + protected void handleIllNodes() { String methodName = "handleIllNodes"; Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Share.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Share.java?rev=1715427&r1=1715426&r2=1715427&view=diff ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Share.java (original) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Share.java Fri Nov 20 21:48:11 2015 @@ -23,6 +23,10 @@ import java.util.Map; import org.apache.uima.ducc.common.Node; import org.apache.uima.ducc.common.NodeIdentity; +import org.apache.uima.ducc.common.persistence.rm.IDbShare; +import org.apache.uima.ducc.common.persistence.rm.IRmPersistence; +import org.apache.uima.ducc.common.persistence.rm.RmPersistenceFactory; +import org.apache.uima.ducc.common.utils.DuccLogger; import org.apache.uima.ducc.common.utils.id.DuccId; import org.apache.uima.ducc.transport.event.common.IProcessState.ProcessState; import org.apache.uima.ducc.transport.event.common.ITimeWindow; @@ -36,9 +40,10 @@ import org.apache.uima.ducc.transport.ev * A share is ALWAYS associated with a Machine. */ public class Share - implements SchedConstants + implements SchedConstants, + IDbShare { - //private transient DuccLogger logger = DuccLogger.getLogger(Share.class, COMPONENT_NAME); + private transient DuccLogger logger = DuccLogger.getLogger(Share.class, COMPONENT_NAME); private transient Machine machine; // machine associatede with this share, assigned after "what-of" private DuccId id = null; // unique *within this machine* assigned after "what-of" @@ -57,6 +62,9 @@ public class Share private long investment = 0; // Current time for all ACTIVE work items in the process + // note this returns a global static instance, no need to staticisze it here + private IRmPersistence persistence = null; + @SuppressWarnings("unused") private long resident_memory = 0; @SuppressWarnings("unused") @@ -88,6 +96,7 @@ public class Share this.job = job; this.bljobid = null; // UIMA-4142 this.share_order = share_order; + this.persistence = RmPersistenceFactory.getInstance(this.getClass().getName(), "RM"); } /** @@ -102,6 +111,7 @@ public class Share this.job = null; this.bljobid = jobid; // UIMA-4142 this.share_order = share_order; + this.persistence = RmPersistenceFactory.getInstance(this.getClass().getName(), "RM"); } /** @@ -114,6 +124,7 @@ public class Share this.job = job; this.bljobid = null; // UIMA-4142 this.share_order = share_order; + this.persistence = RmPersistenceFactory.getInstance(this.getClass().getName(), "RM"); } // /** @@ -265,6 +276,7 @@ public class Share public boolean update(DuccId jobid, long mem, long investment, ProcessState state, ITimeWindow init_time, String pid) { + String methodName = "update"; if ( ! jobid.equals(job.getId()) ) return false; // something has gone horribly wrong this.resident_memory = mem; @@ -272,6 +284,17 @@ public class Share this.state = state; this.pid = pid; this.init_time = init_time; + try { + long npid = -1L; + if ( pid != null ) { // OR sends junk here for a while + npid = Long.parseLong(pid); + } + + persistence.updateShare(getNode().getNodeIdentity().getName(), id, jobid, investment, state.toString(), getInitializationTime(), npid); + } catch (Exception e) { + logger.warn(methodName, job.getId(), "Cannot update share statistics in database for share", id, e); + } + logger.info(methodName, jobid, "UPDATE:", investment, state, getInitializationTime(), pid); return true; } @@ -321,6 +344,9 @@ public class Share public void setInitializationTime(long millis) { + String methodName = "setInitializationTme"; + logger.info(methodName, null, "SET INIT TIME", "shareid", id, millis); + init_time = new TimeWindow(); init_time.setStartLong(0); init_time.setEndLong(millis); @@ -335,27 +361,45 @@ public class Share public void setFixed() { + String methodName = "setFixed"; fixed = true; + try { + persistence.setFixed(getNode().getNodeIdentity().getName(), id, job.getId(), true); + } catch (Exception e) { + logger.warn(methodName, job.getId(), "Cannot update 'fixed' in database for share", id, e); + } } - boolean isFixed() + public boolean isFixed() { return fixed; } void evict() { + String methodName = "evicted"; evicted = true; + try { + persistence.setEvicted(getNode().getNodeIdentity().getName(), id, job.getId(), true); + } catch (Exception e) { + logger.warn(methodName, job.getId(), "Cannot update 'evicted' in database for share", id, e); + } } - boolean isEvicted() + public boolean isEvicted() { return evicted || purged; } void purge() { + String methodName = "purge"; purged = true; + try { + persistence.setPurged(getNode().getNodeIdentity().getName(), id, job.getId(), true); + } catch (Exception e) { + logger.warn(methodName, job.getId(), "Cannot update 'purge bit' in database for share", id, e); + } } public boolean isPurged()