Author: challngr Date: Mon Nov 9 11:34:50 2015 New Revision: 1713395 URL: http://svn.apache.org/viewvc?rev=1713395&view=rev Log: UIMA-4755 Make the RM node status available to applications.
Added: uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/RmNodeState.java Modified: uima/sandbox/uima-ducc/trunk/src/main/admin/ducc_util.py uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/IDbProperty.java uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/rm/IRmPersistence.java uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/rm/NullRmStatePersistence.java uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/rm/RmPersistenceFactory.java uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/services/IStateServices.java uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbAlive.java uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbCreate.java uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbManager.java uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbUtil.java uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/RmStatePersistence.java uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Machine.java uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/NodePool.java uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Share.java uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/common/history/IHistoryPersistenceManager.java Modified: uima/sandbox/uima-ducc/trunk/src/main/admin/ducc_util.py URL: 
http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/src/main/admin/ducc_util.py?rev=1713395&r1=1713394&r2=1713395&view=diff ============================================================================== --- uima/sandbox/uima-ducc/trunk/src/main/admin/ducc_util.py (original) +++ uima/sandbox/uima-ducc/trunk/src/main/admin/ducc_util.py Mon Nov 9 11:34:50 2015 @@ -162,6 +162,7 @@ class DuccUtil(DuccBase): def db_configure(self): dbhost = self.ducc_properties.get('ducc.database.host') + print '----- dbhost', dbhost if ( dbhost == self.db_disabled ): self.db_bypass = True return; @@ -189,12 +190,12 @@ class DuccUtil(DuccBase): if ( self.system == 'Darwin'): ps = 'ps -eo user,pid,comm,args ' + pid else: - ps = 'ps -eo user:14,pid,comm,args ' + pkd + ps = 'ps -eo user:14,pid,comm,args ' + pid lines = self.popen(ps) for line in lines: line = line.strip() - if (pid in line): + if (pid in line and 'cassandra' in line): return True return False @@ -242,7 +243,7 @@ class DuccUtil(DuccBase): line = lines.readline().strip() except: break - # print '[]', line + #print '[]', line if ( not line ): break Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/IDbProperty.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/IDbProperty.java?rev=1713395&r1=1713394&r2=1713395&view=diff ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/IDbProperty.java (original) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/IDbProperty.java Mon Nov 9 11:34:50 2015 @@ -39,8 +39,9 @@ public interface IDbProperty // than does SQL, we define a translation from the "properties" key to // legal SQL syntactic names. 
DB does not translate, user must provide // a suitable translation. + boolean isIndex(); // If true, create a secondary index. - // If we update this we may have to update db methods that use it + // If we update this we may have to update db methods that use it. We'll try to avoid collections for now. public enum Type { String, // Java String Blob, // Java serialized object or other binary Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/rm/IRmPersistence.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/rm/IRmPersistence.java?rev=1713395&r1=1713394&r2=1713395&view=diff ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/rm/IRmPersistence.java (original) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/rm/IRmPersistence.java Mon Nov 9 11:34:50 2015 @@ -24,6 +24,7 @@ import java.util.Properties; import org.apache.uima.ducc.common.persistence.IDbProperty; import org.apache.uima.ducc.common.utils.DuccLogger; +import org.apache.uima.ducc.common.utils.id.DuccId; public interface IRmPersistence { @@ -51,7 +52,7 @@ public interface IRmPersistence * @param id This is the primary key, the machine name; * @param properties These are the props, must be presented in the form of (String, Object) ... */ - public void setProperties(String id, Object... properties) throws Exception; + public void setNodeProperties(String id, Object... properties) throws Exception; /** * Set a property on an object. If the property cannot be set the action @@ -65,7 +66,7 @@ public interface IRmPersistence * throw will originate in the DB because of some DB issue. An * exception causes the action to be rolled back. 
*/ - public void setProperty(String id, RmProperty key, Object value) throws Exception; + public void setNodeProperty(String id, RmNodes key, Object value) throws Exception; /** @@ -83,7 +84,23 @@ public interface IRmPersistence * * @return The db id of the created machine. */ - public void createMachine(String id, Map<RmProperty, Object> props) throws Exception; + public void createMachine(String id, Map<RmNodes, Object> props) throws Exception; + + /** + * Assign a share to this machine. + * @param id The node name + * @param jobid The duccid of the job owning the new shoare + * @param shareid The DuccId of the new share. + */ + public void addAssignment(String id, DuccId jobid, DuccId shareid); + + /** + * Remove a share from the machine. + * @param id The node name + * @param jobid The duccid of the job owning the new shoare + * @param shareid The DuccId of the new share. + */ + public void removeAssignment(String id, DuccId jobid, DuccId shareid); /** * Fetch a machine by its id. @@ -109,10 +126,15 @@ public interface IRmPersistence * @throws Exception. Anything that goes wrong throws. Usually the * throw will originate in the DB because of some DB issue. 
*/ - public Map<String, Properties> getAllMachines() throws Exception; - + public Map<String, Map<String, Object>> getAllMachines() throws Exception; - enum RmProperty + /** + * Shutdown the connection to the DB; + * + */ + public void close(); + + enum RmNodes implements IDbProperty { TABLE_NAME { @@ -121,33 +143,37 @@ public interface IRmPersistence public boolean isPrivate() { return true;} public boolean isMeta() { return true;} }, - Nodepool { - public String pname() { return "nodepool"; } + Name { + public String pname() { return "name"; } public Type type() { return Type.String; } public boolean isPrimaryKey() { return true;} }, Memory { public String pname() { return "memory"; } public Type type() { return Type.Integer; } - public boolean isPrimaryKey() { return true;} }, - Name { - public String pname() { return "name"; } + Nodepool { + public String pname() { return "nodepool"; } public Type type() { return Type.String; } - public boolean isPrimaryKey() { return true;} }, SharesLeft { public String pname() { return "shares_left"; } public Type type() { return Type.Integer; } - public boolean isPrimaryKey() { return true;} }, Responsive{ public String pname() { return "responsive"; } public Type type() { return Type.Boolean; } + public boolean isIndex() { return true; } }, Online{ public String pname() { return "online"; } public Type type() { return Type.Boolean; } + public boolean isIndex() { return true; } + }, + Reservable{ + public String pname() { return "reservable"; } + public Type type() { return Type.Boolean; } + public boolean isIndex() { return true; } }, Ip { public String pname() { return "ip"; } @@ -157,14 +183,14 @@ public interface IRmPersistence public String pname() { return "quantum"; } public Type type() { return Type.Integer; } }, + Classes { + public String pname() { return "classes"; } + public Type type() { return Type.String; } + }, ShareOrder { public String pname() { return "share_order"; } public Type type() { return 
Type.Integer; } }, - Shares{ - public String pname() { return "shares"; } - public Type type() { return Type.Integer; } - }, Blacklisted { public String pname() { return "blacklisted"; } public Type type() { return Type.Boolean; } @@ -181,6 +207,7 @@ public interface IRmPersistence public boolean isPrimaryKey() { return false; } public boolean isPrivate() { return false; } public boolean isMeta() { return false; } - public String columnName() { return pname(); } + public String columnName() { return pname(); } + public boolean isIndex() { return false; } } } Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/rm/NullRmStatePersistence.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/rm/NullRmStatePersistence.java?rev=1713395&r1=1713394&r2=1713395&view=diff ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/rm/NullRmStatePersistence.java (original) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/rm/NullRmStatePersistence.java Mon Nov 9 11:34:50 2015 @@ -23,6 +23,7 @@ import java.util.Map; import java.util.Properties; import org.apache.uima.ducc.common.utils.DuccLogger; +import org.apache.uima.ducc.common.utils.id.DuccId; public class NullRmStatePersistence implements IRmPersistence @@ -33,9 +34,12 @@ public class NullRmStatePersistence impl public void init(DuccLogger logger) throws Exception { } public void clear() {} - public void setProperty(String id, RmProperty key, Object value) { } - public void setProperties(String id, Object... 
props) {} - public void createMachine(String id, Map<RmProperty, Object> props) { } + public void close() {} + public void setNodeProperty(String id, RmNodes key, Object value) { } + public void setNodeProperties(String id, Object... props) {} + public void createMachine(String id, Map<RmNodes, Object> props) { } + public void addAssignment(String id, DuccId jobid, DuccId shareid) {} + public void removeAssignment(String id, DuccId jobid, DuccId shareid) {} public Properties getMachine(String id) { return null; } - public Map<String, Properties> getAllMachines() { return new HashMap<String, Properties>(); } + public Map<String, Map<String, Object>> getAllMachines() { return new HashMap<String, Map<String, Object>>(); } } Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/rm/RmPersistenceFactory.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/rm/RmPersistenceFactory.java?rev=1713395&r1=1713394&r2=1713395&view=diff ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/rm/RmPersistenceFactory.java (original) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/rm/RmPersistenceFactory.java Mon Nov 9 11:34:50 2015 @@ -40,7 +40,12 @@ public class RmPersistenceFactory String clname = System.getProperty("ducc.rm.persistence.impl"); if ( clname == null ) { DuccLogger logger = DuccService.getDuccLogger(); - logger.warn(methodName, null, "RM persistence manager is not configured. Returning null instance."); + if ( logger == null ) { + //Can happen when called from command-line utilities + System.out.println("RM persistence manager is not configured. 
Returning null instance."); + } else { + logger.warn(methodName, null, "RM persistence manager is not configured. Returning null instance."); + } return new NullRmStatePersistence(); } ndx = clname.lastIndexOf("."); Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/services/IStateServices.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/services/IStateServices.java?rev=1713395&r1=1713394&r2=1713395&view=diff ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/services/IStateServices.java (original) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-common/src/main/java/org/apache/uima/ducc/common/persistence/services/IStateServices.java Mon Nov 9 11:34:50 2015 @@ -111,6 +111,7 @@ public interface IStateServices { public boolean isPrimaryKey() { return false; } public boolean isPrivate() { return false; } public boolean isMeta() { return false; } + public boolean isIndex() { return false; } public String columnName() { return pname(); } }; @@ -244,9 +245,9 @@ public interface IStateServices { public boolean isPrimaryKey() { return false; } public boolean isPrivate() { return false; } public boolean isMeta() { return false; } + public boolean isIndex() { return false; } public String columnName() { return pname(); } - }; // Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbAlive.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbAlive.java?rev=1713395&r1=1713394&r2=1713395&view=diff ============================================================================== --- 
uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbAlive.java (original) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbAlive.java Mon Nov 9 11:34:50 2015 @@ -86,7 +86,6 @@ public class DbAlive Metadata metadata = cluster.getMetadata(); System.out.println("Connected to cluster: " + metadata.getClusterName()); - String x = DbCreate.DUCC_KEYSPACE; KeyspaceMetadata duccKs = metadata.getKeyspace(DbCreate.DUCC_KEYSPACE); if ( duccKs == null ) { System.out.println("DUCC keyspace not found."); Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbCreate.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbCreate.java?rev=1713395&r1=1713394&r2=1713395&view=diff ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbCreate.java (original) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbCreate.java Mon Nov 9 11:34:50 2015 @@ -19,9 +19,7 @@ package org.apache.uima.ducc.database; -import java.io.FileOutputStream; import java.util.List; -import java.util.Properties; import java.util.UUID; import org.apache.uima.ducc.common.utils.DuccLogger; @@ -111,7 +109,7 @@ public class DbCreate doLog(methodName, "Changed default super user's password and revoked its superuser authority."); doLog(methodName, "From this point, this DB can only be accessed in super user mode by user 'ducc'"); - return true; + break; } catch ( NoHostAvailableException e ) { doLog("Waiting for database to boot ..."); session = null; @@ -186,6 +184,7 @@ public class DbCreate // A 'keyspace' is what we usually think of as a database. 
session.execute("CREATE KEYSPACE IF NOT EXISTS ducc WITH replication = {'class':'SimpleStrategy', 'replication_factor':1};"); + session.execute("USE " + DUCC_KEYSPACE); try { List<SimpleStatement>rmSchema = RmStatePersistence.mkSchema(); Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbManager.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbManager.java?rev=1713395&r1=1713394&r2=1713395&view=diff ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbManager.java (original) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbManager.java Mon Nov 9 11:34:50 2015 @@ -74,6 +74,7 @@ public class DbManager { if ( session == null ) { session = cluster.connect(); + session.execute(new SimpleStatement("USE " + DbCreate.DUCC_KEYSPACE)); } return new DbHandle(this); Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbUtil.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbUtil.java?rev=1713395&r1=1713394&r2=1713395&view=diff ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbUtil.java (original) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/DbUtil.java Mon Nov 9 11:34:50 2015 @@ -68,6 +68,26 @@ class DbUtil return buf.toString(); } + static List<String> mkIndices(IDbProperty[] props, String tablename) + { + List<String> ret = new ArrayList<String>(); + for ( IDbProperty p : props ) { + if ( p.isIndex() ) { + StringBuffer buf = new StringBuffer("CREATE INDEX "); + 
buf.append(tablename); + buf.append("_"); + buf.append(p.pname()); + buf.append("_idx ON "); + buf.append(tablename); + buf.append("("); + buf.append(p.pname()); + buf.append(")"); + ret.add(buf.toString()); + } + } + return ret; + } + static String mkFields(StringBuffer buf, String[] fields) { int max = fields.length - 1; @@ -120,7 +140,7 @@ class DbUtil String k = ok.columnName(); buf.append(k); - vals.append(rep(ok.type(), props.get(ok))); + vals.append(rep(ok, props.get(ok))); if ( current++ < max ) { buf.append(","); @@ -152,7 +172,7 @@ class DbUtil String k = ok.columnName(); buf.append(k); - vals.append(rep(ok.type(), props.get(ok))); + vals.append(rep(ok, props.get(ok))); if ( current++ < max ) { buf.append(","); @@ -182,7 +202,7 @@ class DbUtil buf.append(prop.columnName()); buf.append("="); - buf.append(rep(prop.type(), props[i+1])); + buf.append(rep(prop, props[i+1])); if ( i + 2 < len ) { buf.append(","); } @@ -196,15 +216,14 @@ class DbUtil /** * Return the correct representation for CQL update, of val, for the indicated type, for this database. 
*/ - static String rep(Type t, Object val) + static String rep(IDbProperty p, Object val) { - switch ( t ) { - case String: - return "'" + val.toString() + "'"; - default: - return val.toString(); + switch ( p.type() ) { + case String: + return "'" + val.toString() + "'"; + default: + return val.toString(); } - } /** @@ -218,7 +237,7 @@ class DbUtil case Blob: return "blob"; case String: - return "text"; + return "varchar"; case Boolean: return "boolean"; case Integer: Added: uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/RmNodeState.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/RmNodeState.java?rev=1713395&view=auto ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/RmNodeState.java (added) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/RmNodeState.java Mon Nov 9 11:34:50 2015 @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+*/ + +package org.apache.uima.ducc.database; + +import java.util.Map; + +import org.apache.uima.ducc.common.persistence.rm.IRmPersistence; +import org.apache.uima.ducc.common.persistence.rm.RmPersistenceFactory; +import org.apache.uima.ducc.common.utils.DuccLogger; + +public class RmNodeState +{ + DuccLogger logger = DuccLogger.getLogger(RmNodeState.class, "State"); + String dburl = null; + + RmNodeState(String dburl) + { + this.dburl = dburl; + } + + void run() + throws Exception + { + IRmPersistence persistence = RmPersistenceFactory.getInstance(this.getClass().getName(), "RM"); + + try { + Map<String, Map<String, Object>> state = persistence.getAllMachines(); + for ( String node : state.keySet() ) { + StringBuffer buf = new StringBuffer(node); + buf.append(": "); + Map<String, Object> st = state.get(node); + for ( String k : st.keySet() ) { + buf.append(k); + buf.append("["); + buf.append(st.get(k).toString()); + buf.append("] "); + } + System.out.println(buf.toString()); + } + } finally { + persistence.close(); + } + } + + public static void main(String[] args) + { + if ( args.length != 1 ) { + System.out.println("Usage: RmNodeState <dburl>"); + System.exit(1); + } + System.setProperty("ducc.state.database.url", args[0]); + + RmNodeState rns = new RmNodeState(args[0]); + try { + rns.run(); + } catch ( Exception e ) { + e.printStackTrace(); + } + } +} Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/RmStatePersistence.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/RmStatePersistence.java?rev=1713395&r1=1713394&r2=1713395&view=diff ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/RmStatePersistence.java (original) +++ 
uima/sandbox/uima-ducc/trunk/uima-ducc-database/src/main/java/org/apache/uima/ducc/database/RmStatePersistence.java Mon Nov 9 11:34:50 2015 @@ -27,7 +27,10 @@ import java.util.Properties; import org.apache.uima.ducc.common.persistence.rm.IRmPersistence; import org.apache.uima.ducc.common.utils.DuccLogger; +import org.apache.uima.ducc.common.utils.id.DuccId; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Row; import com.datastax.driver.core.SimpleStatement; /** @@ -40,6 +43,8 @@ public class RmStatePersistence DbManager dbManager = null; DuccLogger logger = null; + static final String RM_NODE_TABLE = RmNodes.TABLE_NAME.pname(); + public RmStatePersistence() { } @@ -66,9 +71,10 @@ public class RmStatePersistence init(stateUrl); } - public void shutdown() + public void close() { dbManager.shutdown(); + dbManager = null; } public void clear() @@ -78,7 +84,7 @@ public class RmStatePersistence DbHandle h = null; try { h = dbManager.open(); - h.execute("TRUNCATE ducc.rmnodes"); + h.execute("TRUNCATE " + RM_NODE_TABLE); } catch ( Exception e ) { logger.error(methodName, null, "Cannot clear the database.", e); } @@ -88,46 +94,32 @@ public class RmStatePersistence throws Exception { List<SimpleStatement> ret = new ArrayList<SimpleStatement>(); - - StringBuffer buf = new StringBuffer("CREATE TABLE IF NOT EXISTS ducc." 
+ RmProperty.TABLE_NAME.pname() + " ("); - buf.append(DbUtil.mkSchema(RmProperty.values())); - buf.append(") WITH CLUSTERING ORDER BY (memory desc, name asc, shares_left desc)"); - + + StringBuffer buf = new StringBuffer("CREATE TABLE IF NOT EXISTS " + RM_NODE_TABLE + " ("); + buf.append(DbUtil.mkSchema(RmNodes.values())); + buf.append(")"); ret.add(new SimpleStatement(buf.toString())); + List<String> indexes = DbUtil.mkIndices(RmNodes.values(), RM_NODE_TABLE); + for ( String s : indexes ) { + ret.add(new SimpleStatement(s)); + } return ret; } - // static String[] mkSchemaItems() - // { - // int size = RmProperty.values().length; - // String[] ret = new String[size]; - // int ndx = 0; - - // for ( RmProperty n: RmProperty.values() ) { - // String s = n.pname(); - // s = s + " " + DbUtil.typeToString(n.type()); - // if ( n.isPrimaryKey() ) { - // s = s + " PRIMARY KEY"; - // } - // ret[ndx++] = s; - // } - // return ret; - // } - - public void createMachine(String m, Map<RmProperty, Object> props) + public void createMachine(String m, Map<RmNodes, Object> props) throws Exception { String methodName = "createMachine"; DbHandle h = dbManager.open(); try { - String cql = DbUtil.mkInsert("ducc.rmnodes", props); + String cql = DbUtil.mkInsert(RM_NODE_TABLE, props); h.execute(cql); } catch ( Exception e ) { logger.error(methodName, null, "Error creating new record:", e); } } - public void setProperties(String node, Object... props) + public void setNodeProperties(String node, Object... 
props) throws Exception { String methodName = "setProperties"; @@ -140,9 +132,9 @@ public class RmStatePersistence DbHandle h = dbManager.open(); try { - h.updateProperties("ducc.rmnodes", "WHERE name='" + node + "'", props); + h.updateProperties(RM_NODE_TABLE, "name='" + node + "'", props); } catch ( Exception e ) { - logger.error(methodName, null, "Problem setting properties"); + logger.error(methodName, null, "Problem setting properties", e); } finally { logger.info(methodName, null, "Total time to update properties on", System.currentTimeMillis() - now); @@ -150,7 +142,7 @@ public class RmStatePersistence } - public void setProperty(String node, RmProperty k, Object v) + public void setNodeProperty(String node, RmNodes k, Object v) throws Exception { String methodName = "setProperty"; @@ -158,23 +150,71 @@ public class RmStatePersistence DbHandle h = dbManager.open(); try { - h.updateProperty("ducc.rmnodes", "name='" + node + "'", k.columnName(), v); + h.updateProperty(RM_NODE_TABLE, "name='" + node + "'", k.columnName(), v); } catch ( Exception e ) { - logger.error(methodName, null, "Problem setting properties."); + logger.error(methodName, null, "Problem setting properties:", e); } } - + + public void addAssignment(String node, DuccId jobid, DuccId shareid) + { + } + + public void removeAssignment(String node, DuccId jobid, DuccId shareid) + { + } + public Properties getMachine(String m) throws Exception { return null; } - public Map<String, Properties> getAllMachines() + public Map<String, Map<String, Object>> getAllMachines() throws Exception - { - return new HashMap<String, Properties>(); + { + String methodName = "getAllMachiens"; + Map<String, Map<String, Object>> ret = new HashMap<String, Map<String, Object>>(); + String cql = "SELECT * FROM " + RM_NODE_TABLE; + DbHandle h = dbManager.open(); + ResultSet rs = h.execute(cql); + for ( Row r : rs ) { + Map<String, Object> mach = new HashMap<String, Object>(); + // We don't expect any nulls in this table + 
for ( RmNodes n : RmNodes.values() ) { + if ( n.isPrivate() ) continue; + if ( n.isMeta() ) continue; + switch ( n.type() ) { + + case String: { + String v = r.getString(n.columnName()); + mach.put(n.pname(), v); + if ( n == RmNodes.Name ) { + ret.put(v, mach); + } + } + break; + + case Integer: { + int v = r.getInt(n.columnName()); + mach.put(n.pname(), v); + } + break; + + case Boolean: { + boolean v = r.getBool(n.columnName()); + mach.put(n.pname(), v); + } + break; + + default: + logger.warn(methodName, null, "Unexpected value in db:", n.pname(), "type", n.type(), "is not recognized."); + break; + } + } + } + return ret; } public static void main(String[] args) Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Machine.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Machine.java?rev=1713395&r1=1713394&r2=1713395&view=diff ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Machine.java (original) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Machine.java Mon Nov 9 11:34:50 2015 @@ -26,7 +26,7 @@ import org.apache.uima.ducc.common.NodeI import org.apache.uima.ducc.common.admin.event.RmQueriedMachine; import org.apache.uima.ducc.common.admin.event.RmQueriedShare; import org.apache.uima.ducc.common.persistence.rm.IRmPersistence; -import org.apache.uima.ducc.common.persistence.rm.IRmPersistence.RmProperty; +import org.apache.uima.ducc.common.persistence.rm.IRmPersistence.RmNodes; import org.apache.uima.ducc.common.persistence.rm.RmPersistenceFactory; import org.apache.uima.ducc.common.utils.DuccLogger; import org.apache.uima.ducc.common.utils.id.DuccId; @@ -171,10 +171,11 @@ public class Machine { String methodName = "heartbeatArrives"; long now = 
System.currentTimeMillis(); - if ( heartbeats == 0 ) return; // no need to rereset it + if ( heartbeats == 0 ) return; + heartbeats = 0; try { - logger.info(methodName, null, "Reset heartbeat to 0"); - persistence.setProperty(id, RmProperty.Heartbeats, 0); + logger.info(methodName, null, "Reset heartbeat to 0 from", heartbeats); + persistence.setNodeProperty(id, RmNodes.Heartbeats, 0); logger.info(methodName, null, "Time to reset heartbeat", System.currentTimeMillis() - now); } catch (Exception e) { logger.warn(methodName, null, "Cannot update heartbeat count in database:", e); @@ -187,10 +188,10 @@ public class Machine long now = System.currentTimeMillis(); if ( c < 2 ) return; // we allow a couple because timing and races can create false negatives + heartbeats = c; try { - heartbeats = c; logger.info(methodName, null, "Missed heartbeat count", c); - persistence.setProperty(id, RmProperty.Heartbeats, c); + persistence.setNodeProperty(id, RmNodes.Heartbeats, c); logger.info(methodName, null, "Time to record misssed heartbeat", System.currentTimeMillis() - now); } catch (Exception e) { logger.warn(methodName, null, "Cannot update heartbeat count in database:", e); @@ -317,7 +318,9 @@ public class Machine activeShares.put(s, s); shares_left -= s.getShareOrder(); try { - persistence.setProperties(id, RmProperty.Assignments, activeShares.size(), RmProperty.SharesLeft, shares_left); + // Not transactional. 
If this turns into a problem we'll have to find a way + persistence.setNodeProperties(id, RmNodes.Assignments, activeShares.size(), RmNodes.SharesLeft, shares_left); + persistence.addAssignment(id, s.getJob().getId(), s.getId()); // update jobs on machine and specific shares logger.info(methodName, null, "Time to assign share in db", System.currentTimeMillis() - now); } catch (Exception e) { logger.warn(methodName, null, "Cannot save state; shares_left", shares_left); @@ -334,7 +337,9 @@ public class Machine nodepool.removeShare(s); shares_left += s.getShareOrder(); try { - persistence.setProperties(id, RmProperty.Assignments, activeShares.size(), RmProperty.SharesLeft, shares_left); + // Not transactional. If this turns into a problem we'll have to find a way + persistence.setNodeProperties(id, RmNodes.Assignments, activeShares.size(), RmNodes.SharesLeft, shares_left); + persistence.removeAssignment(id, s.getJob().getId(), s.getId()); // update jobs on machine and specific shares logger.info(methodName, null, "Time to remove share in db", System.currentTimeMillis() - now); } catch (Exception e) { logger.warn(methodName, null, "Cannot save state; shares_left", shares_left); Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/NodePool.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/NodePool.java?rev=1713395&r1=1713394&r2=1713395&view=diff ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/NodePool.java (original) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/NodePool.java Mon Nov 9 11:34:50 2015 @@ -31,7 +31,7 @@ import java.util.Map; import org.apache.uima.ducc.common.Node; import org.apache.uima.ducc.common.NodeIdentity; import 
org.apache.uima.ducc.common.persistence.rm.IRmPersistence; -import org.apache.uima.ducc.common.persistence.rm.IRmPersistence.RmProperty; +import org.apache.uima.ducc.common.persistence.rm.IRmPersistence.RmNodes; import org.apache.uima.ducc.common.persistence.rm.RmPersistenceFactory; import org.apache.uima.ducc.common.utils.DuccLogger; import org.apache.uima.ducc.transport.event.common.IDuccTypes.DuccType; @@ -69,7 +69,7 @@ class NodePool HashMap<Node, Machine> preemptables = new HashMap<Node, Machine>(); // candidates for preemption for reservations int total_shares = 0; - Map<ResourceClass, ResourceClass> allClasses = new HashMap<ResourceClass, ResourceClass>(); // all the classes directly serviced by me + Map<ResourceClass, ResourceClass> allClasses = new HashMap<ResourceClass, ResourceClass>(); // all the classes directly serviced by me // // There are "theoretical" shares based on actual capacities of // the machines. They are used for the "how much" part of the @@ -101,6 +101,7 @@ class NodePool GlobalOrder maxorder = null; IRmPersistence persistence = null; + boolean canReserve = false; // if we contain a class with policy Reserve, then stuff in this pool is reservable // NodePool(NodePool parent, String id, EvictionPolicy ep, int order) // { @@ -138,6 +139,7 @@ class NodePool void addResourceClass(ResourceClass cl) { // UIMA-4065 allClasses.put(cl, cl); + if ( cl.getPolicy() == Policy.RESERVE) canReserve = true; } NodePool getParent() @@ -994,87 +996,47 @@ class NodePool } - void signalDb(Machine m, RmProperty key, Object value) + void signalDb(Machine m, RmNodes key, Object value) { String methodName = "signalDb"; try { - persistence.setProperty(m.getNode().getNodeIdentity().getName(), key, value); + persistence.setNodeProperty(m.getNode().getNodeIdentity().getName(), key, value); } catch (Exception e) { logger.warn(methodName, null, "Cannot update DB property", key, "for machine", m); } } - Map<RmProperty, Object> initDbProperties(Machine m) + Map<RmNodes, 
Object> initDbProperties(Machine m) { Node n = m.getNode(); NodeIdentity nid = n.getNodeIdentity(); - Map<RmProperty, Object> props = new HashMap<RmProperty, Object>(); - props.put(RmProperty.Name, nid.getName()); - props.put(RmProperty.Ip, nid.getIp()); - props.put(RmProperty.Nodepool, id); - props.put(RmProperty.Quantum, share_quantum / ( 1024*1024)); + Map<RmNodes, Object> props = new HashMap<RmNodes, Object>(); + props.put(RmNodes.Name, nid.getName()); + props.put(RmNodes.Ip, nid.getIp()); + props.put(RmNodes.Nodepool, id); + props.put(RmNodes.Quantum, share_quantum / ( 1024*1024)); - props.put(RmProperty.Memory , m.getMemory() / (1024*1024)); - props.put(RmProperty.ShareOrder , m.getShareOrder()); - props.put(RmProperty.Blacklisted , m.isBlacklisted()); + props.put(RmNodes.Memory , m.getMemory() / (1024*1024)); + props.put(RmNodes.ShareOrder , m.getShareOrder()); + props.put(RmNodes.Blacklisted , m.isBlacklisted()); // init these here, but must be maintained by machine - props.put(RmProperty.Heartbeats , 0); - props.put(RmProperty.SharesLeft , m.countFreeShares()); // qshares remaining - props.put(RmProperty.Assignments , m.countProcesses()); // processes + props.put(RmNodes.Heartbeats , 0); + props.put(RmNodes.SharesLeft , m.countFreeShares()); // qshares remaining + props.put(RmNodes.Assignments , m.countProcesses()); // processes + props.put(RmNodes.Reservable , canReserve); + + StringBuffer buf = new StringBuffer(); + for ( ResourceClass cl : allClasses.keySet() ) { + buf.append(cl.getName()); + buf.append(" "); + } + props.put(RmNodes.Classes, buf.toString()); return props; } - // /** - // * On init, seed the database with everything we know about nodes. - // * TODO: not used - do we care? 
- // */ - // void initializeDbx() - // { - // String methodName = "initializeDb"; - // for ( NodePool np : children.values() ) { - // np.initializeDbx(); - // } - - // for (Node n : allMachines.keySet()) { - // Machine m = allMachines.get(n); - // Properties props = initDbProperties(m); - // props.put(RmPropName.Responsive.pname(), true); - // props.put(RmPropName.Online.pname(), true); - // try { - // persistence.createMachine(m.getId(), props); - // } catch (Exception e) { - // logger.warn(methodName, null, "Cannot store (online) node", m.getId(), "in db:", e); - // } - // } - // for (Node n : unresponsiveMachines.keySet()) { - // Machine m = unresponsiveMachines.get(n); - // Properties props = initDbProperties(m); - // props.setProperty(RmPropName.Responsive.pname(), "false"); - // props.setProperty(RmPropName.Online.pname(), "true"); - // try { - // persistence.createMachine(m.getId(), props); - // } catch (Exception e) { - // logger.warn(methodName, null, "Cannot store (unresponsive) node", m.getId(), "in db:", e); - // } - // } - // for (Node n : offlineMachines.keySet()) { - // Machine m = offlineMachines.get(n); - // Properties props = initDbProperties(m); - // props.setProperty(RmPropName.Responsive.pname(), "unknown"); - // props.setProperty(RmPropName.Online.pname(), "false"); - // try { - // persistence.createMachine(m.getId(), props); - // } catch (Exception e) { - // logger.warn(methodName, null, "Cannot store (offline) node", m.getId(), "in db:", e); - // } - // } - - // } - - /** * Handle a new node update. */ @@ -1099,7 +1061,7 @@ class NodePool if ( offlineMachines.containsKey(node) ) { // if it's offline it can't be restored like this. Machine m = offlineMachines.get(node); - signalDb(m, RmProperty.Responsive, true); + signalDb(m, RmNodes.Responsive, true); logger.trace(methodName, null, "Node ", m.getId(), " is offline, not activating."); return m; } @@ -1111,7 +1073,6 @@ class NodePool m.setShareOrder(order); // hardware changes. 
} - // TODO soon ... can I just combine this with the code directly below: allMachines.put(node, m); machinesByName.put(m.getId(), m); machinesByIp.put(m.getIp(), m); @@ -1124,7 +1085,7 @@ class NodePool mlist.put(m.key(), m); total_shares += order; // UIMA-3939 - signalDb(m, RmProperty.Responsive, true); + signalDb(m, RmNodes.Responsive, true); logger.info(methodName, null, "Nodepool:", id, "Host reactivated ", m.getId(), String.format("shares %2d total %4d:", order, total_shares), m.toString()); return m; } @@ -1152,9 +1113,9 @@ class NodePool String.format("shares %2d total %4d:", order, total_shares), machine.toString()); updated++; - Map<RmProperty, Object> props = initDbProperties(allMachines.get(key)); - props.put(RmProperty.Responsive, true); - props.put(RmProperty.Online, true); + Map<RmNodes, Object> props = initDbProperties(allMachines.get(key)); + props.put(RmNodes.Responsive, true); + props.put(RmNodes.Online, true); try { persistence.createMachine(machine.getId(), props); } catch (Exception e) { @@ -1231,7 +1192,7 @@ class NodePool void nodeLeaves(Machine m) { disable(m, unresponsiveMachines); - signalDb(m, RmProperty.Responsive, false); + signalDb(m, RmNodes.Responsive, false); } // UIMA-4142 @@ -1272,7 +1233,7 @@ class NodePool Node key = mm.key(); iter.remove(); offlineMachines.put(key, mm); - signalDb(m, RmProperty.Online, false); + signalDb(m, RmNodes.Online, false); return "VaryOff: Nodepool " + id + " - Unresponsive machine, marked offline: " + node; } } @@ -1281,7 +1242,7 @@ class NodePool } disable(m, offlineMachines); - signalDb(m, RmProperty.Online, false); + signalDb(m, RmNodes.Online, false); return "VaryOff: " + node + " - OK."; } @@ -1300,7 +1261,7 @@ class NodePool Machine mm = iter.next(); if ( mm.getId().equals(node) ) { iter.remove(); - signalDb(mm, RmProperty.Online, true); + signalDb(mm, RmNodes.Online, true); return "VaryOn: Nodepool " + id + " - Machine marked online: " + node; } } @@ -1309,7 +1270,7 @@ class NodePool while ( 
iter.hasNext() ) { Machine mm = iter.next(); if ( mm.getId().equals(node) ) { - signalDb(mm, RmProperty.Online, true); + signalDb(mm, RmNodes.Online, true); return "VaryOn: Nodepool " + id + " - Machine is online but not responsive: " + node; } } Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Share.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Share.java?rev=1713395&r1=1713394&r2=1713395&view=diff ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Share.java (original) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-rm/src/main/java/org/apache/uima/ducc/rm/scheduler/Share.java Mon Nov 9 11:34:50 2015 @@ -18,6 +18,9 @@ */ package org.apache.uima.ducc.rm.scheduler; +import java.util.HashMap; +import java.util.Map; + import org.apache.uima.ducc.common.Node; import org.apache.uima.ducc.common.NodeIdentity; import org.apache.uima.ducc.common.utils.id.DuccId; @@ -61,6 +64,19 @@ public class Share @SuppressWarnings("unused") private String pid = "<none>"; + Map<String, Object> getShareProperties() + { + Map<String, Object> ret = new HashMap<String, Object>(); + ret.put("numeric_id", id.getFriendly()); + ret.put("uuid", id.getUnique()); + ret.put("share_order", share_order); + ret.put("init_time", init_time.getElapsedMillis()); + ret.put("evicted", evicted); + ret.put("purged", purged); + ret.put("fixed", fixed); + ret.put("investment", investment); + return ret; + } /** * This constructor is used during recovery ONLY. 
Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/common/history/IHistoryPersistenceManager.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/common/history/IHistoryPersistenceManager.java?rev=1713395&r1=1713394&r2=1713395&view=diff ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/common/history/IHistoryPersistenceManager.java (original) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-transport/src/main/java/org/apache/uima/ducc/transport/event/common/history/IHistoryPersistenceManager.java Mon Nov 9 11:34:50 2015 @@ -108,6 +108,7 @@ public interface IHistoryPersistenceMana public boolean isPrimaryKey() { return false; } public boolean isPrivate() { return false; } public boolean isMeta() { return false; } + public boolean isIndex() { return false; } public String columnName() { return pname(); } }; @@ -140,7 +141,9 @@ public interface IHistoryPersistenceMana public boolean isPrimaryKey() { return false; } public boolean isPrivate() { return false; } public boolean isMeta() { return false; } + public boolean isIndex() { return false; } public String columnName() {return pname(); } + public Type listType() { return Type.String; } };