Author: challngr Date: Wed Jan 6 18:41:57 2016 New Revision: 1723398 URL: http://svn.apache.org/viewvc?rev=1723398&view=rev Log: UIMA-4577 Add tables for job, process, etc. details.
Added: uima/sandbox/uima-ducc/trunk/src/main/admin/q_processes (with props) Modified: uima/sandbox/uima-ducc/trunk/src/main/admin/db_util.py uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbCreate.java uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbHandle.java uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbLoader.java uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbManager.java uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbUtil.java uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/HistoryManagerDb.java uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/StateManager.java uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/common/DuccSchedulingInfo.java uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/common/history/IHistoryPersistenceManager.java Modified: uima/sandbox/uima-ducc/trunk/src/main/admin/db_util.py URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/src/main/admin/db_util.py?rev=1723398&r1=1723397&r2=1723398&view=diff ============================================================================== --- uima/sandbox/uima-ducc/trunk/src/main/admin/db_util.py (original) +++ uima/sandbox/uima-ducc/trunk/src/main/admin/db_util.py Wed Jan 6 18:41:57 2016 @@ -11,6 +11,83 @@ def execute(CMD): print CMD return os.system(CMD) +# -------------------------------------------------------------------------------- +# these next methods are used to parse a table returned from cqlsh into a +# - header +# - dictionary of values for each row + +# parse the header into a list of names +def parse_header(header): + ret = [] + parts = header.split('|') + 
for p in parts: + ret.append(p.strip()) + return ret + +# parse a single line into a dictionary with key from the header and value from the line +def parse_line(header, line): + parts = line.split('|') + ret = {} + for k, v in zip(header, parts): + ret[k] = v.strip() + return ret + +# parse a set of lines returned from cqlsh into a header and a list of dictionaries, one per line +# header_id is a sting we use to positively identify a header line +def parse(lines, header_id): + ret = [] + header = [] + for l in lines: + l = l.strip() + # print '[]', l + if ( l == '' ): + continue + if ( '---' in l ): + continue; + if ( 'rows)' in l ): + continue; + if ( header_id in l ): + header = parse_header(l) + continue + + ret.append(parse_line(header, l)) + + return header, ret + +# given a header and a collection of lines parsed by the utilities above, print a +# mostly un-ugly listing of the table retults +def format(header, lines): + + # calculate max column widths + hlens = {} + for k in header: + hlens[k] = len(k) + for line in lines: + if ( not hlens.has_key(k) ): + hlens[k] = len(line[k]) + else: + hlens[k] = max(len(line[k]), hlens[k]) + + # create a format string from the widths + fmt = '' + for k in header: + fmt = fmt + ' %' + str(hlens[k]) + 's' + + # first the header + print fmt % tuple(header) + + # now the rows + for line in lines: + l = [] + for k in header: + l.append(line[k]) + print fmt % tuple(l) + return + + +# end of row parsing utilities +# -------------------------------------------------------------------------------- + def stop_database(pidfile): print "Stopping the dtabase." 
Added: uima/sandbox/uima-ducc/trunk/src/main/admin/q_processes URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/src/main/admin/q_processes?rev=1723398&view=auto ============================================================================== --- uima/sandbox/uima-ducc/trunk/src/main/admin/q_processes (added) +++ uima/sandbox/uima-ducc/trunk/src/main/admin/q_processes Wed Jan 6 18:41:57 2016 @@ -0,0 +1,171 @@ +#!/usr/bin/env python +# ----------------------------------------------------------------------- +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# ----------------------------------------------------------------------- + + +import os +import sys +import subprocess +import datetime +import getopt +import time + +import db_util + +from ducc_util import DuccUtil + +class DuccRmQProcesses(DuccUtil): + + def usage(self, *msg): + if ( msg[0] != None ): + print ' '.join(msg) + print 'Usage:' + print ' q_processes options' + print '' + print 'Where options include:' + print ' -j --job jobid sho all processes for this job' + print ' -n --node show all processes on a node' + print ' -f --from date include only process since this date' + print ' -t --to date include processes only up to this date' + print '' + print 'Notes:' + print ' Omit -f and -t to get all processes.' + print '' + print ' If -f OR -t is specified, you MUST specifiy a node (-n) as well.' + print '' + print '' + print ' Date formats:' + print ' mm/dd/yy Use this to specify everything on a given day' + print ' mm/dd/yy.hh:mm Use this to specify a specific hour and minute.' + print ' Hours use the military (24-hour) clock.' + print '' + print 'Examples:' + print '' + print ' Show all work on bluejbob on Feb 15 2015 between noon and 4PM' + print ' q_processes --node bluejbob --from 2/14/15.12:00 --to 2/14/15.16:00' + print '' + print ' Show all history for job 1234' + print ' q_processes --job 1234' + print '' + print ' Show history for job 1234 on node bluebob' + print ' q_processes --job 1234 --node bluebob' + + sys.exit(0) + + + def parse_date(self, dat): + if ( '.' 
in dat ): + fmt = '%m/%d/%y.%H:%M' + else: + fmt = '%m/%d/%y' + + d = datetime.datetime.strptime(dat, fmt) + return int(time.mktime(d.timetuple()))*1000 + + def get_date(self, dat): + return datetime.datetime.fromtimestamp(dat) + + def main(self, argv): + + node = None + fromt = None + tot = None + jobid = None + conjunction = 'WHERE' + + print 'argv', argv + try: + opts, args = getopt.getopt(argv, 'f:j:n:t:h?', ['from=', 'to=', 'job=', 'node=', 'help', ]) + except: + self.usage("Invalid arguments " + ' '.join(argv)) + + + for ( o, a ) in opts: + if o in ('-n', '--node'): + node = a + elif o in ('-f', '--from'): + fromt = self.parse_date(a) + elif o in ('-t', '--to'): + tot = self.parse_date(a) + elif o in ('-j', '--job'): + jobid = a + elif o in ('-h', '-?', '--help'): + self.usage(None) + + + if ( ( fromt != None ) and ( tot != None ) and ( node == None ) ): + self.usage("Node must be specified when a date range is specified.") + + query = ['select * from ducc.processes'] + if ( node != None ): + query.append(conjunction) + conjunction = 'AND' + query.append("node='" + node + "'") + + + if ( fromt != None ): + query.append(conjunction) + conjunction = 'AND' + query.append("start > " + str(fromt)) + + if ( jobid != None ): + query.append(conjunction) + conjunction = 'AND' + query.append("job_id = " + jobid) + + if ( tot != None ): + if ( fromt == None ): + usage("--from must be specified if --to is also specified.") + query.append(conjunction) + conjunction = 'AND' + query.append("stop < " + str(tot)) + + if ( fromt != None ): + query.append("ALLOW FILTERING") + + query = '"' + ' '.join(query) + '"' + DH = self.DUCC_HOME + dbn = self.ducc_properties.get('ducc.database.host') + + CMD = [DH + '/cassandra-server/bin/cqlsh', dbn, '-u', 'guest', '-p', 'guest', '-e', query] + CMD = ' '.join(CMD) + print CMD + + lines = [] + proc = subprocess.Popen(CMD, bufsize=0, stdout=subprocess.PIPE, shell=True) + for line in proc.stdout: + # print '[]', line.strip() + 
lines.append(line) + + + header, lines = db_util.parse(lines, 'job_id') + for line in lines: + line['start'] = str(self.get_date(int(line['start'])/1000)) + line['stop'] = str(self.get_date(int(line['stop'])/1000)) + + db_util.format(header, lines) + + return + + +if __name__ == "__main__": + stopper = DuccRmQProcesses() + stopper.main(sys.argv[1:]) + + Propchange: uima/sandbox/uima-ducc/trunk/src/main/admin/q_processes ------------------------------------------------------------------------------ svn:executable = * Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbCreate.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbCreate.java?rev=1723398&r1=1723397&r2=1723398&view=diff ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbCreate.java (original) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbCreate.java Wed Jan 6 18:41:57 2016 @@ -131,7 +131,7 @@ public class DbCreate break; // no crash, we're outta here } catch ( Exception ee ) { doLog(methodName, "Authorization fails with both the default userid/password and the new userid/password."); - doLog(methodName, "Retrhying, as first-time database may take a few moments to initialize."); + doLog(methodName, "Retrying, as first-time database may take a few moments to initialize."); } session = null; cluster = null; @@ -216,33 +216,12 @@ public class DbCreate session.execute(s); } - // - // String[] rmSchema = RmStatePersistence.mkSchemaItems(); - // String cql = DbUtil.mkTableCreate("ducc.rmnodes", rmSchema); - // doLog(methodName, "CQL:", cql); - // session.execute(cql); - List<SimpleStatement>smSchema = StateServicesDb.mkSchema(); for ( SimpleStatement s : smSchema ) { doLog(methodName, "EXECUTE STATEMENT:", 
s.toString()); session.execute(s); } - -// String[] smSchemaReg = StateServicesDb.mkSchemaForReg(); -// cql = DbUtil.mkTableCreate("ducc.smreg", smSchemaReg); -// doLog(methodName, "CQL:", cql); -// session.execute(cql); -// cql = "CREATE INDEX IF NOT EXISTS ON ducc.smreg(active)"; -// session.execute(cql); -// -// String[] smSchemaMeta = StateServicesDb.mkSchemaForMeta(); -// cql = DbUtil.mkTableCreate("ducc.smmeta", smSchemaMeta); -// doLog(methodName, "CQL:", cql); -// session.execute(cql); -// cql = "CREATE INDEX IF NOT EXISTS ON ducc.smmeta(active)"; -// session.execute(cql); - List<SimpleStatement>orSchema = HistoryManagerDb.mkSchema(); for ( SimpleStatement s : orSchema ) { doLog(methodName, "EXECUTE STATEMENT:", s.toString()); @@ -252,7 +231,7 @@ public class DbCreate } catch ( Exception e ) { doLog(methodName, "Cannot create schema:", e); } - + } /** Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbHandle.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbHandle.java?rev=1723398&r1=1723397&r2=1723398&view=diff ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbHandle.java (original) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbHandle.java Wed Jan 6 18:41:57 2016 @@ -50,7 +50,7 @@ public class DbHandle String methodName = "execute"; long now = System.currentTimeMillis(); ResultSet ret = manager.execute(sql); - if ( manager.noisy ) logger.info(methodName, null, "Time to execute", System.currentTimeMillis() - now); + if ( manager.noisy ) logger.debug(methodName, null, "Time to execute", System.currentTimeMillis() - now); return ret; } @@ -215,7 +215,7 @@ public class DbHandle manager.execute(cql); return true; } finally { - logger.info(methodName, 
null, "Time to update one property", propkey, System.currentTimeMillis() - now); + logger.debug(methodName, null, "Time to update one property", propkey, System.currentTimeMillis() - now); } } @@ -243,16 +243,17 @@ public class DbHandle String cql = DbUtil.mkUpdate(table, row, props); try { + logger.trace(methodName, null, cql); manager.execute(cql); } finally { - logger.info(methodName, null, "Total time to update properties", System.currentTimeMillis() - now); + logger.debug(methodName, null, "Total time to update properties", System.currentTimeMillis() - now); } } PreparedStatement prepare(String cql) { - String methodName = "prepare"; + //String methodName = "prepare"; return manager.prepare(cql); } @@ -272,7 +273,7 @@ public class DbHandle BoundStatement boundStatement = new BoundStatement(ps); BoundStatement bound = boundStatement.bind(fields); execute(bound); - logger.info(methodName, null, "Time to execute prepared statement:", ps.getQueryString(), System.currentTimeMillis() - now); + logger.debug(methodName, null, "Time to execute prepared statement:", ps.getQueryString(), System.currentTimeMillis() - now); } /** Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbLoader.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbLoader.java?rev=1723398&r1=1723397&r2=1723398&view=diff ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbLoader.java (original) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbLoader.java Wed Jan 6 18:41:57 2016 @@ -18,6 +18,7 @@ */ package org.apache.uima.ducc.database; +import java.io.ByteArrayInputStream; import java.io.File; import java.io.FileInputStream; import java.io.InputStream; @@ -39,6 +40,8 @@ import 
org.apache.uima.ducc.common.persi import org.apache.uima.ducc.common.utils.DuccLogger; import org.apache.uima.ducc.common.utils.id.DuccId; import org.apache.uima.ducc.transport.event.common.DuccWorkMap; +import org.apache.uima.ducc.transport.event.common.IDuccWork; +import org.apache.uima.ducc.transport.event.common.IDuccWorkService; import com.datastax.driver.core.BoundStatement; import com.datastax.driver.core.PreparedStatement; @@ -83,13 +86,13 @@ public class DbLoader String checkpointFile = "/state/orchestrator.ckpt"; - int nthreads = 20; + int nthreads = 10; AtomicInteger counter = new AtomicInteger(0); - //int joblimit = 10; - //int reservationlimit = 10; - //int servicelimit = 10; - //int registrylimit = 1; + //int joblimit = 100; + //int reservationlimit = 100; + //int servicelimit = 100; + //int registrylimit = 100; int joblimit = Integer.MAX_VALUE; int reservationlimit = Integer.MAX_VALUE; @@ -149,6 +152,7 @@ public class DbLoader String methodName = "loadJobs"; logger.info(methodName, null, " -------------------- Load jobs ----------------"); + System.out.println(" -------------------- Load jobs ----------------"); File dir = new File(jobHistory); if ( !dir.isDirectory() ) { logger.info(methodName, null, "Cannot find job history; skipping load of jobs."); @@ -197,6 +201,7 @@ public class DbLoader while ( (c = counter.get()) != 0 ) { try { logger.info(methodName, null, "Waiting for loads to finish, counter is", c, "(job)."); + System.out.println("Waiting for job loads to finish, counter is " + c); Thread.sleep(1000); } catch ( Exception e ) {} @@ -224,6 +229,8 @@ public class DbLoader String methodName = "loadReservations"; logger.info(methodName, null, " -------------------- Load reservations ----------------"); + System.out.println(" -------------------- Load reservations ----------------"); + File dir = new File(reservationHistory); if ( ! 
dir.isDirectory() ) { logger.info(methodName, null, "No reservation directory found; skipping database load of reservations."); @@ -272,6 +279,8 @@ public class DbLoader while ( (c = counter.get()) != 0 ) { try { logger.info(methodName, null, "Waiting for reservation loads to finish, counter is", c); + System.out.println("Waiting for reservation loads to finish, counter is " + c); + Thread.sleep(1000); } catch ( Exception e ) {} @@ -300,6 +309,8 @@ public class DbLoader String methodName = "loadServices"; logger.info(methodName, null, " -------------------- Load services ----------------"); + System.out.println(" -------------------- Load AP/Service Instances ----------------"); + File dir = new File(serviceHistory); if ( ! dir.isDirectory() ) { logger.info(methodName, null, "No service history directory found; skipping load of service history."); @@ -348,6 +359,7 @@ public class DbLoader while ( (c = counter.get()) != 0 ) { try { logger.info(methodName, null, "Waiting for loads to finish, counter is", c, "(service instances"); + System.out.println("Waiting for AP/Service Instance loads to finish, counter is " + c); Thread.sleep(1000); } catch ( Exception e ) {} @@ -374,6 +386,8 @@ public class DbLoader String methodName = "loadServiceRegistry"; logger.info(methodName, null, " -------------------- Load registry; isHistory", isHistory, " ----------------"); + System.out.println(" -------------------- Load Service Registry " + (isHistory ? 
"(history)" : "(active registrations)") + " ----------------"); + int c = 0; File dir = new File(registry); @@ -426,6 +440,8 @@ public class DbLoader while ( (c = counter.get()) != 0 ) { try { logger.info(methodName, null, "Waiting for service registry loads to finish, counter is", c); + System.out.println("Waiting for service registration loads to finish, counter is " + c); + Thread.sleep(1000); } catch ( Exception e ) {} @@ -502,7 +518,7 @@ public class DbLoader { String methodName = "foo"; DbHandle h = dbManager.open(); - SimpleStatement s = new SimpleStatement("SELECT * from " + HistoryManagerDb.JOB_TABLE + " limit 5000"); + SimpleStatement s = new SimpleStatement("SELECT * from " + HistoryManagerDb.JOB_HISTORY_TABLE + " limit 5000"); //SimpleStatement s = new SimpleStatement("SELECT * from " + HistoryManagerDb.RES_TABLE + " limit 5000"); //SimpleStatement s = new SimpleStatement("SELECT * from " + HistoryManagerDbB.SVC_TABLE + " limit 5000"); logger.info(methodName, null, "Fetch size", s.getFetchSize()); @@ -559,7 +575,16 @@ public class DbLoader hmd = new HistoryManagerDb(); hmd.init(logger, dbManager); - + + // drop some of the history indices to speed up + System.out.println("Temporarily dropping some indexes"); + List<SimpleStatement> drops = HistoryManagerDb.dropIndices(); + DbHandle h = dbManager.open(); + for ( SimpleStatement ss : drops ) { + System.out.println(ss.getQueryString()); + h.execute(ss); + } + long nowt = System.currentTimeMillis(); if ( docheckpoint ) loadCheckpoint(); logger.info(methodName, null, "***** Time to load checkpoint A ****", System.currentTimeMillis() - nowt); @@ -608,6 +633,15 @@ public class DbLoader if ( docheckpoint ) loadCheckpoint(); logger.info(methodName, null, "**** Time to reload checkpoint B ****", System.currentTimeMillis() - nowt); + // recreate dropped indices + System.out.println("Restoring indexes"); + List<SimpleStatement> indices = HistoryManagerDb.createIndices(); + h = dbManager.open(); + for ( 
SimpleStatement ss : indices ) { + System.out.println(ss.getQueryString()); + h.execute(ss); + } + } catch ( Exception e ) { logger.error(methodName, null, e); @@ -628,8 +662,11 @@ public class DbLoader System.out.println("Where:"); System.out.println(" from"); System.out.println(" is the DUCC_HOME you wish to convert."); + System.out.println(" "); System.out.println(" to"); System.out.println(" is the datbase URL."); + System.out.println(" "); + System.exit(1); } @@ -643,10 +680,10 @@ public class DbLoader } } + static PreparedStatement jobPrepare = null; class JobLoader implements Runnable { - PreparedStatement statement = null; BlockingQueue<File> queue; List<Long> ids; JobLoader(BlockingQueue<File> queue, List<Long> ids) @@ -656,7 +693,11 @@ public class DbLoader this.ids = ids; DbHandle h = dbManager.open(); - statement = h.prepare("INSERT INTO " + HistoryManagerDb.JOB_TABLE + " (ducc_dbid, type, history, work) VALUES (?, ?, ?, ?);"); + synchronized(JobLoader.class) { + if ( jobPrepare == null ) { + jobPrepare = h.prepare("INSERT INTO " + HistoryManagerDb.JOB_HISTORY_TABLE + " (ducc_dbid, type, history, work) VALUES (?, ?, ?, ?)"); + } + } } public void run() @@ -670,7 +711,7 @@ public class DbLoader try { f = queue.take(); FileInputStream fis = null; - ObjectInputStream in = null; + // ObjectInputStream in = null; try { long now = System.currentTimeMillis(); @@ -692,18 +733,33 @@ public class DbLoader ByteBuffer bb = ByteBuffer.wrap(buf); logger.info(methodName, did, "Time to read job:", System.currentTimeMillis() - now+" MS", "bytes:", nbytes); - logger.info(methodName, did, "Job", duccid, "Store CQL:", statement.getQueryString()); + logger.info(methodName, did, "Job", duccid, "Store CQL:", jobPrepare.getQueryString()); + long now1 = System.currentTimeMillis(); - BoundStatement boundStatement = new BoundStatement(statement); + BoundStatement boundStatement = new BoundStatement(jobPrepare); BoundStatement bound = boundStatement.bind(duccid, "job", true, bb); 
DbHandle h = dbManager.open(); h.execute(bound); logger.info(methodName, did, "Time to store job", duccid, "- Database update:", (System.currentTimeMillis() - now1) + " MS", "Total save time:", (System.currentTimeMillis() - now) + " MS"); + + synchronized(ids) { + // any sync object is ok - but we want to effectively single thread the writing of the details as this tends + // to overrun the DB during this bulk load. + ByteArrayInputStream bais = new ByteArrayInputStream(buf); + ObjectInputStream ois = new ObjectInputStream(bais); + Object o = ois.readObject(); + ois.close(); + bais.close(); + + now = System.currentTimeMillis(); + hmd.summarizeProcesses(h, (IDuccWork) o, "J"); + hmd.summarizeJob(h, (IDuccWork) o, "J"); + logger.info(methodName, did, "Time to store process summaries for job", duccid, ":", (System.currentTimeMillis() - now)); + } } catch(Exception e) { logger.info(methodName, did, e); - } finally { - closeStream(in); + } finally { closeStream(fis); counter.getAndDecrement(); } @@ -725,11 +781,11 @@ public class DbLoader } } - + PreparedStatement servicePrepare = null; class ServiceLoader implements Runnable { - PreparedStatement statement = null; + BlockingQueue<File> queue; List<Long> ids; ServiceLoader(BlockingQueue<File> queue, List<Long> ids) @@ -738,7 +794,11 @@ public class DbLoader this.queue = queue; this.ids = ids; DbHandle h = dbManager.open(); - statement = h.prepare("INSERT INTO " + HistoryManagerDb.SVC_TABLE + " (ducc_dbid, type, history, work) VALUES (?, ?, ?, ?);"); + synchronized(ServiceLoader.class) { + if ( servicePrepare == null ) { + servicePrepare = h.prepare("INSERT INTO " + HistoryManagerDb.SVC_HISTORY_TABLE + " (ducc_dbid, type, history, work) VALUES (?, ?, ?, ?);"); + } + } } public void run() @@ -774,14 +834,42 @@ public class DbLoader ByteBuffer bb = ByteBuffer.wrap(buf); logger.info(methodName, did, "Time to read service", duccid, ":", System.currentTimeMillis() - now + " MS", "bytes:", nbytes); - logger.info(methodName, 
did, "Service", duccid, "Store CQL:", statement.getQueryString()); + logger.info(methodName, did, "Service", duccid, "Store CQL:", servicePrepare.getQueryString()); long now1 = System.currentTimeMillis(); - BoundStatement boundStatement = new BoundStatement(statement); + BoundStatement boundStatement = new BoundStatement(servicePrepare); BoundStatement bound = boundStatement.bind(duccid, "service", true, bb); DbHandle h = dbManager.open(); h.execute(bound); logger.info(methodName, did, "Time to store service", duccid, "- Database update:", (System.currentTimeMillis() - now1) + " MS", "Total save time:", (System.currentTimeMillis() - now) + " MS"); + + ByteArrayInputStream bais = new ByteArrayInputStream(buf); + ObjectInputStream ois = new ObjectInputStream(bais); + Object o = ois.readObject(); + ois.close(); + bais.close(); + + String type = null; + if ( ((IDuccWorkService)o).getServiceDeploymentType() == null ) { + logger.warn(methodName, did, "getServiceDeploymentType is null, not extracting details."); + continue; + } + + switch ( ((IDuccWorkService)o).getServiceDeploymentType() ) + { + case uima: + case custom: + type = "S"; + break; + case other: + type = "A"; + break; + } + + now = System.currentTimeMillis(); + hmd.summarizeProcesses(h, (IDuccWork) o, type); + logger.info(methodName, did, "Time to store AP/Service Instance summaries for job", duccid, ":", (System.currentTimeMillis() - now)); + } catch(Exception e) { logger.info(methodName, did, e); } finally { @@ -806,10 +894,10 @@ public class DbLoader } } + static PreparedStatement reservationPrepare = null; class ReservationLoader implements Runnable { - PreparedStatement statement = null; BlockingQueue<File> queue; List<Long> ids; ReservationLoader(BlockingQueue<File> queue, List<Long> ids) @@ -818,7 +906,11 @@ public class DbLoader this.queue = queue; this.ids = ids; DbHandle h = dbManager.open(); - statement = h.prepare("INSERT INTO " + HistoryManagerDb.RES_TABLE + " (ducc_dbid, type, history, work) 
VALUES (?, ?, ?, ?);"); + synchronized(ReservationLoader.class) { + if ( reservationPrepare == null ) { + reservationPrepare = h.prepare("INSERT INTO " + HistoryManagerDb.RES_HISTORY_TABLE + " (ducc_dbid, type, history, work) VALUES (?, ?, ?, ?);"); + } + } } public void run() @@ -854,16 +946,28 @@ public class DbLoader ByteBuffer bb = ByteBuffer.wrap(buf); logger.info(methodName, did, "Time to read reservation", duccid, ":", System.currentTimeMillis() - now+" MS", "bytes:", nbytes); - logger.info(methodName, did, "Reservation", duccid, "Store CQL:", statement.getQueryString()); + logger.info(methodName, did, "Reservation", duccid, "Store CQL:", reservationPrepare.getQueryString()); long now1 = System.currentTimeMillis(); - BoundStatement boundStatement = new BoundStatement(statement); + BoundStatement boundStatement = new BoundStatement(reservationPrepare); BoundStatement bound = boundStatement.bind(duccid, "reservation", true, bb); DbHandle h = dbManager.open(); h.execute(bound); logger.info(methodName, did, "Time to store reservation", duccid, "- Database update:", (System.currentTimeMillis() - now1) + " MS", "Total save time:", (System.currentTimeMillis() - now) + " MS"); + ByteArrayInputStream bais = new ByteArrayInputStream(buf); + ObjectInputStream ois = new ObjectInputStream(bais); + Object o = ois.readObject(); + ois.close(); + bais.close(); + + now = System.currentTimeMillis(); + hmd.summarizeProcesses(h, (IDuccWork) o, "R"); // details on the "process" in the map + hmd.summarizeReservation(h, (IDuccWork) o); // details on the reservaiton itself + logger.info(methodName, did, "Time to store reservation summaries for job", duccid, ":", (System.currentTimeMillis() - now)); + } catch(Exception e) { + e.printStackTrace(); logger.info(methodName, did, e); } finally { closeStream(in); Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbManager.java URL: 
http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbManager.java?rev=1723398&r1=1723397&r2=1723398&view=diff ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbManager.java (original) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbManager.java Wed Jan 6 18:41:57 2016 @@ -48,7 +48,7 @@ public class DbManager private static String db_id = null; private static String db_pw = null; - boolean noisy = true; + boolean noisy = false; String dburl; DuccLogger logger; @@ -62,7 +62,8 @@ public class DbManager this.dburl = dburl; this.logger = logger; - if ( System.getProperty(NOISE_PROPERTY) != null ) noisy = false; + if ( System.getProperty(NOISE_PROPERTY) != null ) noisy = true; + System.out.println("NOISY " + noisy); } boolean checkForDatabase() Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbUtil.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbUtil.java?rev=1723398&r1=1723397&r2=1723398&view=diff ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbUtil.java (original) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbUtil.java Wed Jan 6 18:41:57 2016 @@ -71,6 +71,22 @@ class DbUtil return buf.toString(); } + static List<String> dropIndices(IDbProperty[] props, String tablename) + { + List<String> ret = new ArrayList<String>(); + for ( IDbProperty p : props ) { + if ( p.isIndex() ) { + StringBuffer buf = new StringBuffer("DROP INDEX IF EXISTS "); + buf.append(tablename); + buf.append("_"); + buf.append(p.pname()); + 
buf.append("_idx;"); + ret.add(buf.toString()); + } + } + return ret; + } + static List<String> mkIndices(IDbProperty[] props, String tablename) { List<String> ret = new ArrayList<String>(); Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/HistoryManagerDb.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/HistoryManagerDb.java?rev=1723398&r1=1723397&r2=1723398&view=diff ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/HistoryManagerDb.java (original) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/HistoryManagerDb.java Wed Jan 6 18:41:57 2016 @@ -30,15 +30,24 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import org.apache.uima.ducc.common.Pair; +import org.apache.uima.ducc.common.node.metrics.ProcessGarbageCollectionStats; import org.apache.uima.ducc.common.utils.DuccLogger; import org.apache.uima.ducc.common.utils.id.DuccId; +import org.apache.uima.ducc.transport.event.common.ADuccWorkExecutable; import org.apache.uima.ducc.transport.event.common.DuccWorkMap; import org.apache.uima.ducc.transport.event.common.DuccWorkReservation; +import org.apache.uima.ducc.transport.event.common.IDuccProcess; +import org.apache.uima.ducc.transport.event.common.IDuccProcessMap; +import org.apache.uima.ducc.transport.event.common.IDuccReservation; +import org.apache.uima.ducc.transport.event.common.IDuccReservationMap; +import org.apache.uima.ducc.transport.event.common.IDuccSchedulingInfo; +import org.apache.uima.ducc.transport.event.common.IDuccStandardInfo; import org.apache.uima.ducc.transport.event.common.IDuccTypes.DuccType; import org.apache.uima.ducc.transport.event.common.IDuccWork; import 
org.apache.uima.ducc.transport.event.common.IDuccWorkJob; import org.apache.uima.ducc.transport.event.common.IDuccWorkReservation; import org.apache.uima.ducc.transport.event.common.IDuccWorkService; +import org.apache.uima.ducc.transport.event.common.ITimeWindow; import org.apache.uima.ducc.transport.event.common.history.IHistoryPersistenceManager; import com.datastax.driver.core.PreparedStatement; @@ -46,8 +55,6 @@ import com.datastax.driver.core.ResultSe import com.datastax.driver.core.Row; import com.datastax.driver.core.SimpleStatement; import com.datastax.driver.core.exceptions.NoHostAvailableException; -import com.google.gson.JsonObject; -import com.google.gson.JsonParser; public class HistoryManagerDb @@ -58,15 +65,34 @@ public class HistoryManagerDb private DuccLogger logger = null; private DbManager dbManager; - PreparedStatement jobPrepare = null; - PreparedStatement reservationPrepare = null; - PreparedStatement servicePrepare = null; + PreparedStatement jobBlobPrepare = null; + PreparedStatement reservationBlobPrepare = null; + PreparedStatement serviceBlobPrepare = null; PreparedStatement ckptPrepare = null; - static final String JOB_TABLE = OrWorkProps.JOB_TABLE.pname(); - static final String RES_TABLE = OrWorkProps.RESERVATION_TABLE.pname(); - static final String SVC_TABLE = OrWorkProps.SERVICE_TABLE.pname(); + + PreparedStatement processDetailsPrepare = null; // "process" for things that aren't "reservations" + PreparedStatement reservationAllocPrepare = null; // "process" for things that are "reservaitons" + + PreparedStatement jobDetailsPrepare = null; + PreparedStatement reservationDetailsPrepare = null; + + static final String JOB_HISTORY_TABLE = OrWorkProps.JOB_HISTORY_TABLE.pname(); + static final String RES_HISTORY_TABLE = OrWorkProps.RESERVATION_HISTORY_TABLE.pname(); + static final String SVC_HISTORY_TABLE = OrWorkProps.SERVICE_HISTORY_TABLE.pname(); static final String CKPT_TABLE = OrCkptProps.CKPT_TABLE.pname(); + static final String 
PROCESS_TABLE = OrProcessProps.TABLE_NAME.pname(); + static final String JOB_TABLE = OrJobProps.TABLE_NAME.pname(); + static final String RESERVATION_TABLE = OrReservationProps.TABLE_NAME.pname(); + static String[] alltables = {JOB_HISTORY_TABLE, + RES_HISTORY_TABLE, + SVC_HISTORY_TABLE, + CKPT_TABLE, + PROCESS_TABLE, + JOB_TABLE, + RESERVATION_TABLE} + ; + public HistoryManagerDb() { } @@ -89,10 +115,14 @@ public class HistoryManagerDb // prepare some statements DbHandle h = dbManager.open(); - jobPrepare = h.prepare("INSERT INTO " + JOB_TABLE + " (ducc_dbid, type, history, work) VALUES (?, ?, ?, ?) IF NOT EXISTS;"); - reservationPrepare = h.prepare("INSERT INTO " + RES_TABLE + " (ducc_dbid, type, history, work) VALUES (?, ?, ?, ?) IF NOT EXISTS;"); - servicePrepare = h.prepare("INSERT INTO " + SVC_TABLE + " (ducc_dbid, type, history, work) VALUES (?, ?, ?, ?) IF NOT EXISTS;"); - ckptPrepare = h.prepare("INSERT INTO " + CKPT_TABLE + " (id, work, p2jmap) VALUES (?, ?, ?);"); + jobBlobPrepare = h.prepare("INSERT INTO " + JOB_HISTORY_TABLE + " (ducc_dbid, type, history, work) VALUES (?, ?, ?, ?) ;"); + reservationBlobPrepare = h.prepare("INSERT INTO " + RES_HISTORY_TABLE + " (ducc_dbid, type, history, work) VALUES (?, ?, ?, ?) ;"); + serviceBlobPrepare = h.prepare("INSERT INTO " + SVC_HISTORY_TABLE + " (ducc_dbid, type, history, work) VALUES (?, ?, ?, ?) ;"); + ckptPrepare = h.prepare("INSERT INTO " + CKPT_TABLE + " (id, work, p2jmap) VALUES (?, ?, ?);"); + processDetailsPrepare = h.prepare("INSERT INTO " + PROCESS_TABLE + " (host, job_id, ducc_pid, type, user, memory, start, stop, class, pid, reason_agent, exit_code, reason_scheduler, cpu, swap_max, run_time, init_time, initialized, investment, major_faults, gc_count, gc_time) values(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 
;"); + reservationAllocPrepare = h.prepare("INSERT INTO " + PROCESS_TABLE + " (host, job_id, ducc_pid, type, user, memory, start, stop, class, run_time) values(?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ;"); + jobDetailsPrepare = h.prepare("INSERT INTO " + JOB_TABLE + " (user, class, ducc_dbid, submission_time, duration, memory, reason, init_fails, errors, pgin, swap, total_wi, retries, preemptions, description) values(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ;"); + reservationDetailsPrepare = h.prepare("INSERT INTO " + RESERVATION_TABLE + " (user, class, ducc_dbid, submission_time, duration, memory, reason, processes, state, type, hosts, description) values(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);"); break; } catch ( NoHostAvailableException e ) { logger.error(methodName, null, "Cannot contact database. Retrying in 5 seconds."); @@ -123,6 +153,61 @@ public class HistoryManagerDb return init(stateUrl, dbManager); } + /** + * For bulk loader, we drop some of the indices during loading. + * + * Some of the tables are not a problem during bulk loading so we only externalize the + * indexes on some tables. + */ + static ArrayList<SimpleStatement> dropIndices() + { + ArrayList<SimpleStatement> ret = new ArrayList<SimpleStatement>(); + + List<String> indexes = DbUtil.dropIndices(OrProcessProps.values(), PROCESS_TABLE); + for ( String s : indexes ) { + ret.add(new SimpleStatement(s)); + } + + indexes = DbUtil.dropIndices(OrJobProps.values(), JOB_TABLE); + for ( String s : indexes ) { + ret.add(new SimpleStatement(s)); + } + + indexes = DbUtil.dropIndices(OrReservationProps.values(), RESERVATION_TABLE); + for ( String s : indexes ) { + ret.add(new SimpleStatement(s)); + } + + return ret; + } + + /** + * For bulk loader, we must recreate indices + * + * Some of the tables are not a problem during bulk loading so we only externalize the + * indexes on some tables. 
+ */ + static ArrayList<SimpleStatement> createIndices() + { + ArrayList<SimpleStatement> ret = new ArrayList<SimpleStatement>(); + + List<String> indexes = DbUtil.mkIndices(OrProcessProps.values(), PROCESS_TABLE); + for ( String s : indexes ) { + ret.add(new SimpleStatement(s)); + } + + indexes = DbUtil.mkIndices(OrJobProps.values(), JOB_TABLE); + for ( String s : indexes ) { + ret.add(new SimpleStatement(s)); + } + + indexes = DbUtil.mkIndices(OrReservationProps.values(), RESERVATION_TABLE); + for ( String s : indexes ) { + ret.add(new SimpleStatement(s)); + } + + return ret; + } /** * Schema gen. Do anything you want to make the schema, but notice that DbUtil has a few convenience methods if @@ -137,16 +222,13 @@ public class HistoryManagerDb buf.append(DbUtil.mkSchema(OrWorkProps.values())); buf.append(")"); buf.append("WITH CLUSTERING ORDER BY (ducc_dbid desc)"); - ret.add(new SimpleStatement(buf.toString())); + List<String> indexes = DbUtil.mkIndices(OrWorkProps.values(), tablename); for ( String s : indexes ) { ret.add(new SimpleStatement(s)); } - // ret.add(new SimpleStatement("CREATE INDEX IF NOT EXISTS ON " + tablename + "(ducc_dbid)")); - // ret.add(new SimpleStatement("CREATE INDEX IF NOT EXISTS ON " + tablename + "(history)")); - return ret; } @@ -155,20 +237,234 @@ public class HistoryManagerDb { ArrayList<SimpleStatement> ret = new ArrayList<SimpleStatement>(); - ret.addAll(mkSchema(JOB_TABLE)); - ret.addAll(mkSchema(RES_TABLE)); - ret.addAll(mkSchema(SVC_TABLE)); + ret.addAll(mkSchema(JOB_HISTORY_TABLE)); + ret.addAll(mkSchema(RES_HISTORY_TABLE)); + ret.addAll(mkSchema(SVC_HISTORY_TABLE)); StringBuffer buf = new StringBuffer("CREATE TABLE IF NOT EXISTS " + CKPT_TABLE + " ("); buf.append(DbUtil.mkSchema(OrCkptProps.values())); buf.append(")"); ret.add(new SimpleStatement(buf.toString())); + buf = new StringBuffer("CREATE TABLE IF NOT EXISTS " + PROCESS_TABLE + " ("); + buf.append(DbUtil.mkSchema(OrProcessProps.values())); + buf.append(")"); + 
ret.add(new SimpleStatement(buf.toString())); + + buf = new StringBuffer("CREATE TABLE IF NOT EXISTS " + JOB_TABLE + " ("); + buf.append(DbUtil.mkSchema(OrJobProps.values())); + buf.append(")"); + ret.add(new SimpleStatement(buf.toString())); + + + buf = new StringBuffer("CREATE TABLE IF NOT EXISTS " + RESERVATION_TABLE + " ("); + buf.append(DbUtil.mkSchema(OrReservationProps.values())); + buf.append(")"); + ret.add(new SimpleStatement(buf.toString())); + + // PLEASE NOTE: The process, job, and reservation tables can have 10000s, 100000s or 1000000s of records during bulk + // load of a system that has been running a while during execution of DbLoader. The DbLoader will drop the indexes + // on these three tables and then recreate them. To support this we break out the creation into another + // routine that can be called from the loader. + ret.addAll(createIndices()); + return ret; } // ---------------------------------------------------------------------------------------------------- - // Jobs section + + int toInt(String i) + { + if ( i == null ) return 0; + try { + return Integer.parseInt(i); + } catch ( Exception e ) { + return 0; + } + } + + String getString(String s) + { + return s == null ?
"<none>" : s; + } + + void summarizeJob(DbHandle h, IDuccWork w, String type) + throws Exception + { + IDuccWorkJob j = (IDuccWorkJob) w; + // need duccid, user, class, submission-time, duration, memory, exit-reason, init-fails, pgin, swap, total-wi, retries, preemptions, description + + long ducc_dbid = j.getDuccId().getFriendly(); + + IDuccStandardInfo dsi = j.getStandardInfo(); + IDuccSchedulingInfo dsx = j.getSchedulingInfo(); + + String user = dsi.getUser(); + String jclass = dsx.getSchedulingClass(); + + int memory = toInt(dsx.getMemorySizeRequested()); + long submission = dsi.getDateOfSubmissionMillis(); + long completion = dsi.getDateOfCompletionMillis(); + long duration = Math.max(0, completion - submission); + String reason = getString(j.getCompletionType().toString()); + int init_fails = (int) j.getProcessInitFailureCount(); + long pgin = j.getPgInCount(); + long swap = (long) j.getSwapUsageGbMax(); + int wi = (int) j.getWiTotal(); + int errors = toInt(dsx.getWorkItemsError()); + int retries = toInt(dsx.getWorkItemsRetry()); + int preemptions = toInt(dsx.getWorkItemsPreempt()); + String description = getString(dsi.getDescription()); + h.execute(jobDetailsPrepare, user, jclass, ducc_dbid, submission, duration, memory, reason, init_fails, errors, pgin, swap, wi, retries, preemptions, description); + } + + void summarizeProcesses(DbHandle h, IDuccWork w, String type) + throws Exception + { + // Loop through the processes on w saving useful things: + // jobid, processid, node, user, type of job, PID, duration, start timestamp, stop timestamp, exit state, + // memory, CPU, exit code, swap max, investment, init time + long job_id = w.getDuccId().getFriendly(); + int stupid = 0; + stupid++; + switch ( w.getDuccType() ) { + case Job: + case Service: + case Pop: + { + if ( job_id == 287249 ) { + stupid++; + stupid++; + } + + + IDuccProcessMap m = ((ADuccWorkExecutable)w).getProcessMap(); + Map<DuccId, IDuccProcess> map = m.getMap(); + IDuccStandardInfo dsi = 
w.getStandardInfo(); + IDuccSchedulingInfo dsx = w.getSchedulingInfo(); + + String user = dsi.getUser(); + int memory = toInt(dsx.getMemorySizeRequested()); + String sclass = dsx.getSchedulingClass(); + + for ( IDuccProcess idp : map.values() ) { + stupid++; + stupid++; + + + long ducc_pid = idp.getDuccId().getFriendly(); + long pid = toInt(idp.getPID()); + String node = idp.getNodeIdentity().getName(); + String reason_agent = idp.getReasonForStoppingProcess(); // called "reason" in duccprocess but not in ws + String reason_scheduler = idp.getProcessDeallocationType().toString(); // called "processDeallocationType" in duccprocess but not in ws + int exit_code = idp.getProcessExitCode(); + long cpu = idp.getCurrentCPU(); + long swap = idp.getSwapUsageMax(); + + ITimeWindow itw = idp.getTimeWindowInit(); + long processStart = 0; + long initTime = 0; + if ( itw != null ) { + processStart = itw.getStartLong(); + initTime = itw.getElapsedMillis(); + } + itw = idp.getTimeWindowRun(); + long processEnd = 0; + if ( itw != null ) { + processEnd = idp.getTimeWindowRun().getEndLong(); + } + boolean initialized = idp.isInitialized(); + long investment = idp.getWiMillisInvestment(); + long major_faults = idp.getMajorFaults(); + long gccount = 0; + long gctime = 0; + ProcessGarbageCollectionStats gcs = idp.getGarbageCollectionStats(); + if ( gcs != null ) { + gccount = gcs.getCollectionCount(); + gctime = gcs.getCollectionTime(); + } + h.execute(processDetailsPrepare, node, job_id, ducc_pid, type, user, memory, processStart, processEnd, sclass, + pid, reason_agent, exit_code, reason_scheduler, cpu, swap, Math.max(0, (processEnd-processStart)), initTime, + initialized, investment, major_faults, gccount, gctime); + } + + } + break; + + case Reservation: + { + if ( job_id == 287249 ) { + stupid++; + stupid++; + } + + IDuccReservationMap m = ((IDuccWorkReservation)w).getReservationMap(); + Map<DuccId, IDuccReservation> map = m.getMap(); + IDuccStandardInfo dsi = w.getStandardInfo(); 
+ IDuccSchedulingInfo dsx = w.getSchedulingInfo(); + long start = dsi.getDateOfCompletionMillis(); + long stop = dsi.getDateOfSubmissionMillis(); + int memory_size = 0; + if ( dsx.getMemorySizeRequested() == null ) { + memory_size = toInt(dsx.getMemorySizeRequested()); + } + for ( IDuccReservation idr : map.values() ) { + String node = "<none>"; + if ( idr.getNode() != null ) { + node = idr.getNode().getNodeIdentity().getName(); + } + try { + h.execute(reservationAllocPrepare, node, job_id, + idr.getDuccId().getFriendly(), type, dsi.getUser(), memory_size, + start, stop, dsx.getSchedulingClass(), Math.max(0, (stop-start)) ); + } catch (Exception e) { + // TODO Auto-generated catch block + e.printStackTrace(); + stupid++; + stupid++; + } + } + + } + break; + } + } + + void summarizeReservation(DbHandle h, IDuccWork w) + throws Exception + { + DuccWorkReservation r = (DuccWorkReservation) w; // cannot use the interface because it is incomplete + + long ducc_dbid = r.getDuccId().getFriendly(); + + IDuccStandardInfo dsi = r.getStandardInfo(); + IDuccSchedulingInfo dsx = r.getSchedulingInfo(); + + String user = dsi.getUser(); + String jclass = dsx.getSchedulingClass(); + + int memory = toInt(dsx.getMemorySizeRequested()); + long submission = dsi.getDateOfSubmissionMillis(); + long completion = dsi.getDateOfCompletionMillis(); + long duration = Math.max(0, completion - submission); + String reason = getString(r.getCompletionType().toString()); + String description = getString(dsi.getDescription()); + + List<String> nodes = r.getNodes(); + int processes = nodes.size(); + StringBuffer buf = new StringBuffer(""); + for ( int i = 0; i < processes; i++ ) { + buf.append(nodes.get(i)); + if ( i < (processes-1) ) buf.append(" "); + } + String hosts = buf.toString(); + String type = "R"; + + String state = r.getReservationState().toString(); + + h.execute(reservationDetailsPrepare, user, jclass, ducc_dbid, submission, duration, memory, reason, processes, state, type, hosts, 
description); + + } void saveWork(PreparedStatement s, IDuccWork w, boolean isHistory) throws Exception @@ -176,26 +472,31 @@ public class HistoryManagerDb String methodName = "saveWork"; Long nowP = System.currentTimeMillis(); String type = null; + String processType = null; switch ( w.getDuccType() ) { - case Job: - type = "job"; - break; - case Service: - case Pop: + case Job: + type = "job"; + processType = "J"; + break; + case Service: + case Pop: switch ( ((IDuccWorkService)w).getServiceDeploymentType() ) { case uima: case custom: type = "service"; + processType = "S"; break; case other: type = "AP"; + processType = "A"; break; } break; case Reservation: type = "reservation"; + processType = "R"; break; default: // illegal - internal error if this happens @@ -214,6 +515,21 @@ public class HistoryManagerDb DbHandle h = dbManager.open(); h.saveObject(s, w.getDuccId().getFriendly(), type, isHistory, buf); + switch ( w.getDuccType() ) { + case Job: + summarizeJob(h, w, "J"); + break; + case Service: + case Pop: + break; + case Reservation: + break; + default: + break; // Can't get here, we'd abort above in this case + } + + summarizeProcesses(h, w, processType); // summarize each process of the work + logger.info(methodName, w.getDuccId(), "----------> Time to save", type, ":", System.currentTimeMillis() - nowP, "Size:", bytes.length, "bytes."); } @@ -293,7 +609,7 @@ public class HistoryManagerDb public void saveJob(IDuccWorkJob j) throws Exception { - saveWork(jobPrepare, j, true); + saveWork(jobBlobPrepare, j, true); } @@ -303,7 +619,7 @@ public class HistoryManagerDb public IDuccWorkJob restoreJob(long friendly_id) throws Exception { - return (IDuccWorkJob) restoreWork(IDuccWorkJob.class, JOB_TABLE, friendly_id); + return (IDuccWorkJob) restoreWork(IDuccWorkJob.class, JOB_HISTORY_TABLE, friendly_id); } /** @@ -312,7 +628,7 @@ public class HistoryManagerDb public ArrayList<IDuccWorkJob> restoreJobs(long max) throws Exception { - return 
restoreSeveralThings(IDuccWorkJob.class, JOB_TABLE, max); + return restoreSeveralThings(IDuccWorkJob.class, JOB_HISTORY_TABLE, max); } // End of jobs section // ---------------------------------------------------------------------------------------------------- @@ -325,7 +641,7 @@ public class HistoryManagerDb public void saveReservation(IDuccWorkReservation r) throws Exception { - saveWork(reservationPrepare, r, true); + saveWork(reservationBlobPrepare, r, true); } /** @@ -334,7 +650,7 @@ public class HistoryManagerDb public IDuccWorkReservation restoreReservation(long duccid) throws Exception { - return (IDuccWorkReservation) restoreWork(IDuccWorkReservation.class, RES_TABLE, duccid); + return (IDuccWorkReservation) restoreWork(IDuccWorkReservation.class, RES_HISTORY_TABLE, duccid); } /** @@ -343,7 +659,7 @@ public class HistoryManagerDb public ArrayList<IDuccWorkReservation> restoreReservations(long max) throws Exception { - return restoreSeveralThings(IDuccWorkReservation.class, RES_TABLE, max); + return restoreSeveralThings(IDuccWorkReservation.class, RES_HISTORY_TABLE, max); } // End of reservations section @@ -356,7 +672,7 @@ public class HistoryManagerDb public void saveService(IDuccWorkService s) throws Exception { - saveWork(servicePrepare, s, true); + saveWork(serviceBlobPrepare, s, true); } @@ -366,7 +682,7 @@ public class HistoryManagerDb public IDuccWorkService restoreService(long duccid) throws Exception { - return (IDuccWorkService) restoreWork(IDuccWorkService.class, SVC_TABLE, duccid); + return (IDuccWorkService) restoreWork(IDuccWorkService.class, SVC_HISTORY_TABLE, duccid); } /** @@ -375,7 +691,7 @@ public class HistoryManagerDb public ArrayList<IDuccWorkService> restoreServices(long max) throws Exception { - return restoreSeveralThings(IDuccWorkService.class, SVC_TABLE, max); + return restoreSeveralThings(IDuccWorkService.class, SVC_HISTORY_TABLE, max); } // End of services section // 
---------------------------------------------------------------------------------------------------- @@ -504,17 +820,6 @@ public class HistoryManagerDb // End of OR checkpoint save and restore // ---------------------------------------------------------------------------------------------------- - // ---------------------------------------------------------------------------------------------------- - // Stuff common to everything - JsonObject mkJsonObject(String json) - { - // This method lets us munge the json before using it, if we need to - JsonParser parser = new JsonParser(); - JsonObject jobj = parser.parse(json).getAsJsonObject(); - - return jobj; - } - public void shutdown() { dbManager.shutdown(); Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/StateManager.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/StateManager.java?rev=1723398&r1=1723397&r2=1723398&view=diff ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/StateManager.java (original) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-orchestrator/src/main/java/org/apache/uima/ducc/orchestrator/StateManager.java Wed Jan 6 18:41:57 2016 @@ -228,6 +228,7 @@ public class StateManager { retVal = true; } catch(Exception e) { + e.printStackTrace(); } logger.trace(methodName, null, messages.fetch("exit")); return retVal; Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/common/DuccSchedulingInfo.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/common/DuccSchedulingInfo.java?rev=1723398&r1=1723397&r2=1723398&view=diff 
============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/common/DuccSchedulingInfo.java (original) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/common/DuccSchedulingInfo.java Wed Jan 6 18:41:57 2016 @@ -36,7 +36,7 @@ public class DuccSchedulingInfo implemen private String schedulingClass = defaultSchedulingClass; private String schedulingPriority = defaultSchedulingPriority; @Deprecated - private String memorySize = defaultMemorySize; + private String shareMemorySize = defaultMemorySize; private String memorySizeRequested = defaultMemorySize; private MemoryUnits memoryUnits = defaultMemoryUnits; private long memorySizeAllocatedInBytes = 0; @@ -146,8 +146,8 @@ public class DuccSchedulingInfo implemen @Deprecated public String getMemorySize() { String retVal = defaultMemorySize; - if(memorySize != null) { - retVal = memorySize; + if(shareMemorySize != null) { + retVal = shareMemorySize; } return retVal; } @@ -461,7 +461,7 @@ public class DuccSchedulingInfo implemen + ((schedulingPriority == null) ? 0 : schedulingPriority .hashCode()); result = prime * result - + ((memorySize == null) ? 0 : memorySize.hashCode()); + + ((shareMemorySize == null) ? 0 : shareMemorySize.hashCode()); result = prime * result + ((memoryUnits == null) ? 
0 : memoryUnits.hashCode()); @@ -516,10 +516,10 @@ public class DuccSchedulingInfo implemen return false; } else if (!schedulingPriority.equals(other.schedulingPriority)) return false; - if (memorySize == null) { - if (other.memorySize != null) + if (shareMemorySize == null) { + if (other.shareMemorySize != null) return false; - } else if (!memorySize.equals(other.memorySize)) + } else if (!shareMemorySize.equals(other.shareMemorySize)) return false; if (memoryUnits != other.memoryUnits) return false; Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/common/history/IHistoryPersistenceManager.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/common/history/IHistoryPersistenceManager.java?rev=1723398&r1=1723397&r2=1723398&view=diff ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/common/history/IHistoryPersistenceManager.java (original) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/common/history/IHistoryPersistenceManager.java Wed Jan 6 18:41:57 2016 @@ -62,19 +62,19 @@ public interface IHistoryPersistenceMana public enum OrWorkProps // properties for the OR work map implements IDbProperty { - JOB_TABLE { + JOB_HISTORY_TABLE { public String pname() { return "job_history"; } public boolean isPrivate() { return true; } public boolean isMeta() { return true; } }, - RESERVATION_TABLE { + RESERVATION_HISTORY_TABLE { public String pname() { return "res_history"; } public boolean isPrivate() { return true; } public boolean isMeta() { return true; } }, - SERVICE_TABLE { + SERVICE_HISTORY_TABLE { public String pname() { return "svc_history"; } public boolean isPrivate() { return true; } public boolean isMeta() { return true; } @@ -149,6 
+149,409 @@ public interface IHistoryPersistenceMana }; + public enum OrProcessProps + implements IDbProperty + { + TABLE_NAME { + public String pname() { return "processes"; } + public boolean isPrivate() { return true; } + public boolean isMeta() { return true; } + }, + + + // The order of the primary keys is important here as the Db assigns semantics to the first key in a compound PK + job_id { + public String pname() { return "job_id"; } + public Type type() { return Type.Long; } + public boolean isPrimaryKey() { return true; } + }, + + ducc_pid { + public String pname() { return "ducc_pid"; } + public Type type() { return Type.Long; } + public boolean isPrimaryKey() { return true; } + }, + + host { + public String pname() { return "host"; } + public Type type() { return Type.String; } + public boolean isIndex() { return true; } + }, + + + user { + public String pname() { return "user"; } + public Type type() { return Type.String; } + public boolean isIndex() { return true; } + }, + + log { + public String pname() { return "log"; } + public Type type() { return Type.String; } + public boolean isIndex() { return true; } + }, + + sclass { + public String pname() { return "class"; } + public Type type() { return Type.String; } + }, + + pid { + public String pname() { return "pid"; } + public Type type() { return Type.Long; } + }, + + start { + public String pname() { return "start"; } + public Type type() { return Type.Long; } + public boolean isIndex() { return true; } + }, + + stop { + public String pname() { return "stop"; } + public Type type() { return Type.Long; } + public boolean isIndex() { return true; } + }, + + reason_scheduler { + public String pname() { return "reason_scheduler"; } + public Type type() { return Type.String; } + }, + + reason_agent { + public String pname() { return "reason_agent"; } + public Type type() { return Type.String; } + }, + + exit_code { + public String pname() { return "exit_code"; } + public Type type() { return 
Type.Integer; } + }, + + deallocation_type { + public String pname() { return "deallocation_type"; } + public Type type() { return Type.String; } + }, + + memory { + public String pname() { return "memory"; } + public Type type() { return Type.Integer; } + }, + + cpu { + public String pname() { return "cpu"; }; + public Type type() { return Type.Long; } + }, + + swap_max { + public String pname() { return "swap_max"; }; + public Type type() { return Type.Long; } + }, + + state_scheduler { + public String pname() { return "state_scheduler"; } + public Type type() { return Type.String; } + }, + + state_agent { + public String pname() { return "state_agent"; } + public Type type() { return Type.String; } + }, + + + run_time { + public String pname() { return "run_time"; }; + public Type type() { return Type.Long; } + }, + + initialized { + public String pname() { return "initialized"; }; + public Type type() { return Type.Boolean; } + }, + + init_time { + public String pname() { return "init_time"; }; + public Type type() { return Type.Long; } + }, + + type { + public String pname() { return "type"; } // J, R, S, A + public boolean isIndex() { return true; } + }, + + major_faults { + public String pname() { return "major_faults"; } + public Type type() { return Type.Long; } + }, + + investment { + public String pname() { return "investment"; } + public Type type() { return Type.Long; } + }, + + gccount { + public String pname() { return "gc_count"; } + public Type type() { return Type.Long; } + }, + + gctime { + public String pname() { return "gc_time"; } + public Type type() { return Type.Long; } + }, + + jconsole { + public String pname() { return "jconsole"; } + public Type type() { return Type.Long; } + }, + + ; + public Type type() { return Type.String; } + public boolean isPrimaryKey() { return false; } + public boolean isPrivate() { return false; } + public boolean isMeta() { return false; } + public boolean isIndex() { return false; } + public String 
columnName() { return pname(); } + + }; + + // jobs - only history so far. some fields not used in db but here for the sake of complete schema + public enum OrJobProps + implements IDbProperty + { + TABLE_NAME { + public String pname() { return "jobs"; } + public boolean isPrivate() { return true; } + public boolean isMeta() { return true; } + }, + + + // The order of the primary keys is important here as the Db assigns semantics to the first key in a compound PK + user { + public String pname() { return "user"; } + public Type type() { return Type.String; } + public boolean isPrimaryKey() { return true; } + }, + + jclass { + public String pname() { return "class"; } + public Type type() { return Type.String; } + public boolean isPrimaryKey() { return true; } + }, + + ducc_dbid { + public String pname() { return "ducc_dbid"; } + public Type type() { return Type.Long; } + public boolean isPrimaryKey() { return true; } + }, + + submission_time { + public String pname() { return "submission_time"; } + public Type type() { return Type.Long; } + public boolean isIndex() { return true; } + }, + + duration { + public String pname() { return "duration"; } + public Type type() { return Type.Long; } + public boolean isIndex() { return true; } + }, + + memory { + public String pname() { return "memory"; } + public Type type() { return Type.Integer; } + public boolean isIndex() { return true; } + }, + + services { + public String pname() { return "services"; } + public Type type() { return Type.Integer; } + public boolean isIndex() { return true; } + }, + + processes { + public String pname() { return "processes"; } + public Type type() { return Type.Integer; } + public boolean isIndex() { return true; } + }, + + reason { + public String pname() { return "reason"; } + public Type type() { return Type.String; } + public boolean isIndex() { return true; } + }, + + init_fails { + public String pname() { return "init_fails"; } + public Type type() { return Type.Integer; } + public 
boolean isIndex() { return true; } + }, + + run_fails { + public String pname() { return "run_fails"; } + public Type type() { return Type.Integer; } + public boolean isIndex() { return true; } + }, + + errors { + public String pname() { return "errors"; } + public Type type() { return Type.Integer; } + public boolean isIndex() { return true; } + }, + + state { + public String pname() { return "state"; } + public Type type() { return Type.String; } + public boolean isIndex() { return true; } + }, + + pgin { + public String pname() { return "pgin"; } + public Type type() { return Type.Long; } + public boolean isIndex() { return true; } + }, + + swap { + public String pname() { return "swap"; } + public Type type() { return Type.Long; } + public boolean isIndex() { return true; } + }, + + total_wi { + public String pname() { return "total_wi"; } + public Type type() { return Type.Integer; } + public boolean isIndex() { return true; } + }, + + done_wi { + public String pname() { return "done_wi"; } + public Type type() { return Type.Integer; } + public boolean isIndex() { return true; } + }, + + dispatch { + public String pname() { return "dispatch"; } + public Type type() { return Type.Integer; } + public boolean isIndex() { return true; } + }, + + retries { + public String pname() { return "retries"; } + public Type type() { return Type.Integer; } + public boolean isIndex() { return true; } + }, + + preemptions { + public String pname() { return "preemptions"; } + public Type type() { return Type.Integer; } + public boolean isIndex() { return true; } + }, + + description { + public String pname() { return "description"; } + public Type type() { return Type.String; } + public boolean isIndex() { return true; } + }, + ; + public Type type() { return Type.String; } + public boolean isPrimaryKey() { return false; } + public boolean isPrivate() { return false; } + public boolean isMeta() { return false; } + public boolean isIndex() { return false; } + public String 
columnName() { return pname(); } + + }; + + // reservations - only history so far. some fields not used in db but here for the sake of complete schema + public enum OrReservationProps + implements IDbProperty + { + TABLE_NAME { + public String pname() { return "reservations"; } + public boolean isPrivate() { return true; } + public boolean isMeta() { return true; } + }, + + + // The order of the primary keys is important here as the Db assigns semantics to the first key in a compound PK + user { + public String pname() { return "user"; } + public Type type() { return Type.String; } + public boolean isPrimaryKey() { return true; } + }, + + jclass { + public String pname() { return "class"; } + public Type type() { return Type.String; } + public boolean isPrimaryKey() { return true; } + }, + + ducc_dbid { + public String pname() { return "ducc_dbid"; } + public Type type() { return Type.Long; } + public boolean isPrimaryKey() { return true; } + }, + + submission_time { + public String pname() { return "submission_time"; } + public Type type() { return Type.Long; } + public boolean isIndex() { return true; } + }, + + duration { + public String pname() { return "duration"; } + public Type type() { return Type.Long; } + public boolean isIndex() { return true; } + }, + + memory { + public String pname() { return "memory"; } + public Type type() { return Type.Integer; } + public boolean isIndex() { return true; } + }, + + reason { + public String pname() { return "reason"; } + public Type type() { return Type.String; } + public boolean isIndex() { return true; } + }, + + processes { + public String pname() { return "processes"; } + public Type type() { return Type.Integer; } + }, + + state { + public String pname() { return "state"; } + public Type type() { return Type.String; } + public boolean isIndex() { return true; } + }, + + type { + public String pname() { return "type"; } + public Type type() { return Type.String; } + public boolean isIndex() { return true; } + }, 
+ + hosts { + public String pname() { return "hosts"; } + public Type type() { return Type.String; } + }, + + description { + public String pname() { return "description"; } + public Type type() { return Type.String; } + public boolean isIndex() { return true; } + }, + ; + public Type type() { return Type.String; } + public boolean isPrimaryKey() { return false; } + public boolean isPrivate() { return false; } + public boolean isMeta() { return false; } + public boolean isIndex() { return false; } + public String columnName() { return pname(); } + + }; + //public void serviceSaveConditional(IDuccWorkService duccWorkService) throws Exception; // public void serviceSave(IDuccWorkService duccWorkService) throws Exception; //public IDuccWorkService serviceRestore(String fileName);