Author: challngr
Date: Fri Nov 20 21:48:11 2015
New Revision: 1715427

URL: http://svn.apache.org/viewvc?rev=1715427&view=rev
Log:
UIMA-4577 Support rmshares table.

Added:
    
uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/rm/IDbShare.java
Modified:
    
uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/rm/IRmPersistence.java
    
uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/rm/NullRmStatePersistence.java
    
uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbCreate.java
    
uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbHandle.java
    
uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbManager.java
    
uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/RmStatePersistence.java
    
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/JobManagerConverter.java
    
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/ResourceManagerComponent.java
    
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/ISchedulerMain.java
    
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Machine.java
    
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Scheduler.java
    
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Share.java

Added: 
uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/rm/IDbShare.java
URL: 
http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/rm/IDbShare.java?rev=1715427&view=auto
==============================================================================
--- 
uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/rm/IDbShare.java
 (added)
+++ 
uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/rm/IDbShare.java
 Fri Nov 20 21:48:11 2015
@@ -0,0 +1,45 @@
+package org.apache.uima.ducc.common.persistence.rm;
+
+import org.apache.uima.ducc.common.Node;
+import org.apache.uima.ducc.common.NodeIdentity;
+import org.apache.uima.ducc.common.utils.id.DuccId;
+
+public interface IDbShare {
+
+
+       public abstract int getNodepoolDepth();
+
+       public abstract String getNodepoolId();
+
+       public abstract DuccId getId();
+
+       // UIMA-4142
+       public abstract boolean isBlacklisted();
+
+       // UIMA-4142
+       public abstract DuccId getBlJobId();
+
+       public abstract NodeIdentity getNodeIdentity();
+
+       public abstract Node getNode();
+
+       /**
+        * The order of the share itself.
+        */
+       public abstract int getShareOrder();
+
+       /**
+        * Returns only initialization time.  Eventually getInvestment() may 
take other things into
+        * consideration so we separate these two (even though currently they 
do the same thing.)
+        */
+       public abstract long getInitializationTime();
+
+       public abstract void setInitializationTime(long millis);
+
+       public abstract void setFixed();
+
+       public abstract boolean isPurged();
+    public abstract boolean isEvicted();
+    public abstract boolean isFixed();
+
+}

Modified: 
uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/rm/IRmPersistence.java
URL: 
http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/rm/IRmPersistence.java?rev=1715427&r1=1715426&r2=1715427&view=diff
==============================================================================
--- 
uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/rm/IRmPersistence.java
 (original)
+++ 
uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/rm/IRmPersistence.java
 Fri Nov 20 21:48:11 2015
@@ -91,8 +91,9 @@ public interface IRmPersistence
      * @param id The node name
      * @param jobid The duccid of the job owning the new shoare
      * @param shareid The DuccId of the new share.
+     * @param quantum The scheduling quantum in GB used for this share.
      */
-    public void addAssignment(String id, DuccId jobid, DuccId shareid);
+    public void addAssignment(String id, DuccId jobid, IDbShare share, int 
quantum, String jobtype) throws Exception;
 
     /**
      * Remove a share from the machine.
@@ -100,7 +101,54 @@ public interface IRmPersistence
      * @param jobid The duccid of the job owning the new shoare
      * @param shareid The DuccId of the new share.
      */
-    public void removeAssignment(String id, DuccId jobid, DuccId shareid);
+    public void removeAssignment(String id, DuccId jobid, IDbShare share) 
throws Exception;
+
+    /**
+     * Update a share definition to show it is non-preemptable.
+     * @param node The node where the share resides.
+     * @param shareId The (RM-assigned) DuccId of the share.
+     * @param jobId The OR-assigned DuccId of the job.
+     * @param val True if it is non-preemptable, false otherwise.
+     *
+     * NOTE: The triple (node, shareid, jobid) form the primary key to find 
the share in the DB
+     */
+    public void setFixed(String node, DuccId shareId, DuccId jobId, boolean 
val) throws Exception;
+    /**
+     * Update a share definition to show it is non-preemptable.
+     * @param node The node where the share resides.
+     * @param shareId The (RM-assigned) DuccId of the share.
+     * @param jobId The OR-assigned DuccId of the job.
+     * @param val True if it is non-preemptable.  RM will never set this false.
+     *
+     * NOTE: The triple (node, shareid, jobid) form the primary key to find 
the share in the DB
+     */
+
+    public void setPurged(String node, DuccId shareId, DuccId jobId, boolean 
val) throws Exception;
+
+    /**
+     * Update a share definition to show it is evicted aka preempted by RM.
+     * @param node The node where the share resides.
+     * @param shareId The (RM-assigned) DuccId of the share.
+     * @param jobId The OR-assigned DuccId of the job.
+     * @param val True if it is evicted.  Once evicted it can not be 
un-evicted so RM never sets this false.
+     *
+     * NOTE: The triple (node, shareid, jobid) form the primary key to find 
the share in the DB
+     */
+    public void setEvicted(String node, DuccId shareId, DuccId jobId, boolean 
val) throws Exception;
+
+    /**
+     * Update current information about the share from the current OR 
publication.
+     * @param node The node where the share resides.
+     * @param shareId The (RM-assigned) DuccId of the share.
+     * @param jobId The OR-assigned DuccId of the job.
+     * @param investment The "investment", i.e. amount of CPU expended by the 
process in the share so far.
+     * @param state The OR-assigned state of the process in the share.
+     * @param init_time The time the process spent in its initialization phase.
+     * @param pid The *ix process id of the process in the share.
+     *
+     * NOTE: The triple (node, shareid, jobid) form the primary key to find 
the share in the DB
+     */
+    public void updateShare(String node, DuccId shareid, DuccId jobid, long 
investment, String state, long init_time, long pid) throws Exception;
 
     /**
      * Fetch a machine by its id.
@@ -210,4 +258,95 @@ public interface IRmPersistence
         public String  columnName()   { return pname(); }
         public boolean isIndex()      { return false; }
     }
+
+    enum RmShares
+        implements IDbProperty
+    {
+        TABLE_NAME {
+            public String pname() { return "rmshares"; }
+            public Type type()  { return Type.String; }
+            public boolean isPrivate() { return true;}
+            public boolean isMeta() { return true;}
+        },
+        // share uniqueness is given by the rm duccid + jobid.  We add in the 
node to act as cassandra's cluster id
+        // because the principal query at the moment is to find shares on a 
node.
+
+        // the way cassandra works:  'node' will become the cluster key, and 
ducc_dbid, job_id the row key
+        Node {
+            public String pname() { return "node"; }
+            public Type type()  { return Type.String; }
+            public boolean isPrimaryKey() { return true; }
+        },
+        DuccDbid {
+            public String pname() { return "ducc_dbid"; }
+            public Type type()  { return Type.Long; }
+            public boolean isPrimaryKey() { return true; }
+        },
+
+        JobId {
+            public String pname() { return "job_id"; }
+            public Type type()  { return Type.Long; }
+            public boolean isPrimaryKey() { return true; }
+        },
+
+        Uuid {
+            public String pname() { return "uuid"; }
+            public Type type()  { return Type.UUID; }
+        },
+        ShareOrder {
+            public String pname() { return "share_order"; }
+            public Type type()  { return Type.Integer; }
+        },
+        Quantum {
+            public String pname() { return "quantum"; }
+            public Type type()  { return Type.Integer; }
+        },
+        InitTime {
+            public String pname() { return "init_time"; }
+            public Type type()  { return Type.Long; }
+        },
+        Evicted {
+            public String pname() { return "evicted"; }
+            public Type type()  { return Type.Boolean; }
+        },
+        Purged {
+            public String pname() { return "purged"; }
+            public Type type()  { return Type.Boolean; }
+        },
+        Fixed {
+            public String pname() { return "fixed"; }
+            public Type type()  { return Type.Boolean; }
+        },
+        Blacklisted {
+            public String pname() { return "blacklisted"; }
+            public Type type()  { return Type.Boolean; }
+        },
+        State {
+            public String pname() { return "state"; }
+            public Type type()  { return Type.String; }
+        },
+        Pid {
+            public String pname() { return "pid"; }
+            public Type type()  { return Type.Long; }
+        },
+        JobType {
+            public String pname() { return "jobtype"; }
+            public Type type()  { return Type.String; }
+        },
+        Investment {
+            public String pname() { return "investment"; }
+            public Type type()  { return Type.Long; }
+        },
+        ;
+
+        public boolean isPrimaryKey() { return false; }
+        public boolean isPrivate()    { return false; }
+        public boolean isMeta()       { return false; }
+        public String  columnName()   { return pname(); }
+        public boolean isIndex()      { return false; }
+
+    }
+
 }
+
+

Modified: 
uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/rm/NullRmStatePersistence.java
URL: 
http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/rm/NullRmStatePersistence.java?rev=1715427&r1=1715426&r2=1715427&view=diff
==============================================================================
--- 
uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/rm/NullRmStatePersistence.java
 (original)
+++ 
uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/rm/NullRmStatePersistence.java
 Fri Nov 20 21:48:11 2015
@@ -38,8 +38,12 @@ public class NullRmStatePersistence impl
     public void setNodeProperty(String id, RmNodes key, Object value) { }
     public void setNodeProperties(String id, Object... props) {}
     public void createMachine(String id, Map<RmNodes, Object> props) { }
-    public void addAssignment(String id, DuccId jobid, DuccId shareid) {}
-    public void removeAssignment(String id, DuccId jobid, DuccId shareid) {}
+    public void addAssignment(String id, DuccId jobid, IDbShare shareid, int 
quantum, String type) {}
+    public void removeAssignment(String id, DuccId jobid, IDbShare shareid) {}
+    public void setEvicted(String node, DuccId shareId, DuccId jobId, boolean 
val) {}
+    public void setFixed(String node, DuccId shareId, DuccId jobId, boolean 
val) {}
+    public void setPurged(String node, DuccId shareId, DuccId jobId, boolean 
val) {}
+    public void updateShare(String node, DuccId shareid, DuccId jobid, long 
investment, String state, long init_time, long pid) {}
     public Properties getMachine(String id) { return null; }
     public Map<String, Map<String, Object>> getAllMachines() { return new 
HashMap<String, Map<String, Object>>(); }
 }

Modified: 
uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbCreate.java
URL: 
http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbCreate.java?rev=1715427&r1=1715426&r2=1715427&view=diff
==============================================================================
--- 
uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbCreate.java
 (original)
+++ 
uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbCreate.java
 Fri Nov 20 21:48:11 2015
@@ -133,10 +133,10 @@ public class DbCreate
             return false;
         }
         Metadata metadata = cluster.getMetadata();
-        doLog(methodName, "Connected to cluster: %s\n", 
metadata.getClusterName());
+        logger.info(methodName, null, "Connected to cluster:", 
metadata.getClusterName());
         
         for ( Host host : metadata.getAllHosts() ) {
-            doLog(methodName, "Datatacenter: %s; Host: %s; Rack: %s\n", 
host.getDatacenter(), host.getAddress(), host.getRack());
+            logger.info(methodName, null, "Datatacenter:", 
host.getDatacenter(), "Host:", host.getAddress(), "Rack:", host.getRack());
         } 
         session = cluster.connect();
         return true;

Modified: 
uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbHandle.java
URL: 
http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbHandle.java?rev=1715427&r1=1715426&r2=1715427&view=diff
==============================================================================
--- 
uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbHandle.java
 (original)
+++ 
uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbHandle.java
 Fri Nov 20 21:48:11 2015
@@ -65,6 +65,21 @@ public class DbHandle
         return manager.execute(s);
     }
 
+    ResultSet execute(PreparedStatement ps, Object ... fields)
+        throws Exception
+    {
+        String methodName = "execute";        
+        long now = System.currentTimeMillis();
+        
+        try {
+                       BoundStatement boundStatement = new BoundStatement(ps);
+                       BoundStatement bound = boundStatement.bind(fields);
+                       return execute(bound);        
+        } finally {
+                       logger.info(methodName, null, "Time to execute prepared 
statement:", ps.getQueryString(), System.currentTimeMillis() - now);
+               }
+    }
+
     /**
      * Delete the object of the indicated type and duccid.   We optionally 
commit in case we want to
      * do more things that have to work under the same transaction so we can 
rollback if needed.=

Modified: 
uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbManager.java
URL: 
http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbManager.java?rev=1715427&r1=1715426&r2=1715427&view=diff
==============================================================================
--- 
uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbManager.java
 (original)
+++ 
uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbManager.java
 Fri Nov 20 21:48:11 2015
@@ -100,10 +100,10 @@ public class DbManager
             .build();
 
         Metadata metadata = cluster.getMetadata();
-        logger.info(methodName, null, "Connected to cluster: %s\n", 
metadata.getClusterName());
+        logger.info(methodName, null, "Connected to cluster:", 
metadata.getClusterName());
         
         for ( Host host : metadata.getAllHosts() ) {
-            logger.info(methodName, null, "Datatacenter: %s; Host: %s; Rack: 
%s\n", host.getDatacenter(), host.getAddress(), host.getRack());
+            logger.info(methodName, null, "Datatacenter:", 
host.getDatacenter(), "Host:", host.getAddress(), "Rack:", host.getRack());
         } 
     }
 

Modified: 
uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/RmStatePersistence.java
URL: 
http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/RmStatePersistence.java?rev=1715427&r1=1715426&r2=1715427&view=diff
==============================================================================
--- 
uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/RmStatePersistence.java
 (original)
+++ 
uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/RmStatePersistence.java
 Fri Nov 20 21:48:11 2015
@@ -25,10 +25,12 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
+import org.apache.uima.ducc.common.persistence.rm.IDbShare;
 import org.apache.uima.ducc.common.persistence.rm.IRmPersistence;
 import org.apache.uima.ducc.common.utils.DuccLogger;
 import org.apache.uima.ducc.common.utils.id.DuccId;
 
+import com.datastax.driver.core.PreparedStatement;
 import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.Row;
 import com.datastax.driver.core.SimpleStatement;
@@ -44,7 +46,15 @@ public class RmStatePersistence
 
     DbManager dbManager = null;
     DuccLogger logger = null;
-    static final String RM_NODE_TABLE = RmNodes.TABLE_NAME.pname();
+    static final String RM_NODE_TABLE  = RmNodes.TABLE_NAME.pname();
+    static final String RM_SHARE_TABLE = RmShares.TABLE_NAME.pname();
+
+    PreparedStatement shareAddPrepare = null;
+    PreparedStatement shareDelPrepare = null;
+    PreparedStatement updateFixedPrepare = null;
+    PreparedStatement updatePurgedPrepare = null;
+    PreparedStatement updateEvictedPrepare = null;
+    PreparedStatement updateSharePrepare = null;
 
     public RmStatePersistence()
     {
@@ -79,11 +89,21 @@ public class RmStatePersistence
        this.logger = logger;
         String stateUrl = System.getProperty("ducc.state.database.url");
         init(stateUrl);
+        DbHandle h = dbManager.open();
+
+        // For creating a new share
+        // These are upserts - sometimes the shares are updated before they're 
actually added to the DB.
+        shareAddPrepare = h.prepare("UPDATE " + RM_SHARE_TABLE + " SET  
uuid=?, share_order=?, blacklisted=?, evicted=?, fixed=?, purged=?, quantum=?, 
jobtype=? WHERE node=? AND ducc_dbid=? and job_id=?");
+        shareDelPrepare = h.prepare("DELETE FROM " + RM_SHARE_TABLE + " WHERE 
node = ? and ducc_dbid = ? and job_id = ?;");
+        updateFixedPrepare = h.prepare("UPDATE " + RM_SHARE_TABLE + " SET 
fixed = ? WHERE node = ? AND ducc_dbid = ? and job_id = ?");
+        updatePurgedPrepare = h.prepare("UPDATE " + RM_SHARE_TABLE + " SET 
purged = ? WHERE node = ? AND ducc_dbid = ? and job_id = ?");
+        updateEvictedPrepare = h.prepare("UPDATE " + RM_SHARE_TABLE + " SET 
evicted = ? WHERE node = ? AND ducc_dbid = ? and job_id = ?");
+        updateSharePrepare = h.prepare("UPDATE " + RM_SHARE_TABLE + " SET 
investment = ?, state = ?, init_time = ?, pid = ?  WHERE node = ? AND ducc_dbid 
= ? and job_id = ?");
     }
 
     public void close()
     {
-        dbManager.shutdown();
+        if ( dbManager != null ) dbManager.shutdown();
         dbManager = null;
     }
 
@@ -95,6 +115,7 @@ public class RmStatePersistence
         try {
             h = dbManager.open();
             h.execute("TRUNCATE " + RM_NODE_TABLE);
+            h.execute("TRUNCATE " + RM_SHARE_TABLE);
         } catch ( Exception e ) {
             logger.error(methodName, null, "Cannot clear the database.", e);
         } 
@@ -113,6 +134,16 @@ public class RmStatePersistence
         for ( String s : indexes ) {
             ret.add(new SimpleStatement(s));
         }
+
+        buf = new StringBuffer("CREATE TABLE IF NOT EXISTS " + RM_SHARE_TABLE 
+ " (");
+        buf.append(DbUtil.mkSchema(RmShares.values()));
+        buf.append(")");
+        ret.add(new SimpleStatement(buf.toString()));
+        indexes = DbUtil.mkIndices(RmShares.values(), RM_SHARE_TABLE);
+        for ( String s : indexes ) {
+            ret.add(new SimpleStatement(s));
+        }
+
         return ret;
     }
 
@@ -167,12 +198,46 @@ public class RmStatePersistence
         
     }
 
-    public void addAssignment(String node, DuccId jobid, DuccId shareid)
+    public void addAssignment(String node, DuccId jobid, IDbShare s, int 
quantum, String type)
+       throws Exception
     {
+        DbHandle h = dbManager.open();
+        h.saveObject(shareAddPrepare, s.getId().getUUID(), s.getShareOrder(), 
s.isBlacklisted(), s.isEvicted(), s.isFixed(), s.isPurged(), quantum, type, 
node, s.getId().getFriendly(), jobid.getFriendly() ); 
     }
 
-    public void removeAssignment(String node, DuccId jobid, DuccId shareid)
+    public void removeAssignment(String node, DuccId jobid, IDbShare s)
+       throws Exception
+    {
+       DbHandle h = dbManager.open();
+        h.execute(shareDelPrepare, node, s.getId().getFriendly(), 
jobid.getFriendly());
+    }
+
+    public void setFixed(String node, DuccId shareId, DuccId jobId, boolean 
val) 
+        throws Exception
+    {
+       DbHandle h = dbManager.open();
+        h.execute(updateFixedPrepare, val, node, shareId.getFriendly(), 
jobId.getFriendly());
+    }
+
+    public void setPurged(String node, DuccId shareId, DuccId jobId, boolean 
val) 
+        throws Exception
+    {
+       DbHandle h = dbManager.open();
+        h.execute(updatePurgedPrepare, val, node, shareId.getFriendly(), 
jobId.getFriendly());
+    }
+
+    public void setEvicted(String node, DuccId shareId, DuccId jobId, boolean 
val) 
+        throws Exception
+    {
+       DbHandle h = dbManager.open();
+        h.execute(updateEvictedPrepare, val, node, shareId.getFriendly(), 
jobId.getFriendly());
+    }
+
+    public void updateShare(String node, DuccId shareid, DuccId jobid, long 
investment, String state, long init_time, long pid) 
+        throws Exception
     {
+       DbHandle h = dbManager.open();
+        h.execute(updateSharePrepare, investment, state, init_time, pid, node, 
shareid.getFriendly(), jobid.getFriendly());
     }
 
     public Properties getMachine(String m)

Modified: 
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/JobManagerConverter.java
URL: 
http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/JobManagerConverter.java?rev=1715427&r1=1715426&r2=1715427&view=diff
==============================================================================
--- 
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/JobManagerConverter.java
 (original)
+++ 
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/JobManagerConverter.java
 Fri Nov 20 21:48:11 2015
@@ -719,6 +719,7 @@ public class JobManagerConverter
             // we can crash or at least complain loudly on mismatch.
 
             Share s = scheduler.getShare(p.getDuccId());
+
             long mem = p.getResidentMemory();
             long investment = p.getWiMillisInvestment();
             ProcessState state = p.getProcessState();
@@ -957,9 +958,20 @@ public class JobManagerConverter
                       break;
                   case Service:
                   case Pop:
+                      // This is really an AP and OR sets the state to running 
immediately although it isn't yet, so the
+                      // information is incomplete.  We always have to 
reconcile.
+                      if  ( ((IDuccWorkService)l).getServiceDeploymentType() 
== ServiceDeploymentType.other )  {
+                          logger.info(methodName, l.getDuccId(), "[P] State: 
", r.getStateObject(), "->", l.getStateObject());
+                          reconcileProcesses(l.getDuccId(), l, r);
+                      } else  if ( r.getStateObject() != l.getStateObject() ) {
+                          // Service state does come int correctly
+                          logger.info(methodName, l.getDuccId(), "[S] State: 
", r.getStateObject(), "->", l.getStateObject());
+                          reconcileProcesses(l.getDuccId(), l, r);
+                      }
+                      break;
                   case Reservation:
                       if ( r.getStateObject() != l.getStateObject() ) {
-                          logger.info(methodName, l.getDuccId(), "[SPR] State: 
", r.getStateObject(), "->", l.getStateObject());
+                          logger.info(methodName, l.getDuccId(), "[R] State: 
", r.getStateObject(), "->", l.getStateObject());
                       }
                       // for the moment, these guys have nothing to reconcile.
                       break;

Modified: 
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/ResourceManagerComponent.java
URL: 
http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/ResourceManagerComponent.java?rev=1715427&r1=1715426&r2=1715427&view=diff
==============================================================================
--- 
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/ResourceManagerComponent.java
 (original)
+++ 
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/ResourceManagerComponent.java
 Fri Nov 20 21:48:11 2015
@@ -127,6 +127,7 @@ public class ResourceManagerComponent
         }
     }
 
+    
     class RmAdminEventProcessor implements Processor 
     {
         final AbstractDuccComponent delegate;
@@ -258,6 +259,15 @@ public class ResourceManagerComponent
         return null;
     }
 
+    public void stop()
+       throws Exception
+    {
+       String methodName = "stop";
+        logger.info(methodName, null, "Stopping RM database connection");
+        scheduler.stop();
+        super.stop();
+    }
+
     public void setTransportConfiguration(DuccEventDispatcher eventDispatcher, 
String endpoint)
     {
         this.eventDispatcher = eventDispatcher;

Modified: 
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/ISchedulerMain.java
URL: 
http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/ISchedulerMain.java?rev=1715427&r1=1715426&r2=1715427&view=diff
==============================================================================
--- 
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/ISchedulerMain.java
 (original)
+++ 
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/ISchedulerMain.java
 Fri Nov 20 21:48:11 2015
@@ -77,6 +77,7 @@ public interface ISchedulerMain
 
     // once both initialized() and ready() occur, the RM scaffolding will 
enable scheduling by calling start
     void start();
+    void stop();
 
     RmAdminReply varyoff(String[] nodes);
     RmAdminReply varyon(String[] nodes);

Modified: 
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Machine.java
URL: 
http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Machine.java?rev=1715427&r1=1715426&r2=1715427&view=diff
==============================================================================
--- 
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Machine.java
 (original)
+++ 
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Machine.java
 Fri Nov 20 21:48:11 2015
@@ -320,10 +320,10 @@ public class Machine
         try {
             // Not transactional.  If this turns into a problem we'll have to 
find a way
                        persistence.setNodeProperties(id, RmNodes.Assignments, 
activeShares.size(), RmNodes.SharesLeft, shares_left);
-                       persistence.addAssignment(id, s.getJob().getId(), 
s.getId()); // update jobs on machine and specific shares
+                       persistence.addAssignment(id, s.getJob().getId(), s, 
getQuantum(), s.getJob().getShortType()); // update jobs on machine and 
specific shares
             logger.info(methodName, null, "Time to assign share in db", 
System.currentTimeMillis() - now);
                } catch (Exception e) {
-            logger.warn(methodName, null, "Cannot save state; shares_left", 
shares_left);
+            logger.warn(methodName, null, "Cannot save state; shares_left", 
shares_left, e);
                }
 
     }
@@ -339,7 +339,7 @@ public class Machine
         try {
             // Not transactional.  If this turns into a problem we'll have to 
find a way
                        persistence.setNodeProperties(id, RmNodes.Assignments, 
activeShares.size(), RmNodes.SharesLeft, shares_left);
-                       persistence.removeAssignment(id, s.getJob().getId(), 
s.getId());  // update jobs on machine and specific shares
+                       persistence.removeAssignment(id, s.getJob().getId(), 
s);  // update jobs on machine and specific shares
             logger.info(methodName, null, "Time to remove share in db", 
System.currentTimeMillis() - now);
                } catch (Exception e) {
             logger.warn(methodName, null, "Cannot save state; shares_left", 
shares_left);

Modified: 
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Scheduler.java
URL: 
http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Scheduler.java?rev=1715427&r1=1715426&r2=1715427&view=diff
==============================================================================
--- 
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Scheduler.java
 (original)
+++ 
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Scheduler.java
 Fri Nov 20 21:48:11 2015
@@ -132,6 +132,7 @@ public class Scheduler
     boolean stability = false;
 
     private static DuccIdFactory idFactory;
+    IRmPersistence persistence = null;
 
     // static boolean expandByDoubling = true;
     // static int initializationCap = 2;      // Max allocation until we know 
initialization works in
@@ -264,7 +265,7 @@ public class Scheduler
         logger.info(methodName, null, "                       RM Version       
       : ", ""+ rmversion_major   + "." 
                                                                                
              + rmversion_minor   + "." 
                                                                                
              + rmversion_ptf);
-        IRmPersistence persistence = 
RmPersistenceFactory.getInstance(this.getClass().getName(), "RM");
+        persistence = 
RmPersistenceFactory.getInstance(this.getClass().getName(), "RM");
         persistence.clear();
         initialized = true;
     }
@@ -801,6 +802,11 @@ public class Scheduler
         stability = true;
     }
 
+    public void stop()
+    {
+        persistence.close();
+    }
+
     protected void handleIllNodes()
     {
        String methodName = "handleIllNodes";

Modified: 
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Share.java
URL: 
http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Share.java?rev=1715427&r1=1715426&r2=1715427&view=diff
==============================================================================
--- 
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Share.java
 (original)
+++ 
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Share.java
 Fri Nov 20 21:48:11 2015
@@ -23,6 +23,10 @@ import java.util.Map;
 
 import org.apache.uima.ducc.common.Node;
 import org.apache.uima.ducc.common.NodeIdentity;
+import org.apache.uima.ducc.common.persistence.rm.IDbShare;
+import org.apache.uima.ducc.common.persistence.rm.IRmPersistence;
+import org.apache.uima.ducc.common.persistence.rm.RmPersistenceFactory;
+import org.apache.uima.ducc.common.utils.DuccLogger;
 import org.apache.uima.ducc.common.utils.id.DuccId;
 import org.apache.uima.ducc.transport.event.common.IProcessState.ProcessState;
 import org.apache.uima.ducc.transport.event.common.ITimeWindow;
@@ -36,9 +40,10 @@ import org.apache.uima.ducc.transport.ev
  * A share is ALWAYS associated with a Machine.
  */
 public class Share
-       implements SchedConstants
+       implements SchedConstants, 
+               IDbShare
 {
-    //private transient DuccLogger logger = DuccLogger.getLogger(Share.class, 
COMPONENT_NAME);
+    private transient DuccLogger logger = DuccLogger.getLogger(Share.class, 
COMPONENT_NAME);
     
     private transient Machine machine;               // machine associatede 
with this share, assigned after "what-of"
     private DuccId id = null;              // unique *within this machine*     
    assigned after "what-of"
@@ -57,6 +62,9 @@ public class Share
 
     private long investment = 0;          // Current time for all ACTIVE work 
items in the process
 
+    // note this returns a global static instance, no need to staticisze it 
here
+    private IRmPersistence persistence = null;
+
      @SuppressWarnings("unused")
        private long resident_memory = 0;
      @SuppressWarnings("unused")
@@ -88,6 +96,7 @@ public class Share
         this.job = job;
         this.bljobid = null;        // UIMA-4142
         this.share_order = share_order;
+        this.persistence = 
RmPersistenceFactory.getInstance(this.getClass().getName(), "RM");
     }
 
     /**
@@ -102,6 +111,7 @@ public class Share
         this.job = null;
         this.bljobid = jobid;       // UIMA-4142
         this.share_order = share_order;
+        this.persistence = 
RmPersistenceFactory.getInstance(this.getClass().getName(), "RM");
     }
 
     /**
@@ -114,6 +124,7 @@ public class Share
         this.job = job;
         this.bljobid = null;        // UIMA-4142
         this.share_order = share_order;
+        this.persistence = 
RmPersistenceFactory.getInstance(this.getClass().getName(), "RM");
     }
 
 //     /**
@@ -265,6 +276,7 @@ public class Share
 
     public boolean update(DuccId jobid, long mem, long investment, 
ProcessState state, ITimeWindow init_time, String pid)
     {
+       String methodName = "update";
         if ( ! jobid.equals(job.getId()) ) return false;      // something has 
gone horribly wrong
         
         this.resident_memory = mem;
@@ -272,6 +284,17 @@ public class Share
         this.state = state;
         this.pid = pid;
         this.init_time = init_time;
+        try {
+            long npid = -1L;               
+            if ( pid != null ) {              // OR sends junk here for a while
+                npid = Long.parseLong(pid);
+            }
+
+                       
persistence.updateShare(getNode().getNodeIdentity().getName(), id, jobid, 
investment, state.toString(), getInitializationTime(), npid);
+               } catch (Exception e) {
+            logger.warn(methodName, job.getId(), "Cannot update share 
statistics in database for share", id, e);
+               }
+        logger.info(methodName, jobid, "UPDATE:", investment, state, 
getInitializationTime(), pid);
         return true;
     }
 
@@ -321,6 +344,9 @@ public class Share
 
     public void setInitializationTime(long millis)
     {
+       String methodName = "setInitializationTme";
+        logger.info(methodName, null, "SET INIT TIME", "shareid", id, millis);
+
         init_time = new TimeWindow();
         init_time.setStartLong(0);
         init_time.setEndLong(millis);
@@ -335,27 +361,45 @@ public class Share
 
     public void setFixed()
     {
+       String methodName = "setFixed";
         fixed = true;
+        try {
+                       
persistence.setFixed(getNode().getNodeIdentity().getName(), id, job.getId(), 
true);
+               } catch (Exception e) {
+            logger.warn(methodName, job.getId(), "Cannot update 'fixed' in 
database for share", id, e);
+               }
     }
 
-    boolean isFixed()
+    public boolean isFixed()
     {
         return fixed;
     }
 
     void evict()
     {
+       String methodName = "evicted";
         evicted = true;
+        try {
+                       
persistence.setEvicted(getNode().getNodeIdentity().getName(), id, job.getId(), 
true);
+               } catch (Exception e) {
+            logger.warn(methodName, job.getId(), "Cannot update 'evicted' in 
database for share", id, e);
+               }
     }
 
-    boolean isEvicted()
+    public boolean isEvicted()
     {
         return evicted || purged;
     }
 
     void purge()
     {
+       String methodName = "purge";
         purged = true;
+        try {
+                       
persistence.setPurged(getNode().getNodeIdentity().getName(), id, job.getId(), 
true);
+               } catch (Exception e) {
+            logger.warn(methodName, job.getId(), "Cannot update 'purge bit' in 
database for share", id, e);
+               }
     }
 
     public boolean isPurged()



Reply via email to