asmuts 02/01/14 22:18:12
Added: src/java/org/apache/stratum/jcs/auxiliary/disk/hsql
HSQLCache.java HSQLCacheAttributes.java
HSQLCacheFactory.java HSQLCacheManager.java
HSQLCacheNoWaitBuffer.java PurgatoryElement.java
Log:
Test implementation of an HSQL disk cache auxiliary.
Too slow in its current form; a future project.
Could be extended to other databases as a generic DB auxiliary.
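For the record, the only external configuration this code touches is the diskPath entry in /cache.properties, and only as a fallback when no path is passed in; driver, url, database, user, and password are currently hard-coded defaults. A minimal sketch of the properties entry (the path is an example):

# /cache.properties -- only diskPath is read by HSQLCache in this revision
diskPath=/tmp/jcs_hsql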
Revision Changes Path
1.1
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/disk/hsql/HSQLCache.java
Index: HSQLCache.java
===================================================================
package org.apache.stratum.jcs.auxiliary.disk.hsql;
import java.io.*;
import java.util.*;
import java.sql.*;
import org.apache.stratum.jcs.access.*;
import org.apache.stratum.jcs.access.exception.*;
import org.apache.stratum.jcs.auxiliary.disk.hsql.*;
import org.apache.stratum.jcs.auxiliary.disk.hsql.behavior.*;
import org.apache.stratum.jcs.engine.control.*;
import org.apache.stratum.jcs.engine.control.Cache;
import org.apache.stratum.jcs.engine.behavior.*;
import org.apache.stratum.jcs.engine.*;
import org.apache.stratum.jcs.utils.data.*;
import org.apache.stratum.jcs.utils.log.*;
import org.hsqldb.*;
////////////////////////////////////////////////////////////////////////
/**
* @author Aaron Smuts
* @version 1.0
*
*/
public class HSQLCache implements ICache, Serializable {
private static Logger log;
private static int numCreated = 0;
private int numInstances = 0;
// trivial debugging that should be removed by the compiler
private static final boolean debug = false;//true;
private static final boolean debugR = false;//true;
private static final boolean debugPut = false;//true;
private static final boolean debugGet = false;//true;
private String cacheName;
public boolean isAlive = false;
// not used
private String source_id = "org.apache.stratum.jcs.auxiliary.disk.hsql.HSQLCache";
IHSQLCacheAttributes cattr;
// disk cache buffer, need to remove key from buffer on update, if its there
HSQLCacheNoWaitBuffer buffer;
// for now use one statement per cache and keep it open
// can move up to manager level or implement pooling if there are too many
// caches
Connection cConn;
Statement sStatement;
//PreparedStatement psInsert;
//PreparedStatement psUpdate;
//PreparedStatement psSelect;
///////////////////////////////////////////////////
public Serializable getSourceId() {
return this.source_id;
}
////////////////////////////////////////////////////////////////////
// should use this method
public HSQLCache ( HSQLCacheNoWaitBuffer buffer, IHSQLCacheAttributes cattr ) {
this( cattr.getCacheName(), cattr.getDiskPath() );
this.cattr = cattr;
this.buffer = buffer;
}
protected HSQLCache ( String cacheName ) {
this( cacheName, null);
}
//////////////////////////////////////////////////////////////////////
protected HSQLCache ( String cacheName, String rafroot ) {
numInstances++;
log = LoggerManager.getLogger( this );
this.cacheName = cacheName;
//String rafroot = cattr.getDiskPath();
if ( rafroot == null ) {
try {
PropertyGroups pg = new PropertyGroups( "/cache.properties" );
rafroot = pg.getProperty( "diskPath" );
} catch( Exception e ) {
log.error( e );
}
}
try {
// note: no external properties are loaded yet, so the defaults below always apply
Properties p = new Properties();
String driver = p.getProperty("driver", "org.hsqldb.jdbcDriver");
String url = p.getProperty("url", "jdbc:hsqldb:");
String database = p.getProperty("database", "cache_hsql_db");
String user = p.getProperty("user", "sa");
String password = p.getProperty("password", "");
boolean test = p.getProperty("test", "true").equalsIgnoreCase("true");
// named doLog so the Logger field "log" is not shadowed
boolean doLog = p.getProperty("log", "true").equalsIgnoreCase("true");
try {
if (doLog) {
trace("driver  = " + driver);
trace("url     = " + url);
trace("database= " + database);
trace("user    = " + user);
trace("password= " + password);
trace("test    = " + test);
trace("log     = " + doLog);
//DriverManager.setLogStream(System.out);
}
// As described in the JDBC FAQ:
// http://java.sun.com/products/jdbc/jdbc-frequent.html;
// Why doesn't calling class.forName() load my JDBC driver?
// There is a bug in the JDK 1.1.x that can cause Class.forName() to fail.
new org.hsqldb.jdbcDriver();
Class.forName(driver).newInstance();
cConn = DriverManager.getConnection(url + database, user,
password);
try {
sStatement = cConn.createStatement();
isAlive = true;
} catch (SQLException e) {
System.out.println("Exception: " + e);
isAlive = false;
}
setupTABLE();
} catch (Exception e) {
p("HSQLCache init: " + e.getMessage());
e.printStackTrace();
}
} catch (Exception e) {
log.logEx(e);
}
} // end constructor
/**
* SETUP TABLE FOR CACHE
*
*/
void setupTABLE() {
boolean newT = true;
String setup = "create table " + cacheName + " (KEY varchar(255) primary key, ELEMENT binary)";
try {
sStatement.executeUpdate(setup);
} catch (SQLException e) {
if ( e.toString().indexOf( "already exists" ) != -1 ) {
newT = false;
}
System.out.println("Exception: " + e);
}
String setupData[] = {
"create index iKEY on " + cacheName + " (KEY)"
};
if ( newT ) {
for (int i = 0; i < setupData.length; i++) {
try {
sStatement.executeUpdate(setupData[i]);
} catch (SQLException e) {
System.out.println("Exception: " + e);
}
}
} // end if new
}
////////////////////////////////////////////////////////
public void add (Serializable key, Serializable value) throws IOException {
put(key, value);
}
// ignore the multicast field.
public void put (Serializable key, Serializable value, boolean multicast) throws IOException {
put(key, value);
}
public void put (Serializable key, Serializable value) throws IOException {
put( key, value, null );
}
public void put (Serializable key, Serializable value, Attributes attr) throws IOException {
CacheElement ce = new CacheElement( cacheName, key, value );
ce.setAttributes( attr );
update( ce );
}
///////////////////////////////////////////////////////////
public void update( ICacheElement ce ) throws IOException {
if ( debug ) {
p( "update" );
}
if (!isAlive) {
return;
}
if ( ce instanceof PurgatoryElement ) {
PurgatoryElement pe = (PurgatoryElement)ce;
ce = pe.ice;
if ( !pe.isSpoolable ) {
if ( debug ) {
p( "pe is not spoolable" );
}
// it has been plucked from purgatory
return;
}
}
// /*
// if ( ce instanceof IDiskElement ) {
// IDiskElement ide = (IDiskElement)ce;
// if ( !ide.getIsSpoolable() ) {
// // it has been plucked from purgatory
// return;
// }
// }
// */
// remove item from purgatory since we are putting it on disk
// assume that the disk cache will never break
// disk breakage is akin to an out of memory exception
buffer.purgatory.remove( ce.getKey() );
if ( debug ) {
p( "putting " + ce.getKey() + " on disk, removing from purgatory" );
}
if ( debug ) {
p( "putting " + ce );
}
// remove single item.
byte[] element = serialize( ce );
//String sql = "insert into " + cacheName + " (KEY, ELEMENT) values ('" + ce.getKey() + "', '" + element + "' )";
boolean exists = false;
try {
//String sqlS = "select ELEMENT from " + cacheName + " where KEY = ?";
//PreparedStatement psSelect = cConn.prepareStatement(sqlS);
//psSelect.setString(1,(String)ce.getKey());
//ResultSet rs = psSelect.executeQuery();
String sqlS = "select ELEMENT from " + cacheName + " where KEY = '" +
(String)ce.getKey() + "'" ;
ResultSet rs = sStatement.executeQuery(sqlS);
if ( rs.next() ) {
exists = true;
}
rs.close();
//psSelect.close();
} catch (SQLException e) {
log.error(e);
}
if ( debug ) {
p( "exists = " + exists );
}
if ( !exists ) {
try {
String sqlI = "insert into " + cacheName + " (KEY, ELEMENT) values (?, ? )";
PreparedStatement psInsert = cConn.prepareStatement(sqlI);
psInsert.setString(1,(String)ce.getKey());
psInsert.setBytes(2,element);
psInsert.execute();
psInsert.close();
//sStatement.executeUpdate(sql);
} catch (SQLException e) {
if ( e.toString().indexOf( "Violation of unique index" ) != -1 ||
e.getMessage().indexOf( "Violation of unique index" ) != -1 ) {
exists = true;
} else {
log.error("Exception: " + e);
}
}
} else {
//sql = "update " + cacheName + " set ELEMENT = '" + element + "' where KEY = '" + ce.getKey() + "'";
try {
String sqlU = "update " + cacheName + " set ELEMENT = ? where KEY = ?";
PreparedStatement psUpdate = cConn.prepareStatement(sqlU);
psUpdate.setBytes(1,element);
psUpdate.setString(2,(String)ce.getKey());
psUpdate.execute();
psUpdate.close();
//sStatement.executeUpdate(sql);
if ( debug ) {
p( "ran update" );
}
} catch (SQLException e2) {
log.error("e2 Exception: " + e2);
}
}
//
// DiskElementDescriptor ded = null;
// try {
//
// ded = new DiskElementDescriptor();
// byte[] data = Disk.serialize( ce );
// ded.init( dataFile.length(), data );
//
// // make sure this only locks for one particular cache region
// locker.writeLock();
//
// try {
// if (!isAlive) {
// return;
// }
// // Presume it's an append.
// //DiskElement re = new DiskElement( cacheName, key, value );
// //re.init( dataFile.length(), data );
//
// DiskElementDescriptor old = (DiskElementDescriptor)keyHash.put(ce.getKey(), ded );
//
// // Item with the same key already exists in file.
// // Try to reuse the location if possible.
// if (old != null && ded.len <= old.len) {
// ded.pos = old.pos;
// }
// dataFile.write(data, ded.pos);
// /*
// // Only need to write if the item with the same key and value
// // does not already exist in the file.
// if (re.equals(old)) {
// re.pos = old.pos;
// }
// else {
// // Item with the same key but different value already exists in file.
// // Try to reuse the location if possible.
// if (old != null && re.len <= old.len) {
// re.pos = old.pos;
// }
// dataFile.write(data, re.pos);
// }
// */
//
// } finally {
// locker.done(); // release write lock.
// }
// if ( log.logLevel >= log.DEBUG ) {
// log.debug(fileName + " -- put " + ce.getKey() + " on disk at pos " + ded.pos + " size = " + ded.len );
// }
// } catch( ConcurrentModificationException cme ) {
// // do nothing, this means it has gone back to memory mid serialization
// } catch (Exception e) {
// log.logEx(e, "cacheName = " + cacheName + ", ce.getKey() = " + ce.getKey());
// //reset();
// }
return;
}
///////////////////////////////////////////////////////////////
public Serializable get (Serializable key) {
return get(key, true, true);
}
///////////////////////////////////////////////////////////////
public Serializable get (Serializable key, boolean container ) {
return get(key, container, true);
}
//////////////////////////////////////////////////////////////////////
private Serializable get (Serializable key, boolean container, final boolean lock)
{
if ( debugGet ) {
p( "getting " + key + " from disk" );
}
if (!isAlive) {
return null;
}
Serializable obj = null;
byte[] data = null;
try {
String sqlS = "select ELEMENT from " + cacheName + " where KEY = ?";
PreparedStatement psSelect = cConn.prepareStatement(sqlS);
try {
psSelect.setString(1,(String)key);
ResultSet rs = psSelect.executeQuery();
try {
if ( rs.next() ) {
data = rs.getBytes(1);
}
} finally {
rs.close();
}
} finally {
psSelect.close();
}
if ( data != null ) {
try {
ByteArrayInputStream bais = new ByteArrayInputStream(data);
BufferedInputStream bis = new BufferedInputStream( bais );
ObjectInputStream ois = new ObjectInputStream(bis);
try {
obj = (Serializable)ois.readObject();
} finally {
ois.close();
}
} catch (IOException ioe ) {
log.error( ioe );
} catch (Exception e ) {
log.error( e );
}
}
} catch (SQLException sqle ) {
log.error( sqle );
}
// if (lock) {
// locker.readLock();
// }
// try {
// if (!isAlive) {
// return null;
// }
// DiskElementDescriptor ded = (DiskElementDescriptor)keyHash.get(key);
// if (ded != null) {
// if ( debugGet ) {
// p( "found " + key + " on disk" );
// }
// obj = dataFile.readObject(ded.pos);
// }
// //System.out.println( "got element = " + (CacheElement)obj);
// } catch (NullPointerException e) {
// log.logEx(e, "cacheName = " + cacheName + ", key = " + key);
// } catch (Exception e) {
// log.logEx(e, "cacheName = " + cacheName + ", key = " + key);
// } finally {
// if (lock) {
// locker.done();
// }
// }
return obj;
}
/**
* Returns true if the removal was successful; or false if there is nothing to remove.
* Note: the current implementation always returns false and always results in a disk orphan.
*/
public boolean remove (Serializable key) {
// remove single item.
String sql = "delete from " + cacheName + " where KEY = '" + key + "'";
try {
if (key instanceof String &&
key.toString().endsWith(NAME_COMPONENT_DELIMITER)) {
// remove all keys of the same name group.
sql = "delete from " + cacheName + " where KEY like '" + key + "%'";
}
try {
sStatement.executeUpdate(sql);
} catch (SQLException e) {
System.out.println("Exception: " + e);
}
} catch (Exception e) {
log.logEx(e);
reset();
}
return false;
}
/////////////////////////////////////////////
public void removeAll () {
try {
//reset();
} catch (Exception e) {
log.logEx(e);
}
} // end removeAll
////////////////////////////////////////////////////////////////
// handle error by last resort, force content update, or removeall
public void reset () {
// log.logIt("Reseting cache");
// locker.writeLock();
// try {
// try {
// dataFile.close();
// File file = new File(rafDir, fileName + ".data");
// file.delete();
// keyFile.close();
// File file2 = new File(rafDir, fileName + ".key");
// file2.delete();
// } catch (Exception e) {
// log.logEx(e);
// }
// try {
// dataFile = new Disk(new File(rafDir, fileName + ".data"));
// keyFile = new Disk(new File(rafDir, fileName + ".key"));
// keyHash = new HashMap();
// } catch (IOException e) {
// log.logEx(e, " -- IN RESET");
// } catch (Exception e) {
// log.logEx(e, " -- IN RESET");
// }
// } finally {
// locker.done(); // release write lock.
// }
} // end reset
////////////////////////////////////////////////////////////////////////////
public String getStats () {
return "numInstances = " + numInstances;
}
/////////////////////////////////////////////////////////////
// should be called by the cache manager, since it knows how
// many are checked out
public void dispose () {
// if (!isAlive) {
// log.logIt("is not alive and close() was called -- " + fileName);
// return;
// }
// locker.writeLock();
// try {
// if (!isAlive) {
// log.logIt("is not alive and close() was called -- " + fileName);
// return;
// }
// try {
// optimizeFile();
// } catch (Exception e) {
// log.logEx(e, "-- " + fileName);
// }
// try {
// numInstances--;
// if (numInstances == 0) {
// p( "dispose -- Closing files -- in close -- " + fileName );
// log.warn("dispose -- Closing files -- in close -- " + fileName);
// dataFile.close();
// dataFile = null;
// keyFile.close();
// keyFile = null;
// }
// } catch (Exception e) {
// log.logEx(e, "-- " + fileName);
// }
// } finally {
// isAlive = false;
// locker.done(); // release write lock;
// }
} // end dispose
/** Returns the cache status. */
public int getStatus () {
return isAlive ? STATUS_ALIVE : STATUS_DISPOSED;
}
/** Returns the current cache size. */
public int getSize () {
return 0; // need to get count
}
public int getCacheType () {
return DISK_CACHE;
}
/** For debugging. */
public void dump () {
// log.debug("keyHash.size()=" + keyHash.size());
// for (Iterator itr = keyHash.entrySet().iterator(); itr.hasNext();) {
// Map.Entry e = (Map.Entry)itr.next();
// Serializable key = (Serializable)e.getKey();
// DiskElementDescriptor ded = (DiskElementDescriptor)e.getValue();
// Serializable val = get(key);
// log.debug("disk dump> key=" + key + ", val=" + val + ", pos=" + ded.pos);
// }
}
//////////////////////////////////////////
void trace(String s) {
p(s);
}
//////////////
public void p( String s ) {
log.debug(s);
//System.out.println( "HSQLCache: " + s );
}
/** Returns the cache name. */
public String getCacheName () {
return cacheName;
}
/////////////////////////////////////////////////////////////////////////
/** Returns the serialized form of the given object in a byte array. */
static byte[] serialize (Serializable obj) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(baos);
try {
oos.writeObject(obj);
} finally {
oos.close();
}
return baos.toByteArray();
}
} // end class
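For anyone who wants to poke at the approach outside the cache, below is a minimal, self-contained sketch of the same serialize/insert/select/deserialize round trip HSQLCache performs. The class name, table name, key, value, and connection URL are made up for the example; it assumes hsqldb.jar is on the classpath.

import java.io.*;
import java.sql.*;
public class HsqlRoundTripDemo {
public static void main( String[] args ) throws Exception {
Class.forName( "org.hsqldb.jdbcDriver" );
Connection conn = DriverManager.getConnection( "jdbc:hsqldb:demo_db", "sa", "" );
Statement stmt = conn.createStatement();
stmt.executeUpdate( "create table DEMO (KEY varchar(255) primary key, ELEMENT binary)" );
// serialize, the same way HSQLCache.serialize() does
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream( baos );
oos.writeObject( "some cached value" );
oos.close();
// store the bytes under a key
PreparedStatement ins = conn.prepareStatement( "insert into DEMO (KEY, ELEMENT) values (?, ?)" );
ins.setString( 1, "key1" );
ins.setBytes( 2, baos.toByteArray() );
ins.execute();
ins.close();
// fetch and deserialize
PreparedStatement sel = conn.prepareStatement( "select ELEMENT from DEMO where KEY = ?" );
sel.setString( 1, "key1" );
ResultSet rs = sel.executeQuery();
if ( rs.next() ) {
ObjectInputStream ois = new ObjectInputStream( new ByteArrayInputStream( rs.getBytes( 1 ) ) );
System.out.println( "got back: " + ois.readObject() );
ois.close();
}
rs.close();
sel.close();
conn.close();
}
}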
1.1
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/disk/hsql/HSQLCacheAttributes.java
Index: HSQLCacheAttributes.java
===================================================================
package org.apache.stratum.jcs.auxiliary.disk.hsql;
import org.apache.stratum.jcs.auxiliary.disk.hsql.behavior.IHSQLCacheAttributes;
import org.apache.stratum.jcs.auxiliary.behavior.IAuxiliaryCacheAttributes;
//////////////////////////////////////////////////
public class HSQLCacheAttributes implements IHSQLCacheAttributes, Cloneable {
private String cacheName;
private String name;
private String diskPath;
/////////////////////////////////////////
public HSQLCacheAttributes() {
}
////////////////////////////////////
public void setDiskPath( String path ) {
this.diskPath = path;
}
////////////////////////////////////
public String getDiskPath( ) {
return this.diskPath;
}
////////////////////////////////////////////////////
public void setCacheName( String s ) {
this.cacheName = s;
}
public String getCacheName( ) {
return this.cacheName;
}
/////////////////////////////////////////////////////////////////////
public String getName() {
return this.name;
}
public void setName( String name ) {
this.name = name;
}
/////////////////////////////////////////////////
public IAuxiliaryCacheAttributes copy() {
try {
return (IAuxiliaryCacheAttributes)this.clone();
} catch( Exception e ) {
// fall through and return this instance uncopied
}
return (IAuxiliaryCacheAttributes)this;
}
///////////////////////////////////////////////////////////////
public String toString() {
StringBuffer str = new StringBuffer();
str.append( "diskPath = " + diskPath );
return str.toString();
}
}
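To show how these attributes are meant to travel, a short hypothetical wiring sketch using the factory committed below (region name and path are examples only):

// hypothetical usage; values are examples only
HSQLCacheAttributes cattr = new HSQLCacheAttributes();
cattr.setCacheName( "testCache" );
cattr.setDiskPath( "/tmp/jcs_hsql" );
ICache diskCache = new HSQLCacheFactory().createCache( cattr );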
1.1
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/disk/hsql/HSQLCacheFactory.java
Index: HSQLCacheFactory.java
===================================================================
package org.apache.stratum.jcs.auxiliary.disk.hsql;
import org.apache.stratum.jcs.auxiliary.behavior.IAuxiliaryCacheFactory;
import org.apache.stratum.jcs.auxiliary.behavior.IAuxiliaryCacheAttributes;
import org.apache.stratum.jcs.auxiliary.behavior.*;
import org.apache.stratum.jcs.auxiliary.disk.hsql.behavior.*;
import org.apache.stratum.jcs.engine.behavior.ICache;
import org.apache.stratum.jcs.utils.log.*;
/**
* @author Aaron Smuts
* @version 1.0
*/
///////////////////////////////////////////////////////////////
public class HSQLCacheFactory implements IAuxiliaryCacheFactory {
private static Logger log = LoggerManager.getLogger( HSQLCacheFactory.class );
private static String name;
///////////////////////////////////////////////
public HSQLCacheFactory() {
}
////////////////////////////////////////////////////////////////////
public ICache createCache(IAuxiliaryCacheAttributes iaca) {
IHSQLCacheAttributes idca = (IHSQLCacheAttributes)iaca;
HSQLCacheManager dcm = HSQLCacheManager.getInstance( idca );
return dcm.getCache( idca );
}
/////////////////////////////////////////////////////////////////////
public String getName() {
return this.name;
}
public void setName( String name ) {
this.name = name;
}
}
1.1
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/disk/hsql/HSQLCacheManager.java
Index: HSQLCacheManager.java
===================================================================
package org.apache.stratum.jcs.auxiliary.disk.hsql;
import org.apache.stratum.jcs.auxiliary.disk.*;
import java.io.*;
import java.util.*;
import java.sql.*;
import org.apache.stratum.jcs.auxiliary.disk.hsql.*;
import org.apache.stratum.jcs.auxiliary.disk.hsql.behavior.*;
import org.apache.stratum.jcs.engine.behavior.*;
import org.apache.stratum.jcs.utils.log.*;
/////////////////////////////////////////////
public class HSQLCacheManager implements ICacheManager {
private static int clients;
private static Hashtable caches = new Hashtable();
private static Logger log;
private static final boolean debug = false; //true;
private static final boolean pOut = false; //true;
private static HSQLCacheManager instance;
private static IHSQLCacheAttributes defaultCattr;
///////////////////////////////////////////////////////////////////
private HSQLCacheManager( IHSQLCacheAttributes cattr ) {
log = LoggerManager.getLogger( this );
this.defaultCattr = cattr;
}
////////////////////////////////////////
public IHSQLCacheAttributes getDefaultCattr() {
return this.defaultCattr;
}
///////////////////////////////////////////////////////////////////
public static HSQLCacheManager getInstance( IHSQLCacheAttributes cattr ) {
if (instance == null) {
synchronized(HSQLCacheManager.class) {
if (instance == null) {
instance = new HSQLCacheManager( cattr );
}
}
}
if ( debug ) {
log.logIt( "Manager stats : " + instance.getStats() + "<br> -- in getInstance()" );
}
clients++;
return instance;
}
/////////////////////////////////////////////////////////
public ICache getCache( String cacheName ) {
IHSQLCacheAttributes cattr = (IHSQLCacheAttributes)defaultCattr.copy();
cattr.setCacheName( cacheName );
return getCache( cattr );
}
//////////////////////////////////////////////////////////
public ICache getCache( IHSQLCacheAttributes cattr ) {
ICache raf=null;
p( "cacheName = " + cattr.getCacheName() );
synchronized(caches) {
raf = (ICache)caches.get( cattr.getCacheName() );
if (raf == null) {
// should make use of cattr here
//raf = new HSQLCache( cattr.getCacheName(), cattr.getDiskPath() );
raf = new HSQLCacheNoWaitBuffer( cattr );
caches.put( cattr.getCacheName(), raf );
}
}
if ( debug ) {
log.logIt( "Manager stats : " + instance.getStats() );
}
return raf;
}
////////////////////////////////////////////////////////////////
public void freeCache( String name ) {
HSQLCache raf = (HSQLCache)caches.get(name);
if (raf != null) {
raf.dispose();
}
}
// Don't care if there is a concurrency failure ?
public String getStats(){
StringBuffer stats = new StringBuffer();
Enumeration allCaches = caches.elements();
while (allCaches.hasMoreElements()) {
HSQLCache raf = (HSQLCache)allCaches.nextElement();
if (raf != null) {
stats.append( "<br> " + raf.getStats() );
}
}
return stats.toString();
}
///////////////////////////////////////
public int getCacheType() {
return DISK_CACHE;
}
/////////////////////////////////////////////////////////////////
public void release() {
// Wait until called by the last client
if (--clients != 0) {
return;
}
synchronized(caches) {
Enumeration allCaches = caches.elements();
while (allCaches.hasMoreElements()) {
HSQLCache raf = (HSQLCache)allCaches.nextElement();
if (raf != null) {
raf.dispose();
}
}
}
} //end release()
/////////////////////////////////////////////
public void p( String s ) {
if ( log.logLevel >= log.DEBUG ) {
log.debug( s );
}
if ( pOut ) {
System.out.println( "HSQLCacheManager: " + s );
}
}
} // end class
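Callers can also go through the manager directly; getCache(String) copies the default attributes and swaps in the region name. A sketch, given an IHSQLCacheAttributes cattr like the one shown earlier (region names are examples):

// hypothetical usage of the manager singleton
HSQLCacheManager mgr = HSQLCacheManager.getInstance( cattr );
ICache regionA = mgr.getCache( "regionA" ); // copy of the default cattr, renamed
ICache regionB = mgr.getCache( "regionB" );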
1.1
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/disk/hsql/HSQLCacheNoWaitBuffer.java
Index: HSQLCacheNoWaitBuffer.java
===================================================================
package org.apache.stratum.jcs.auxiliary.disk.hsql;
import java.io.*;
import java.rmi.*;
import java.util.*;
import org.apache.stratum.jcs.engine.control.*;
import org.apache.stratum.jcs.engine.*;
import org.apache.stratum.jcs.access.exception.*;
import org.apache.stratum.jcs.engine.behavior.*;
import org.apache.stratum.jcs.auxiliary.disk.hsql.behavior.*;
import org.apache.stratum.jcs.utils.log.*;
/**
* Stores recent arrivals in a temporary queue.
* Used to queue up update requests to the underlying cache.
* These requests will be processed in their order of arrival
* via the cache event queue processor.
*
* Note: There is a problem lurking in all queued distributed
* systems, where repopulation of memory can occur despite a removal
* if the two arrive in a different order than issued. The possible
* solution is to never delete, but only invalidate. The cache can act
* in a CVS-like mode, enforcing versions of elements, checking to see
* if a newer version exists before adding. So a remote put could make
* sure a newer version hasn't arrived. Invalidations would have a
* version. We could avoid spooling these since it will have been a
* while. This could be called guaranteed mode and can be added
* to any listener.
*/
public class HSQLCacheNoWaitBuffer implements ICache {
private static final boolean debug = false;//true;
// accessed by disk cache
protected Hashtable purgatory = new Hashtable();
private static final boolean debugPH = true;
private int purgHits = 0;
private IHSQLCacheAttributes cattr;
private HSQLCache cache;
private ICacheEventQueue q;
private transient Logger log = LoggerManager.getInstance().getLogger(this);
private String source_id =
"org.apache.stratum.jcs.auxiliary.disk.HSQLCacheNoWaitBuffer";
///////////////////////////////////////////////////
public Serializable getSourceId() {
return this.source_id;
}
/**
* Constructs with the given disk cache,
* and fires up an event queue for asynchronous processing.
*/
public HSQLCacheNoWaitBuffer(IHSQLCacheAttributes cattr) {
cache = new HSQLCache( this, cattr );
this.cattr = cattr;
this.q = new CacheEventQueue(new CacheAdaptor(cache), CacheInfo.listenerId,
cache.getCacheName());
// need each no wait to handle each of its real updates and removes, since there
// may be more than one per cache? the alternative is to have the cache
// perform updates using a different method that specifies the listener
//this.q = new CacheEventQueue(new CacheAdaptor(this), HSQLCacheInfo.listenerId, cache.getCacheName());
if (cache.getStatus() == cache.STATUS_ERROR) {
log.error( "destroying queue" );
q.destroy();
}
}
/** Adds a put request to the disk cache. */
public void put(Serializable key, Serializable value ) throws IOException {
put( key, value, null );
}
public void put(Serializable key, Serializable value, Attributes attr ) throws IOException {
try {
CacheElement ce = new CacheElement( cache.getCacheName(), key, value );
ce.setAttributes( attr );
update( ce );
} catch(IOException ex) {
log.error(ex);
q.destroy();
}
}
public void update( ICacheElement ce ) throws IOException {
try {
if ( debug ) {
p( "putting in purgatory" );
}
PurgatoryElement pe = new PurgatoryElement( ce );
pe.isSpoolable = true;
q.addPutEvent( (ICacheElement)pe );
//q.addPutEvent( ce );
/*
// may be too slow
IDiskElement ide = new DiskElement( ce );
ide.setAttributes( ce.getAttributes() );
purgatory.put( ide.getKey(), ide );
//CacheElement ice = new CacheElement(ce.getCacheName(), ce.getKey(),
ce.getVal() );
//ice.setAttributes( ce.getAttributes() );
ide.setIsSpoolable( true );
q.addPutEvent( ide );
*/
} catch(IOException ex) {
log.error(ex);
// should we destroy purgatory. it will swell
q.destroy();
}
}
/** Synchronously reads from the disk cache. */
public Serializable get(Serializable key) {
return get( key, true );
}
public Serializable get(Serializable key, boolean container ) {
//IDiskElement ide = (IDiskElement)purgatory.get( key );
PurgatoryElement pe = (PurgatoryElement)purgatory.get(key);
//if ( ide != null ) {
if ( pe != null ) {
purgHits++;
if( debugPH ) {
if ( purgHits % 100 == 0 ) {
p( "purgatory hits = " + purgHits );
}
}
//ide.setIsSpoolable( false );
pe.isSpoolable = false;
if ( debug ) {
p( "found in purgatory" );
}
if ( container ) {
purgatory.remove( key );
//return (ICacheElement)ide;
return pe.ice;
} else {
purgatory.remove( key );
//return (Serializable)ide.getVal();
return (Serializable)pe.ice.getVal();
}
}
try {
return cache.get(key);
} catch(Exception ex) {
q.destroy();
// not sure we should do this. What about purgatory?
// must assume that we will not lose disk access.
// could make a repairer, but it complicates purgatory.
}
return null;
}
/** Adds a remove request to the disk cache. */
public boolean remove(Serializable key) {
purgatory.remove( key );
try {
q.addRemoveEvent(key);
} catch(IOException ex) {
log.error(ex);
q.destroy();
}
return false;
}
/** Adds a removeAll request to the disk cache. */
public void removeAll() {
// discard the old purgatory wholesale
purgatory = new Hashtable();
try {
q.addRemoveAllEvent();
} catch(IOException ex) {
log.error(ex);
q.destroy();
}
}
/** Adds a dispose request to the disk cache. */
public void dispose() {
cache.dispose();
// may lose the end of the queue; need to be more graceful
q.destroy();
/*
try {
q.addDisposeEvent();
} catch(IOException ex) {
log.error(ex);
q.destroy();
}
*/
}
/** No disk invocation. */
public String getStats() {
return cache.getStats();
}
/** No disk invocation. */
public int getSize() {
return cache.getSize();
}
/** No disk invocation. */
public int getCacheType() {
return cache.getCacheType();
}
/**
* Returns the async cache status.
* An error status indicates either the disk connection is not available,
* or the async queue has been unexpectedly destroyed.
* No disk invocation.
*/
public int getStatus() {
return q.isAlive() ? cache.getStatus() : cache.STATUS_ERROR;
}
public String getCacheName() {
return cache.getCacheName();
}
/** NOT USED NOW
* Replaces the disk cache service handle with the given handle and
* resets the event queue by starting up a new instance.
*/
public void fixCache(IHSQLCacheService disk) {
//cache.fixCache(disk);
resetEventQ();
return;
}
/** Resets the event q by first destroying the existing one and starting up a new one. */
public void resetEventQ() {
if (q.isAlive()) {
q.destroy();
}
this.q = new CacheEventQueue(new CacheAdaptor(cache), CacheInfo.listenerId,
cache.getCacheName());
}
public String toString() {
return "HSQLCacheNoWaitBuffer: " + cache.toString();
}
private void p(String s) {
System.out.println("HSQLCacheNoWaitBuffer:" + s);
}
}
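The class comment above floats a versioned, invalidate-only "guaranteed mode", but nothing in this commit implements it. Roughly, the idea might look like the following sketch; every type and name in it is hypothetical.

// Hypothetical sketch of the versioned, invalidate-only "guaranteed mode"
// described in the class comment above; none of these types exist in this
// commit, and all names are made up.
class VersionedEntry implements java.io.Serializable {
final java.io.Serializable value; // null means "invalidated"
final long version;
VersionedEntry( java.io.Serializable value, long version ) {
this.value = value;
this.version = version;
}
}
class GuaranteedRegion {
private final java.util.Hashtable entries = new java.util.Hashtable();
// apply a put only if no newer version (including a newer invalidation)
// has already arrived out of order
synchronized void put( java.io.Serializable key, VersionedEntry candidate ) {
VersionedEntry current = (VersionedEntry)entries.get( key );
if ( current == null || candidate.version > current.version ) {
entries.put( key, candidate );
}
}
// never delete: record an invalidation marker carrying its own version
synchronized void invalidate( java.io.Serializable key, long version ) {
put( key, new VersionedEntry( null, version ) );
}
}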
1.1
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/disk/hsql/PurgatoryElement.java
Index: PurgatoryElement.java
===================================================================
package org.apache.stratum.jcs.auxiliary.disk.hsql;
import java.io.Serializable;
import org.apache.stratum.jcs.engine.behavior.*;
import org.apache.stratum.jcs.engine.*;
import org.apache.stratum.jcs.engine.control.*;
///////////////////////////////////////////////////////////////
public class PurgatoryElement implements ICacheElement, Serializable {
// need speed here. the method calls are unnecessary. make protected
protected boolean isSpoolable = false;
protected ICacheElement ice;
public PurgatoryElement( ICacheElement ice ) {
this.ice = ice;
}
// lets the queue know that it is ready to be spooled
public boolean getIsSpoolable() {
return isSpoolable;
}
public void setIsSpoolable( boolean isSpoolable ) {
this.isSpoolable = isSpoolable;
}
// ICacheElement Methods
public String getCacheName() {
return ice.getCacheName();
}
public Serializable getKey() {
return ice.getKey();
}
public Serializable getVal() {
return ice.getVal();
}
public Attributes getAttributes() {
return ice.getAttributes();
}
public void setAttributes( IAttributes attr ) {
ice.setAttributes( attr );
}
public long getCreateTime() {
return ice.getCreateTime();
}
}
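To make the purgatory handshake between these classes concrete, here is the lifecycle they implement, summarized as comments (descriptive of the code above, not new API):

// 1. HSQLCacheNoWaitBuffer.update(ce) wraps the element and queues it:
//      PurgatoryElement pe = new PurgatoryElement(ce);
//      pe.isSpoolable = true;   // eligible to be written to disk
//      q.addPutEvent(pe);
//    (in this revision the matching purgatory.put(...) is still commented
//    out, so the purgatory table is never actually populated)
//
// 2. A get() that arrives before the queue drains checks purgatory first;
//    on a hit it flips pe.isSpoolable = false and removes the key,
//    "plucking" the element back to memory.
//
// 3. When the queued event finally reaches HSQLCache.update(), a
//    non-spoolable PurgatoryElement is skipped; otherwise the element is
//    written to disk and its key is removed from purgatory.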