Author: challngr Date: Thu Sep 17 20:34:08 2015 New Revision: 1703684 URL: http://svn.apache.org/viewvc?rev=1703684&view=rev Log: UIMA-4577 Scripting fixes. Fix leak in loader and speed it up. Add indexes. Minor tupos. Go to ODB 2.1.2 to pick up indexing fix.
Modified: uima/sandbox/uima-ducc/trunk/src/main/admin/db_console uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/services/IStateServices.java uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbConstants.java uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbCreate.java uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbHandle.java uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbLoader.java uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbTester.java uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/HistoryManagerDb.java uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/StateServicesDb.java uima/sandbox/uima-ducc/trunk/uima-ducc-parent/pom.xml uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceSet.java Modified: uima/sandbox/uima-ducc/trunk/src/main/admin/db_console URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/src/main/admin/db_console?rev=1703684&r1=1703683&r2=1703684&view=diff ============================================================================== --- uima/sandbox/uima-ducc/trunk/src/main/admin/db_console (original) +++ uima/sandbox/uima-ducc/trunk/src/main/admin/db_console Thu Sep 17 20:34:08 2015 @@ -38,8 +38,11 @@ class DbConsole(DuccUtil): print "Note that Python must be at least version 2.6 but not 3.x. You are running version", sys.version_info return + if ( self.db_parms == '--disabled--' ): + print "Database is disabled." + return - (jvm_parms, classpath, db_rt, dburl, dbrest, dbroot) = self.db_parms() + (jvm_parms, classpath, db_rt, dburl, dbrest, dbroot) = self.db_parms main = 'com.orientechnologies.orient.graph.console.OGremlinConsole' Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/services/IStateServices.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/services/IStateServices.java?rev=1703684&r1=1703683&r2=1703684&view=diff ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/services/IStateServices.java (original) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/services/IStateServices.java Thu Sep 17 20:34:08 2015 @@ -31,6 +31,10 @@ public interface IStateServices { public static String svc_reg_dir = IDuccEnv.DUCC_STATE_SVCREG_DIR; public static String svc_hist_dir = IDuccEnv.DUCC_HISTORY_SVCREG_DIR; + + public static final String archive_key = "is_archived"; + public static final String archive_flag = "true"; + public static final String svc = "svc"; public static final String meta = "meta"; Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbConstants.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbConstants.java?rev=1703684&r1=1703683&r2=1703684&view=diff ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbConstants.java (original) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbConstants.java Thu Sep 17 20:34:08 2015 @@ -38,6 +38,16 @@ public interface DbConstants ; } + // Every vertex must inherit from here so we can use common indexes + public enum DuccVertexBase + implements Schema + { + VBase { + public String pname() { return "VDuccBase"; } + }, + ; + } + public enum DbVertex implements Schema { @@ -88,15 +98,25 @@ public interface DbConstants } + public enum DuccEdgeBase + implements Schema + { + EdgeBase { + public String pname() { return "ducc_ebase"; } + }, + ; + } + public enum DbEdge implements Schema { // // The convention is for edges to start with lower e and then a lower // - Edge { // Generic edge - public String pname() { return "ducc_edge"; } - }, + // Edge { // Generic edge + // public String pname() { return "ducc_edge"; } + // }, + Classpath { // All record types, detached classpath public String pname() { return "eclasspath"; } }, Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbCreate.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbCreate.java?rev=1703684&r1=1703683&r2=1703684&view=diff ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbCreate.java (original) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbCreate.java Thu Sep 17 20:34:08 2015 @@ -21,10 +21,12 @@ package org.apache.uima.ducc.database; import org.apache.uima.ducc.database.DbConstants.DbEdge; import org.apache.uima.ducc.database.DbConstants.DbVertex; +import org.apache.uima.ducc.database.DbConstants.DuccVertexBase; import com.orientechnologies.orient.client.remote.OServerAdmin; import com.orientechnologies.orient.core.metadata.schema.OProperty; import com.orientechnologies.orient.core.metadata.schema.OType; +import com.orientechnologies.orient.core.sql.OCommandSQL; import com.tinkerpop.blueprints.impls.orient.OrientEdgeType; import com.tinkerpop.blueprints.impls.orient.OrientGraphFactory; import com.tinkerpop.blueprints.impls.orient.OrientGraphNoTx; @@ -33,6 +35,8 @@ import com.tinkerpop.blueprints.impls.or public class DbCreate { String dburl; + String adminid = "root"; + String adminpw = null; OServerAdmin admin; OrientGraphFactory factory; @@ -41,6 +45,13 @@ public class DbCreate this.dburl = dburl; } + public DbCreate(String dburl, String adminid, String adminpw) + { + this.dburl = dburl; + this.adminid = adminid; + this.adminpw = adminpw; + } + void createEdgeType(OrientGraphNoTx g, DbEdge id) { String s = id.pname(); @@ -57,16 +68,35 @@ public class DbCreate OrientVertexType e = g.getVertexType(s); if ( e == null ) { System.out.println("Create vertex " + s); - e = g.createVertexType(s); - OProperty p = e.createProperty(DbConstants.DUCCID, OType.LONG); - p.setMandatory(true); + e = g.createVertexType(s, DuccVertexBase.VBase.pname()); } } void createSchema() { + String methodName = "createSchema"; OrientGraphNoTx g = factory.getNoTx(); + String base = DuccVertexBase.VBase.pname(); + OrientVertexType e = g.getVertexType(base); + if ( e == null ) { + System.out.println("Create base vertex class " + base); + e = g.createVertexType(base); + OProperty p = e.createProperty(DbConstants.DUCCID, OType.LONG); + p.setMandatory(true); + OProperty p2 = e.createProperty(DbConstants.DUCC_DBCAT, OType.STRING); + p2.setMandatory(true); + + String sql = "create index i_ducc_dbid on " + base + "(" + DbConstants.DUCCID + ") notunique"; + g.command(new OCommandSQL(sql)).execute(); + System.out.println("(sql)Created index i_ducc_dbid on class " + base + " for " + DbConstants.DUCCID); + + sql = "create index i_ducc_dbcat on " + base + "(" + DbConstants.DUCC_DBCAT + ") notunique"; + g.command(new OCommandSQL(sql)).execute(); + System.out.println("(sql)Created index i_ducc_dbcat on class " + base + " for " + DbConstants.DUCC_DBCAT); + + } + for ( DbVertex o : DbVertex.values() ) { createVertexType(g, o); } @@ -77,6 +107,22 @@ public class DbCreate g.shutdown(); } + boolean createPlocalDatabase() + throws Exception + { + boolean ret = false; + try { + factory = new OrientGraphFactory(dburl, "admin", "admin"); + createSchema(); + ret = true; + } catch ( Exception e ) { + e.printStackTrace(); + } finally { + factory.close(); + } + return ret; + } + /** * Create the database and initialize the schema. This is intended to be called only from Main at * system startup, to insure all users of the db have a db when they start. @@ -84,11 +130,13 @@ public class DbCreate boolean createDatabase() throws Exception { - String pw = DbManager.dbPassword(); + if ( adminpw == null ) { + adminpw = DbManager.dbPassword(); + } try { admin = new OServerAdmin(dburl); - admin.connect("root", pw); // connect to the server + admin.connect(adminid, adminpw); // connect to the server if ( ! admin.existsDatabase("plocal") ) { System.out.println("Database " + dburl + " does not exist, attempting to create it."); Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbHandle.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbHandle.java?rev=1703684&r1=1703683&r2=1703684&view=diff ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbHandle.java (original) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbHandle.java Thu Sep 17 20:34:08 2015 @@ -429,10 +429,8 @@ public class DbHandle OrientVertex ov = null; // logger.info(methodName, null, duccid, "Create new db record of type", typeName); - ov = graphDb.addVertex("class:" + typeName); - ov.setProperties(DbConstants.DUCCID, duccid); + ov = graphDb.addVertex("class:" + typeName, DbConstants.DUCCID, duccid, DbConstants.DUCC_DBCAT, dbcat.pname()); ov.setProperties(props); - ov.setProperty(DbConstants.DUCC_DBCAT, dbcat.pname()); return ov; } Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbLoader.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbLoader.java?rev=1703684&r1=1703683&r2=1703684&view=diff ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbLoader.java (original) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbLoader.java Thu Sep 17 20:34:08 2015 @@ -4,20 +4,27 @@ import java.io.File; import java.io.FileInputStream; import java.io.InputStream; import java.io.ObjectInputStream; +import java.lang.reflect.Field; import java.util.ArrayList; import java.util.List; import java.util.Properties; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.uima.ducc.common.Pair; import org.apache.uima.ducc.common.persistence.services.IStateServices; import org.apache.uima.ducc.common.utils.DuccLogger; import org.apache.uima.ducc.common.utils.id.DuccId; +import org.apache.uima.ducc.database.DbConstants.DbCategory; +import org.apache.uima.ducc.transport.event.common.DuccWorkMap; import org.apache.uima.ducc.transport.event.common.IDuccWorkJob; import org.apache.uima.ducc.transport.event.common.IDuccWorkReservation; import org.apache.uima.ducc.transport.event.common.IDuccWorkService; +import com.orientechnologies.orient.core.config.OGlobalConfiguration; + /** * Toy orientdb loader to load a historydb from ducc history */ @@ -30,9 +37,10 @@ public class DbLoader StateServicesDb ssd = null; // String history_url = "remote:localhost/DuccHistory"; - String state_url = "remote:localhost/DuccState"; + // String state_url = "plocal:/home/challngr/ducc_runtime_db/database/databases/DuccHistoryT"; + String state_url = "plocal:/users/challngr/DuccHistoryT"; - // String jobHistory = System.getProperty("user.home") + "/ducc_runtime/history/jobs"; + // String jobHistory = System.getProperty("user.home") + "/ducc_runtime_db/history/jobs"; String jobHistory = "/home/ducc/ducc_runtime/history/jobs"; // String reservationHistory = System.getProperty("user.home") + "/ducc_runtime/history/reservations"; @@ -47,6 +55,10 @@ public class DbLoader //String serviceRegistry = System.getProperty("user.home") + "/ducc_runtime/state/services"; String serviceRegistry = "/home/ducc/ducc_runtime/state/services"; + String checkpointFile = "/home/ducc/ducc_runtime/state/orchestrator.ckpt"; + String archive_key = IStateServices.archive_key; + String archive_flag = IStateServices.archive_flag; + int nthreads = 40; AtomicInteger counter = new AtomicInteger(0); @@ -65,16 +77,17 @@ public class DbLoader public void loadJobs() { String methodName = "loadJobs"; - LinkedBlockingQueue<File> jobqueue = new LinkedBlockingQueue<File>(); + LinkedBlockingQueue<File> queue = new LinkedBlockingQueue<File>(); - int max_to_load = Integer.MAX_VALUE; // or Integer.MAX_VALUE for 'all of them' + // int max_to_load = Integer.MAX_VALUE; // or Integer.MAX_VALUE for 'all of them' + int max_to_load = 1000; // or Integer.MAX_VALUE for 'all of them' int nth = Math.min(nthreads, max_to_load); JobLoader[] loader = new JobLoader[nth]; Thread[] threads = new Thread[nth]; List<Long> ids = new ArrayList<Long>(); for ( int i = 0; i < nth; i++ ) { - loader[i] = new JobLoader(jobqueue, ids); + loader[i] = new JobLoader(queue, ids); threads[i] = new Thread(loader[i]); threads[i].start(); } @@ -88,7 +101,7 @@ public class DbLoader String s = f.toString(); if ( s.endsWith(".dwj") ) { logger.info(methodName, null, "Loading file", c++, ":", f); - jobqueue.offer(f); + queue.offer(f); counter.getAndIncrement(); if ( c >= max_to_load ) break; @@ -120,51 +133,35 @@ public class DbLoader public void loadReservations() { String methodName = "loadReservations"; - - LinkedBlockingQueue<IDuccWorkReservation> queue = new LinkedBlockingQueue<IDuccWorkReservation>(); + LinkedBlockingQueue<File> queue = new LinkedBlockingQueue<File>(); - int max_to_load = Integer.MAX_VALUE; + //int max_to_load = Integer.MAX_VALUE; + int max_to_load = 1000; int nth = Math.min(nthreads, max_to_load); ReservationLoader[] loader = new ReservationLoader[nth]; Thread[] threads = new Thread[nth]; ArrayList<Long> ids = new ArrayList<Long>(); - + for ( int i = 0; i < nth; i++ ) { loader[i] = new ReservationLoader(queue, ids); threads[i] = new Thread(loader[i]); threads[i].start(); } - + File dir = new File(reservationHistory); - int c = 0; - File[] files = dir.listFiles(); logger.info(methodName, null, "Reading", files.length, "reservation instances."); - for ( File f : dir.listFiles() ) { + + int c = 0; + for ( File f : files ) { String s = f.toString(); if ( s.endsWith(".dwr") ) { logger.info(methodName, null, "Loading file", c++, ":", f); - IDuccWorkReservation res = null; - FileInputStream fis = null; - ObjectInputStream in = null; + + queue.offer(f); + counter.getAndIncrement(); - try { - long now = System.currentTimeMillis(); - fis = new FileInputStream(f); - in = new ObjectInputStream(fis); - res = (IDuccWorkReservation) in.readObject(); - logger.info(methodName, res.getDuccId(), "Time to read reservation:", System.currentTimeMillis() - now); - - queue.offer(res); - counter.getAndIncrement(); - } catch(Exception e) { - logger.info(methodName, null, e); - } finally { - // restoreJob(job.getDuccId().getFriendly()); - closeStream(in); - closeStream(fis); - if ( c >= max_to_load ) break; - } + if ( c >= max_to_load ) break; } else { logger.info(methodName, null, "Can't find history file", f); } @@ -188,23 +185,16 @@ public class DbLoader logger.info(methodName, null, "Joining thread (reservations).", i); try { threads[i].join(); } catch ( InterruptedException e ) {} } - - // try { - // List<IDuccWorkReservation> ress = hmd.restoreReservations(c); - // logger.info(methodName, null, "Recovered", ress.size(), "reservations."); - // } catch (Exception e) { - // // TODO Auto-generated catch block - // e.printStackTrace(); - // } } public void loadServices() { String methodName = "loadServices"; - LinkedBlockingQueue<IDuccWorkService> queue = new LinkedBlockingQueue<IDuccWorkService>(); + LinkedBlockingQueue<File> queue = new LinkedBlockingQueue<File>(); - int max_to_load = Integer.MAX_VALUE; + // int max_to_load = Integer.MAX_VALUE; + int max_to_load = 1000; int nth = Math.min(nthreads, max_to_load); ServiceLoader[] loader = new ServiceLoader[nth]; Thread[] threads = new Thread[nth]; @@ -217,36 +207,19 @@ public class DbLoader } File dir = new File(serviceHistory); - int c = 0; - File[] files = dir.listFiles(); logger.info(methodName, null, "Reading", files.length, "service instances."); + + int c = 0; for ( File f : files ) { String s = f.toString(); if ( s.endsWith(".dws") ) { logger.info(methodName, null, "Loading file", c++, ":", f); - IDuccWorkService svc = null; - FileInputStream fis = null; - ObjectInputStream in = null; - try { - long now = System.currentTimeMillis(); - fis = new FileInputStream(f); - in = new ObjectInputStream(fis); - svc = (IDuccWorkService) in.readObject(); - logger.info(methodName, svc.getDuccId(), "Time to read service:", System.currentTimeMillis() - now); - + queue.offer(f); + counter.getAndIncrement(); - queue.offer(svc); - counter.getAndIncrement(); - } catch(Exception e) { - logger.info(methodName, null, e); - } finally { - // restoreJob(job.getDuccId().getFriendly()); - closeStream(in); - closeStream(fis); - if ( c >= max_to_load ) break; - } + if ( c >= max_to_load ) break; } else { logger.info(methodName, null, "Can't find history file", f); } @@ -254,7 +227,7 @@ public class DbLoader while ( (c = counter.get()) != 0 ) { try { - logger.info(methodName, null, "Waiting for service loads to finish, counter is", c); + logger.info(methodName, null, "Waiting for loads to finish, counter is", c, "(service instances"); Thread.sleep(1000); } catch ( Exception e ) {} @@ -270,21 +243,13 @@ public class DbLoader try { threads[i].join(); } catch ( InterruptedException e ) {} } - // try { - // List<IDuccWorkService> services = hmd.restoreServices(c); - // logger.info(methodName, null, "Recovered", services.size(), "serves."); - // } catch (Exception e) { - // // TODO Auto-generated catch block - // e.printStackTrace(); - // } - } - public void loadServiceRegistry(String registry) + public void loadServiceRegistry(String registry, boolean isHistory) { String methodName = "loadServiceRegistry"; - LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<String>(); + LinkedBlockingQueue<Pair<String, Boolean>> queue = new LinkedBlockingQueue<Pair<String, Boolean>>(); int max_to_load = Integer.MAX_VALUE; int nth = Math.min(nthreads, max_to_load); @@ -307,7 +272,7 @@ public class DbLoader if ( s.endsWith(".svc") ) { int ndx = s.indexOf(".svc"); String numeric = s.substring(0, ndx); - queue.offer(numeric); + queue.offer(new Pair<String, Boolean>(numeric, isHistory)); counter.getAndIncrement(); if ( ++c >= max_to_load ) break; @@ -335,34 +300,78 @@ public class DbLoader } + void loadCheckpoint() + throws Exception + { + String methodName = "loadCheckpoint"; + + //Checkpointable obj = null; + FileInputStream fis = null; + ObjectInputStream in = null; + try { + fis = new FileInputStream(checkpointFile); + in = new ObjectInputStream(fis); + + Object xobj = (Object) in.readObject(); + Class<?> cl = xobj.getClass(); + Field p2jfield = cl.getDeclaredField("processToJobMap"); + p2jfield.setAccessible(true); + ConcurrentHashMap<DuccId, DuccId> p2jmap = (ConcurrentHashMap<DuccId, DuccId>) p2jfield.get(xobj); + + Field wmField = cl.getDeclaredField("workMap"); + wmField.setAccessible(true); + DuccWorkMap workMap = (DuccWorkMap) wmField.get(xobj); + + hmd.checkpoint(workMap, p2jmap); + + } catch (Exception e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } finally { + fis.close(); + in.close(); + } + + } + void run() throws Exception { String methodName = "run"; DbCreate cr = new DbCreate(state_url); - cr.createDatabase(); + cr.createPlocalDatabase(); - // load the history db + logger.info(methodName, null, "storage.useWAL", System.getProperty("storage.useWAL")); + logger.info(methodName, null, "tx.useLog", System.getProperty("tx.useLog")); if ( true ) { try { + OGlobalConfiguration.dumpConfiguration(System.out); + hmd = new HistoryManagerDb(logger); + if ( true ) loadCheckpoint(); + + OGlobalConfiguration.USE_WAL.setValue(false); + + OGlobalConfiguration.dumpConfiguration(System.out); + + // ---------- Load job history - loadJobs(); - if ( true ) return; + if ( true ) loadJobs(); // ---------- Load reservation history - loadReservations(); - + if ( true ) loadReservations(); + + // ---------- Load service isntance and AP history - loadServices(); + if ( true ) loadServices(); // ---------- Load service registry ssd = new StateServicesDb(); ssd.init(logger); - loadServiceRegistry(serviceRegistry); + loadServiceRegistry(serviceRegistry, false); try { ssd.shutdown(); } catch ( Exception e ) { @@ -372,7 +381,13 @@ public class DbLoader // ---------- Load service registry history ssd = new StateServicesDb(); ssd.init(logger); - loadServiceRegistry(serviceRegistryHistory); + loadServiceRegistry(serviceRegistryHistory, true); + + OGlobalConfiguration.USE_WAL.setValue(true); + if ( true ) loadCheckpoint(); + + + } catch ( Exception e ) { logger.error(methodName, null, e); @@ -415,7 +430,6 @@ public class DbLoader File f = null; IDuccWorkJob job = null; try { - // logger.info(methodName, null, "About to take (job)."); f = queue.take(); FileInputStream fis = null; @@ -430,7 +444,6 @@ public class DbLoader } catch(Exception e) { logger.info(methodName, null, e); } finally { - // restoreJob(job.getDuccId().getFriendly()); closeStream(in); closeStream(fis); } @@ -454,9 +467,9 @@ public class DbLoader class ServiceLoader implements Runnable { - BlockingQueue<IDuccWorkService> queue; + BlockingQueue<File> queue; List<Long> ids; - ServiceLoader(BlockingQueue<IDuccWorkService> queue, List<Long> ids) + ServiceLoader(BlockingQueue<File> queue, List<Long> ids) { this.queue = queue; this.ids = ids; @@ -467,24 +480,36 @@ public class DbLoader String methodName = "ServiceLoader.run"; while ( true ) { IDuccWorkService svc = null; + File f = null; try { - logger.info(methodName, null, "About to take (service)."); - svc = queue.take(); + f = queue.take(); + FileInputStream fis = null; + ObjectInputStream in = null; + + try { + long now = System.currentTimeMillis(); + fis = new FileInputStream(f); + in = new ObjectInputStream(fis); + svc = (IDuccWorkService) in.readObject(); + logger.info(methodName, svc.getDuccId(), "Time to read service:", System.currentTimeMillis() - now); + } catch(Exception e) { + logger.info(methodName, null, e); + } finally { + closeStream(in); + closeStream(fis); + } + hmd.saveServiceUnsafe(svc); + } catch ( InterruptedException e ) { return; - } - logger.info(methodName, svc.getDuccId(), "Took a Service"); - try { - //h = dbManager.open(); - hmd.saveServiceUnsafe(svc); - //h.close(); - synchronized(ids) { - ids.add(svc.getDuccId().getFriendly()); - } - counter.getAndDecrement(); - } catch(Exception e) { + } catch ( Exception e ){ logger.info(methodName, null, e); - } + } + + synchronized(ids) { + ids.add(svc.getDuccId().getFriendly()); + } + counter.getAndDecrement(); } } } @@ -492,9 +517,9 @@ public class DbLoader class ReservationLoader implements Runnable { - BlockingQueue<IDuccWorkReservation> queue; + BlockingQueue<File> queue; List<Long> ids; - ReservationLoader(BlockingQueue<IDuccWorkReservation> queue, List<Long> ids) + ReservationLoader(BlockingQueue<File> queue, List<Long> ids) { this.queue = queue; this.ids = ids; @@ -505,24 +530,36 @@ public class DbLoader String methodName = "ReservationLoader.run"; while ( true ) { IDuccWorkReservation res = null; + File f = null; try { - logger.info(methodName, null, "About to take (reservation)."); - res = queue.take(); + f = queue.take(); + FileInputStream fis = null; + ObjectInputStream in = null; + + try { + long now = System.currentTimeMillis(); + fis = new FileInputStream(f); + in = new ObjectInputStream(fis); + res = (IDuccWorkReservation) in.readObject(); + logger.info(methodName, res.getDuccId(), "Time to read reservation:", System.currentTimeMillis() - now); + } catch(Exception e) { + logger.info(methodName, null, e); + } finally { + closeStream(in); + closeStream(fis); + } + hmd.saveReservationUnsafe(res); + } catch ( InterruptedException e ) { return; - } - logger.info(methodName, res.getDuccId(), "Took a Service"); - try { - //h = dbManager.open(); - hmd.saveReservationUnsafe(res); - //h.close(); - synchronized(ids) { - ids.add(res.getDuccId().getFriendly()); - } - counter.getAndDecrement(); - } catch(Exception e) { + } catch ( Exception e ){ logger.info(methodName, null, e); - } + } + + synchronized(ids) { + ids.add(res.getDuccId().getFriendly()); + } + counter.getAndDecrement(); } } } @@ -531,9 +568,9 @@ public class DbLoader class ServiceRegistrationLoader implements Runnable { - BlockingQueue<String> queue; + BlockingQueue<Pair<String, Boolean>> queue; List<Long> ids; - ServiceRegistrationLoader(BlockingQueue<String> queue, List<Long> ids) + ServiceRegistrationLoader(BlockingQueue<Pair<String, Boolean>> queue, List<Long> ids) { this.queue = queue; this.ids = ids; @@ -543,10 +580,14 @@ public class DbLoader { String methodName = "ServiceRegistrationLoader.run"; while ( true ) { + Pair<String, Boolean> p = null; String id = null; + boolean isHistory; try { logger.info(methodName, null, "About to take (service id)."); - id = queue.take(); + p = queue.take(); + id = p.first(); + isHistory = p.second(); } catch ( InterruptedException e ) { return; } @@ -564,6 +605,10 @@ public class DbLoader meta_in = new FileInputStream(meta_name); svc_props.load(svc_in); meta_props.load(meta_in); + if ( isHistory ) { + meta_props.setProperty(archive_key, archive_flag); + } + String sid = meta_props.getProperty(IStateServices.SvcProps.numeric_id.pname()); if ( sid == null ) { logger.warn(methodName, null, "Cannot find service id in meta file for", id); @@ -575,8 +620,11 @@ public class DbLoader } DuccId did = new DuccId(Long.parseLong(sid)); - ssd.storePropertiesUnsafe(did, svc_props, meta_props); - + if ( isHistory ) { + ssd.storePropertiesUnsafe(did, svc_props, meta_props, DbCategory.History); + } else { + ssd.storePropertiesUnsafe(did, svc_props, meta_props, DbCategory.SmReg); + } synchronized(ids) { ids.add(did.getFriendly()); } Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbTester.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbTester.java?rev=1703684&r1=1703683&r2=1703684&view=diff ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbTester.java (original) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbTester.java Thu Sep 17 20:34:08 2015 @@ -14,6 +14,7 @@ import java.util.concurrent.atomic.Atomi import org.apache.uima.ducc.common.persistence.services.IStateServices; import org.apache.uima.ducc.common.utils.DuccLogger; import org.apache.uima.ducc.common.utils.id.DuccId; +import org.apache.uima.ducc.database.DbConstants.DbCategory; import org.apache.uima.ducc.transport.event.common.IDuccWorkJob; import org.apache.uima.ducc.transport.event.common.IDuccWorkReservation; import org.apache.uima.ducc.transport.event.common.IDuccWorkService; @@ -570,7 +571,7 @@ public class DbTester } DuccId did = new DuccId(Long.parseLong(sid)); - ssd.storePropertiesUnsafe(did, svc_props, meta_props); + ssd.storePropertiesUnsafe(did, svc_props, meta_props, DbCategory.SmReg); synchronized(ids) { ids.add(did.getFriendly()); Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/HistoryManagerDb.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/HistoryManagerDb.java?rev=1703684&r1=1703683&r2=1703684&view=diff ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/HistoryManagerDb.java (original) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/HistoryManagerDb.java Thu Sep 17 20:34:08 2015 @@ -269,7 +269,7 @@ public class HistoryManagerDb if ( safe ) { h = dbManager.open(); } else { - h = dbManager.open(); + h = dbManager.openNoTx(); } if ( safe && h.thingInDatabase(id, type, dbcat) ) { logger.warn(methodName, j.getDuccId(), "Not overwriting saved job."); @@ -289,6 +289,7 @@ public class HistoryManagerDb throw e; } finally { h.commit(); + h.close(); } } @@ -377,7 +378,7 @@ public class HistoryManagerDb try { h = dbManager.open(); Iterable<Vertex> q = h.select("SELECT * FROM " + DbVertex.Job.pname() + " WHERE ducc_dbid=" + friendly_id + - " AND " + DbConstants.DUCC_DBCAT + "='" + DbCategory.History.pname()); + " AND " + DbConstants.DUCC_DBCAT + "='" + DbCategory.History.pname() + "'"); for ( Vertex v : q ) { // There's only 1 unless db is broken. return restoreJobInternal(h, (OrientVertex) v); @@ -448,7 +449,7 @@ public class HistoryManagerDb Gson g = mkGsonForJob(); String dbres = g.toJson(r); - logger.info(methodName, null, "------------------- Reservation JSON: " + dbres); + // logger.info(methodName, null, "------------------- Reservation JSON: " + dbres); // Must repair these things because OR continues to use the job after it has been // written to history. @@ -481,7 +482,11 @@ public class HistoryManagerDb Long id = r.getDuccId().getFriendly(); DbHandle h = null; try { - h = dbManager.open(); + if ( safe ) { + h = dbManager.open(); + } else { + h = dbManager.openNoTx(); + } if ( safe && h.thingInDatabase(id, DbVertex.Reservation, dbcat) ) { h.close(); return; @@ -528,7 +533,7 @@ public class HistoryManagerDb JsonObject jo = mkJsonObject(json); Gson g = mkGsonForJob(); - logger.info(methodName, null, g.toJson(jo)); + // logger.info(methodName, null, g.toJson(jo)); r = g.fromJson(jo, DuccWorkReservation.class); @@ -536,9 +541,9 @@ public class HistoryManagerDb if ( l != null ) { for (JdReservationBean b : l ) { ConcurrentHashMap<DuccId, SizeBytes> map = b.getMap(); - for ( DuccId k : map.keySet() ) { - logger.info(methodName, null, "REST ===> " + k.getFriendly() + " " + k.getUnique() + " : " + map.get(k)); - } + //for ( DuccId k : map.keySet() ) { + // logger.info(methodName, null, "REST ===> " + k.getFriendly() + " " + k.getUnique() + " : " + map.get(k)); + //} } } Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/StateServicesDb.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/StateServicesDb.java?rev=1703684&r1=1703683&r2=1703684&view=diff ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/StateServicesDb.java (original) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/StateServicesDb.java Thu Sep 17 20:34:08 2015 @@ -164,23 +164,27 @@ public class StateServicesDb * equivalent in the Java interface? If so, we should modify this to use it * and can then eliminate the 'safe' flag. */ - boolean storePropertiesInternal (DuccId serviceId, Properties svc_props, Properties meta_props, boolean safe) + boolean storePropertiesInternal (DuccId serviceId, Properties svc_props, Properties meta_props, boolean safe, DbCategory category) { String methodName = "storePropertiesInternal"; DbHandle h = null; try { - h = dbManager.open(); + if ( safe ) { + h = dbManager.open(); + } else { + h = dbManager.openNoTx(); + } Long id = serviceId.getFriendly(); if ( safe ) { - if ( h.thingInDatabase(id, DbVertex.ServiceReg, DbCategory.SmReg) ) { + if ( h.thingInDatabase(id, DbVertex.ServiceReg, category) ) { return false; } } - h.createPropertiesObject(svc_props, DbVertex.ServiceReg, id, DbCategory.SmReg); - h.createPropertiesObject(meta_props, DbVertex.ServiceMeta, id, DbCategory.SmReg); + h.createPropertiesObject(svc_props, DbVertex.ServiceReg, id, category); + h.createPropertiesObject(meta_props, DbVertex.ServiceMeta, id, category); h.commit(); return true; } catch ( Exception e ) { @@ -196,9 +200,9 @@ public class StateServicesDb * Save the props into the database, don't check to see if they're there already. Used by the * loader for converting old registries to the db. */ - public boolean storePropertiesUnsafe(DuccId serviceId, Properties svc_props, Properties meta_props) + public boolean storePropertiesUnsafe(DuccId serviceId, Properties svc_props, Properties meta_props, DbCategory category) { - return storePropertiesInternal(serviceId, svc_props, meta_props, false); + return storePropertiesInternal(serviceId, svc_props, meta_props, false, category); } /** @@ -209,7 +213,7 @@ public class StateServicesDb */ public boolean storeProperties(DuccId serviceId, Properties svc_props, Properties meta_props) { - return storePropertiesInternal(serviceId, svc_props, meta_props, true); + return storePropertiesInternal(serviceId, svc_props, meta_props, true, DbCategory.SmReg); } Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-parent/pom.xml URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-parent/pom.xml?rev=1703684&r1=1703683&r2=1703684&view=diff ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-parent/pom.xml (original) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-parent/pom.xml Thu Sep 17 20:34:08 2015 @@ -165,7 +165,7 @@ <jetty.version>7.4.4.v20110707</jetty.version> <orbit-org-apache-jasper.version>2.1.0.v201110031002</orbit-org-apache-jasper.version> <servlet-api.version>2.5</servlet-api.version> - <orientdb.version>2.1.0</orientdb.version> + <orientdb.version>2.1.2</orientdb.version> <orientdb.studio.version>2.0-M3</orientdb.studio.version> <http.commons.client.version>4.3.5</http.commons.client.version> Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceSet.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceSet.java?rev=1703684&r1=1703683&r2=1703684&view=diff ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceSet.java (original) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-sm/src/main/java/org/apache/uima/ducc/sm/ServiceSet.java Thu Sep 17 20:34:08 2015 @@ -163,8 +163,8 @@ public class ServiceSet String[] coOwners = null; - static final String archive_key = "is_archived"; - static final String archive_flag = "true"; + String archive_key = IStateServices.archive_key; + String archive_flag = IStateServices.archive_flag; // // Constructor for a registered service