Author: challngr
Date: Tue Dec  1 19:21:52 2015
New Revision: 1717503

URL: http://svn.apache.org/viewvc?rev=1717503&view=rev
Log:
UIMA-4577 Add RmLoad table and support.

Added:
    
uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/rm/IDbJob.java
Modified:
    uima/sandbox/uima-ducc/trunk/src/main/resources/cassandra.yaml
    
uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/rm/IRmPersistence.java
    
uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/rm/NullRmStatePersistence.java
    
uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbCreate.java
    
uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/RmStatePersistence.java
    
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/JobManagerConverter.java
    
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/IRmJob.java
    
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/ISchedulerMain.java
    
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/ResourceClass.java
    
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/RmJob.java
    
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Scheduler.java

Modified: uima/sandbox/uima-ducc/trunk/src/main/resources/cassandra.yaml
URL: 
http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/src/main/resources/cassandra.yaml?rev=1717503&r1=1717502&r2=1717503&view=diff
==============================================================================
--- uima/sandbox/uima-ducc/trunk/src/main/resources/cassandra.yaml (original)
+++ uima/sandbox/uima-ducc/trunk/src/main/resources/cassandra.yaml Tue Dec  1 
19:21:52 2015
@@ -108,12 +108,12 @@ partitioner: org.apache.cassandra.dht.Mu
 # the configured compaction strategy.
 # If not set, the default directory is $CASSANDRA_HOME/data/data.
 data_file_directories:
-    - DUCC/data
+    - ../state/database/data
 
 # commit log.  when running on magnetic HDD, this should be a
 # separate spindle than the data directories.
 # If not set, the default directory is $CASSANDRA_HOME/data/commitlog.
-commitlog_directory: DUCC/commitlog
+commitlog_directory: ../state/database/commitlog
 
 # policy for data disk failures:
 # die: shut down gossip and client transports and kill the JVM for any fs 
errors or
@@ -228,7 +228,7 @@ counter_cache_save_period: 7200
 
 # saved caches
 # If not set, the default directory is $CASSANDRA_HOME/data/saved_caches.
-saved_caches_directory: DUCC/saved_caches
+saved_caches_directory: ../state/database/saved_caches
 
 # commitlog_sync may be either "periodic" or "batch." 
 # 
@@ -274,7 +274,7 @@ seed_provider:
       parameters:
           # seeds is actually a comma-delimited list of addresses.
           # Ex: "<ip1>,<ip2>,<ip3>"
-          - seeds: "DUCC_HEAD"
+          - seeds: "bluej538"
 
 # For workloads with more data than can fit in memory, Cassandra's
 # bottleneck will be reads that need to fetch data from
@@ -387,7 +387,7 @@ ssl_storage_port: 7001
 # you can specify which should be chosen using listen_interface_prefer_ipv6. 
If false the first ipv4
 # address will be used. If true the first ipv6 address will be used. Defaults 
to false preferring
 # ipv4. If there is only one address it will be selected regardless of 
ipv4/ipv6.
-listen_address: DUCC_HEAD
+listen_address: bluej538
 # listen_interface: eth0
 # listen_interface_prefer_ipv6: false
 
@@ -445,7 +445,7 @@ start_rpc: true
 # you can specify which should be chosen using rpc_interface_prefer_ipv6. If 
false the first ipv4
 # address will be used. If true the first ipv6 address will be used. Defaults 
to false preferring
 # ipv4. If there is only one address it will be selected regardless of 
ipv4/ipv6.
-rpc_address: DUCC_HEAD
+rpc_address: bluej538
 # rpc_interface: eth1
 # rpc_interface_prefer_ipv6: false
 

Added: 
uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/rm/IDbJob.java
URL: 
http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/rm/IDbJob.java?rev=1717503&view=auto
==============================================================================
--- 
uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/rm/IDbJob.java
 (added)
+++ 
uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/rm/IDbJob.java
 Tue Dec  1 19:21:52 2015
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+*/
+
+
+package org.apache.uima.ducc.common.persistence.rm;
+
+
+public interface IDbJob
+{
+
+    public String getClassName();
+    public long getFriendlyId();
+    public String getUserName();
+    public int getMemory();
+    public String getShortType();
+    public int queryDemand();
+    public int countOccupancy();
+    public String getState();
+    public int getShareOrder();
+}

Modified: 
uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/rm/IRmPersistence.java
URL: 
http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/rm/IRmPersistence.java?rev=1717503&r1=1717502&r2=1717503&view=diff
==============================================================================
--- 
uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/rm/IRmPersistence.java
 (original)
+++ 
uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/rm/IRmPersistence.java
 Tue Dec  1 19:21:52 2015
@@ -177,6 +177,21 @@ public interface IRmPersistence
     public Map<String, Map<String, Object>> getAllMachines() throws Exception;
 
     /**
+     * A new job arrives (or is recovered after restart).
+     */
+    public void addJob(IDbJob j) throws Exception;
+
+    /**
+     * A job has left the system forever.
+     */
+    public void deleteJob(IDbJob j) throws Exception;
+
+    /**
+     * How many shares to I want from the scheduler?
+     */
+    public void updateDemand(IDbJob j) throws Exception;
+
+    /**
      * Shutdown the connection to the DB;
      * 
      */
@@ -347,6 +362,67 @@ public interface IRmPersistence
 
     }
 
+    /**
+     * This table lists jobs in the system.
+     */
+    enum RmLoad
+        implements IDbProperty
+    {
+        TABLE_NAME {
+            public String pname() { return "rmload"; }
+            public Type type()  { return Type.String; }
+            public boolean isPrivate() { return true;}
+            public boolean isMeta() { return true;}
+        },
+
+        Class {
+            public String pname() { return "class"; }
+            public Type type()  { return Type.String; }
+        },
+
+        JobId {
+            public String pname() { return "job_id"; }
+            public Type type()  { return Type.Long; }
+            public boolean isPrimaryKey() { return true; }
+        },
+
+        User {
+            public String pname() { return "user"; }
+            public Type type()  { return Type.String; }
+        },
+
+        Memory {
+            public String pname() { return "memory"; }
+            public Type type()  { return Type.Integer; }
+        },
+
+        State {
+            public String pname() { return "state"; }
+            public Type type()  { return Type.String; }
+        },
+
+        Demand {
+            public String pname() { return "demand"; }
+            public Type type()  { return Type.Integer; }
+        },
+
+        Occupancy {
+            public String pname() { return "occupancy"; }
+            public Type type()  { return Type.Integer; }
+        },
+
+        JobType {
+            public String pname() { return "jobtype"; }
+            public Type type()  { return Type.String; }
+        };
+
+        public boolean isPrimaryKey() { return false; }
+        public boolean isPrivate()    { return false; }
+        public boolean isMeta()       { return false; }
+        public String  columnName()   { return pname(); }
+        public boolean isIndex()      { return false; }
+
+    }
 }
 
 

Modified: 
uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/rm/NullRmStatePersistence.java
URL: 
http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/rm/NullRmStatePersistence.java?rev=1717503&r1=1717502&r2=1717503&view=diff
==============================================================================
--- 
uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/rm/NullRmStatePersistence.java
 (original)
+++ 
uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/rm/NullRmStatePersistence.java
 Tue Dec  1 19:21:52 2015
@@ -25,7 +25,9 @@ import java.util.Properties;
 import org.apache.uima.ducc.common.utils.DuccLogger;
 import org.apache.uima.ducc.common.utils.id.DuccId;
 
-
+/**
+ * This class allows a persistence object to be created even if none is 
configured, without crashes or NPEs.
+ */
 public class NullRmStatePersistence implements IRmPersistence
 {
                                
@@ -46,4 +48,7 @@ public class NullRmStatePersistence impl
     public void updateShare(String node, DuccId shareid, DuccId jobid, long 
investment, String state, long init_time, long pid) {}
     public Properties getMachine(String id) { return null; }
     public Map<String, Map<String, Object>> getAllMachines() { return new 
HashMap<String, Map<String, Object>>(); }
+    public void addJob(IDbJob j ) {}
+    public void deleteJob(IDbJob j ) {}
+    public void updateDemand(IDbJob j) {}
 }

Modified: 
uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbCreate.java
URL: 
http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbCreate.java?rev=1717503&r1=1717502&r2=1717503&view=diff
==============================================================================
--- 
uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbCreate.java
 (original)
+++ 
uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbCreate.java
 Tue Dec  1 19:21:52 2015
@@ -133,10 +133,10 @@ public class DbCreate
             return false;
         }
         Metadata metadata = cluster.getMetadata();
-        logger.info(methodName, null, "Connected to cluster:", 
metadata.getClusterName());
+        doLog(methodName, "Connected to cluster:", metadata.getClusterName());
         
         for ( Host host : metadata.getAllHosts() ) {
-            logger.info(methodName, null, "Datatacenter:", 
host.getDatacenter(), "Host:", host.getAddress(), "Rack:", host.getRack());
+            doLog(methodName, "Datatacenter:", host.getDatacenter(), "Host:", 
host.getAddress(), "Rack:", host.getRack());
         } 
         session = cluster.connect();
         return true;

Modified: 
uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/RmStatePersistence.java
URL: 
http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/RmStatePersistence.java?rev=1717503&r1=1717502&r2=1717503&view=diff
==============================================================================
--- 
uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/RmStatePersistence.java
 (original)
+++ 
uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/RmStatePersistence.java
 Tue Dec  1 19:21:52 2015
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
+import org.apache.uima.ducc.common.persistence.rm.IDbJob;
 import org.apache.uima.ducc.common.persistence.rm.IDbShare;
 import org.apache.uima.ducc.common.persistence.rm.IRmPersistence;
 import org.apache.uima.ducc.common.utils.DuccLogger;
@@ -48,7 +49,11 @@ public class RmStatePersistence
     DuccLogger logger = null;
     static final String RM_NODE_TABLE  = RmNodes.TABLE_NAME.pname();
     static final String RM_SHARE_TABLE = RmShares.TABLE_NAME.pname();
+    static final String RM_LOAD_TABLE  = RmLoad.TABLE_NAME.pname();
 
+    // Prepared statements to manage the RmNodes table
+
+    // Prepared statements to manage the RmShares table
     PreparedStatement shareAddPrepare = null;
     PreparedStatement shareDelPrepare = null;
     PreparedStatement updateFixedPrepare = null;
@@ -56,6 +61,11 @@ public class RmStatePersistence
     PreparedStatement updateEvictedPrepare = null;
     PreparedStatement updateSharePrepare = null;
 
+    // Prepared statements to manage the RmLoad table
+    PreparedStatement addJobPrepare = null;
+    PreparedStatement deleteJobPrepare = null;
+    PreparedStatement updateDemandPrepare = null;
+
     public RmStatePersistence()
     {
     }
@@ -99,6 +109,11 @@ public class RmStatePersistence
         updatePurgedPrepare = h.prepare("UPDATE " + RM_SHARE_TABLE + " SET 
purged = ? WHERE node = ? AND ducc_dbid = ? and job_id = ?");
         updateEvictedPrepare = h.prepare("UPDATE " + RM_SHARE_TABLE + " SET 
evicted = ? WHERE node = ? AND ducc_dbid = ? and job_id = ?");
         updateSharePrepare = h.prepare("UPDATE " + RM_SHARE_TABLE + " SET 
investment = ?, state = ?, init_time = ?, pid = ?  WHERE node = ? AND ducc_dbid 
= ? and job_id = ?");
+
+        // An upsert
+        addJobPrepare = h.prepare("UPDATE " + RM_LOAD_TABLE + " SET class = ?, 
user = ?, memory = ?, jobtype = ? WHERE job_id = ?");
+        deleteJobPrepare = h.prepare("DELETE FROM " + RM_LOAD_TABLE + " WHERE 
job_id=?");
+        updateDemandPrepare = h.prepare("UPDATE " + RM_LOAD_TABLE + " SET 
demand = ?, occupancy = ?, state = ? WHERE job_id=?");
     }
 
     public void close()
@@ -116,6 +131,7 @@ public class RmStatePersistence
             h = dbManager.open();
             h.execute("TRUNCATE " + RM_NODE_TABLE);
             h.execute("TRUNCATE " + RM_SHARE_TABLE);
+            h.execute("TRUNCATE " + RM_LOAD_TABLE);
         } catch ( Exception e ) {
             logger.error(methodName, null, "Cannot clear the database.", e);
         } 
@@ -144,6 +160,15 @@ public class RmStatePersistence
             ret.add(new SimpleStatement(s));
         }
 
+        buf = new StringBuffer("CREATE TABLE IF NOT EXISTS " + RM_LOAD_TABLE + 
" (");
+        buf.append(DbUtil.mkSchema(RmLoad.values()));
+        buf.append(")");
+        ret.add(new SimpleStatement(buf.toString()));
+        indexes = DbUtil.mkIndices(RmShares.values(), RM_SHARE_TABLE);
+        for ( String s : indexes ) {
+            ret.add(new SimpleStatement(s));
+        }
+
         return ret;
     }
 
@@ -260,7 +285,30 @@ public class RmStatePersistence
         }
         return ret;
    }
-    
+
+    public void addJob(IDbJob j) 
+        throws Exception
+    {
+       DbHandle h = dbManager.open();
+        h.execute(addJobPrepare, j.getClassName(), j.getUserName(), 
j.getMemory(), j.getShortType(), j.getFriendlyId());
+    }
+
+    public void deleteJob(IDbJob j) 
+        throws Exception
+    {
+       DbHandle h = dbManager.open();
+        h.execute(deleteJobPrepare, j.getFriendlyId());        
+    }
+
+    public void updateDemand(IDbJob j)
+       throws Exception
+    {
+       DbHandle h = dbManager.open();
+        // queryDemand returns the number of processes wanted by the job, of 
the job's memory size
+        // The occupancy is converted from qshares to nshares (processes) for 
the db.
+        h.execute(updateDemandPrepare, j.queryDemand(), (j.countOccupancy() / 
j.getShareOrder()), j.getState(), j.getFriendlyId());
+    }
+
     public static void main(String[] args)
     {
     }

Modified: 
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/JobManagerConverter.java
URL: 
http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/JobManagerConverter.java?rev=1717503&r1=1717502&r2=1717503&view=diff
==============================================================================
--- 
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/JobManagerConverter.java
 (original)
+++ 
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/JobManagerConverter.java
 Tue Dec  1 19:21:52 2015
@@ -951,6 +951,7 @@ public class JobManagerConverter
 
                 localMap.addDuccWork(l);           // still schedulable, and 
we already know about it, just sync the state
 
+                scheduler.signalState(l.getDuccId(), 
l.getStateObject().toString());
                 switch ( l.getDuccType() ) {
                   case Job:    
                       jobUpdate(r.getStateObject(), l);

Modified: 
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/IRmJob.java
URL: 
http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/IRmJob.java?rev=1717503&r1=1717502&r2=1717503&view=diff
==============================================================================
--- 
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/IRmJob.java
 (original)
+++ 
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/IRmJob.java
 Tue Dec  1 19:21:52 2015
@@ -21,6 +21,7 @@ package org.apache.uima.ducc.rm.schedule
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.uima.ducc.common.persistence.rm.IDbJob;
 import org.apache.uima.ducc.common.utils.id.DuccId;
 import org.apache.uima.ducc.transport.event.common.IDuccTypes.DuccType;
 
@@ -31,7 +32,8 @@ import org.apache.uima.ducc.transport.ev
 
 public interface IRmJob
        extends SchedConstants,
-            IEntity
+            IEntity,
+            IDbJob
 {
     
     /**
@@ -41,9 +43,9 @@ public interface IRmJob
 
     public DuccId getId();
     
-    public String getShortType();  // S, R, M, J - service reservation 
managed-reservation, job
+    // public String getShortType();  IDbJob UIMA-4577 // S, R, M, J - service 
reservation managed-reservation, job
 
-    public long getFriendlyId();
+    // public long getFriendlyId();    UIMA 4577 
 
     public String getName();
     public void setJobName(String name);
@@ -74,7 +76,9 @@ public interface IRmJob
     public boolean isReservation();       // ask ...
 
     public boolean setInitWait(boolean w);   // When set, job cap is set low, 
waiting for confirmation that init is ok.
-                                             // Returns the prev state.
+                                             // Returns the prev state
+
+    public void setState(String state);     // UIMA-4577 Information only, for 
the db. getState() is in IDbJob;
 
     /**
      * Used during scheduling cycle only, keep track of number of shares given 
out to this job.
@@ -88,7 +92,7 @@ public interface IRmJob
     /**
      * For queries - how many processes do I want in a perfect world?
      */
-    public int queryDemand();
+    // public int queryDemand(); to IDbJob UIMA-4577
 
     /**
      * Eviction policies, configurable.
@@ -220,7 +224,7 @@ public interface IRmJob
      * Scheduler looks at job memory and decides what its share order is.
      */
     public void setShareOrder(int s);
-    public int getShareOrder();
+    public int getShareOrder();         // IDbJob UIMA-4577
 
     /**
      * This returns the largest number that can actually be used, which will 
be either the
@@ -230,7 +234,7 @@ public interface IRmJob
     public void initJobCap();   // calculate the cap at start of cycle and 
cache it
                                 // because it is frequently used
 
-    public String getUserName();
+    // public String getUserName();      // UIMA 4577 IDbJob
     public void   setUserName(String n);
     
     public User getUser();
@@ -242,7 +246,7 @@ public interface IRmJob
     public int  getUserPriority();
     public void setUserPriority(int p);
 
-    public String getClassName();
+    // public String getClassName();         UIMA 4577 IDbJob
     public void   setClassName(String n);
 
     public int getSchedulingPriority();
@@ -257,7 +261,7 @@ public interface IRmJob
     public int  nThreads();
     public void setThreads(int threads);
 
-    public int  getMemory();
+    // public int  getMemory();     UIMA 4577 IDbJob
     public void setMemory(int memory);
 
     /**
@@ -283,7 +287,7 @@ public interface IRmJob
     
     // Total number of shares to account to me - either actually assigned, or
     // counted afresh in the current scheduling cycle, for allotments
-    public int countOccupancy();                  // UIMA-4275
+    // public int countOccupancy();                  // UIMA-4275 moved to 
IDbJob by UIMA-4577
 
     // UIMA-4275 Must lose some number of shares unconditionally
     public void shrinkBy(int howmany);

Modified: 
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/ISchedulerMain.java
URL: 
http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/ISchedulerMain.java?rev=1717503&r1=1717502&r2=1717503&view=diff
==============================================================================
--- 
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/ISchedulerMain.java
 (original)
+++ 
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/ISchedulerMain.java
 Tue Dec  1 19:21:52 2015
@@ -50,6 +50,7 @@ public interface ISchedulerMain
     void signalCompletion(DuccId id);
     void signalInitialized(IRmJob id);
     void signalCompletion(IRmJob job, Share share);
+    void signalState(DuccId jobid, String state);
     //void signalGrowth(DuccId jobid, Share share);
 
     String getDefaultFairShareName();

Modified: 
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/ResourceClass.java
URL: 
http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/ResourceClass.java?rev=1717503&r1=1717502&r2=1717503&view=diff
==============================================================================
--- 
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/ResourceClass.java
 (original)
+++ 
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/ResourceClass.java
 Tue Dec  1 19:21:52 2015
@@ -573,7 +573,7 @@ public class ResourceClass
     {
         int sum = 0;
         for ( IRmJob j : allJobs.values() ) {
-            sum += (j.countOccupancy() * j.getShareOrder());          // in 
quantum shares UIMA-4275
+            sum += (j.countOccupancy());          // in quantum shares 
UIMA-4275
         }
         return sum;
     }

Modified: 
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/RmJob.java
URL: 
http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/RmJob.java?rev=1717503&r1=1717502&r2=1717503&view=diff
==============================================================================
--- 
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/RmJob.java
 (original)
+++ 
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/RmJob.java
 Tue Dec  1 19:21:52 2015
@@ -45,6 +45,7 @@ public class RmJob
        
     protected DuccId   id;                            // sched-assigned id 
(maybe delegate to job manager eventually)
     protected DuccType ducc_type;                     // for messages so we 
can tell what kind of job
+    protected String state = "New";                   // UIMA-4577 info only, 
for the db
     protected boolean  arbitrary_process = false;     // Is this an AP?
     protected String name;                            // user's name for job
     protected String resource_class_name;             // Name of the res 
class, from incoming job parms
@@ -124,6 +125,7 @@ public class RmJob
         orchestrator_epoch = 
SystemPropertyResolver.getIntProperty("ducc.orchestrator.state.publish.rate", 
10000);
         rm_rate            = 
SystemPropertyResolver.getIntProperty("ducc.rm.state.publish.ratio", 4);
         ducc_epoch         = orchestrator_epoch * rm_rate;
+        
     }
     
     // public RmJob(DuccId id, Properties properties)
@@ -169,7 +171,10 @@ public class RmJob
     {
         this.name = name;
     }
-    
+
+    public void setState(String state) { this.state = state; }
+    public String getState()           { return this.state; }
+
     public void setReservation()
     {
         this.is_reservation = true;

Modified: 
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Scheduler.java
URL: 
http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Scheduler.java?rev=1717503&r1=1717502&r2=1717503&view=diff
==============================================================================
--- 
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Scheduler.java
 (original)
+++ 
uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Scheduler.java
 Tue Dec  1 19:21:52 2015
@@ -1051,6 +1051,11 @@ public class Scheduler
 
                 prclass.addJob(j);
                 j.setResourceClass(prclass);
+                try {
+                                       persistence.addJob(j);
+                               } catch (Exception e) {
+                                       logger.warn(methodName, j.getId(), 
"Cannot persist new job in database:", e);                                   
+                               }
                 logger.info(methodName, j.getId(), "submit", j.toString());
             }
 
@@ -1060,6 +1065,14 @@ public class Scheduler
                 schedulers[i].schedule(upd);
             }
 
+            for ( IRmJob j : allJobs.values() ) {       // UIMA-4577 persist 
'demand'
+                try {
+                                       persistence.updateDemand(j);
+                               } catch (Exception e) {
+                                       logger.warn(methodName, j.getId(), 
"Cannot update demand in database:", e);
+                               }
+            }
+
             logger.info(methodName, null, "--------------- Scheduler returns 
---------------");
             logger.info(methodName, null, "\n", upd.toString());
             logger.info(methodName, null, 
"------------------------------------------------");                
@@ -1434,6 +1447,14 @@ public class Scheduler
         return ret;
     }
 
+    public synchronized void signalState(DuccId jobid, String state)
+    {
+        IRmJob j = allJobs.get(jobid);
+        if ( j != null ) {                // might not be here yet, we'll get 
it later
+            j.setState(state);
+        }
+    }
+
     /**
      * Callback from job manager, need shares for a new fair-share job.
      */
@@ -1526,6 +1547,12 @@ public class Scheduler
         String methodName = "processCompletion";
         logger.info(methodName, job.getId(), "Job completes.");
 
+        try {
+                       persistence.deleteJob(job);     // UIMA-4577
+               } catch (Exception e) {
+                       logger.warn(methodName, job.getId(), "Cannot delete job 
from database:", e);
+               }
+
         // -- clean up the running jobs list
         IRmJob j = allJobs.remove(job.getId());
         if ( j == null ) {
@@ -1683,6 +1710,11 @@ public class Scheduler
         logger.info(methodName, j.getId(), "Recovered job:", j.toString());
         logger.info(methodName, j.getId(), "Recovered shares:", 
sharenames.toString());
 
+        try {
+                       persistence.addJob(j);
+               } catch (Exception e) {
+                       logger.warn(methodName, j.getId(), "Cannot persist 
recovered job in database:", j);
+               }
         // After a reconfig/restart the share may be in the wrong place, in 
which case it
         // needs to be removed.  We have to wait until it is fully hooked into 
the structures
         // before scheduling for removal because it could take a while to go 
away and


Reply via email to