asmuts 02/01/14 22:20:03
Added: src/java/org/apache/stratum/jcs/auxiliary/disk Disk.java
DiskCache.java DiskCacheAttributes.java
DiskCacheFactory.java DiskCacheManager.java
DiskCacheNoWaitBuffer.java DiskDumper.java
DiskElement.java DiskElementDescriptor.java
DiskLockManager.java PurgatoryElement.java
Log:
Default disk cache implementation
Memory key storage
needs real time defragmentation and a disk key swap
stores keys and defragments on shutdown
quick
purgatory for incoming items
Revision Changes Path
1.1
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/disk/Disk.java
Index: Disk.java
===================================================================
package org.apache.stratum.jcs.auxiliary.disk;
import java.io.*;
import java.util.*;
import org.apache.stratum.jcs.auxiliary.disk.*;
import org.apache.stratum.jcs.access.*;
import org.apache.stratum.jcs.engine.*;
import org.apache.stratum.jcs.engine.control.Cache;
import org.apache.stratum.jcs.utils.log.*;
/**
 * Provides thread safe access to the underlying random access file.
 * <p>
 * Each record is written as a 4-byte length prefix ({@code writeInt})
 * followed by that many bytes of data.  Every access to the file is
 * synchronized on this instance.
 */
class Disk {

    /** Number of attempts {@link #reset()} makes to delete the backing file. */
    private static final int MAX_DELETE_ATTEMPTS = 10;

    /** Pause between two delete attempts, in milliseconds. */
    private static final long DELETE_RETRY_MILLIS = 1000;

    private static boolean debug = false;

    private final Logger log;
    private final String filepath;
    private RandomAccessFile raf;

    /**
     * Opens (creating it if necessary) the given file for reading and writing.
     *
     * @param file the backing file
     * @throws FileNotFoundException if the file cannot be opened in "rw" mode
     */
    Disk(File file) throws FileNotFoundException {
        log = LoggerManager.getLogger(this);
        this.filepath = file.getAbsolutePath();
        raf = new RandomAccessFile(filepath, "rw");
    }

    /**
     * Reads and deserializes the record that starts at the given position.
     *
     * @param pos offset of the record's length prefix
     * @return the deserialized object, or <code>null</code> if the record is
     *         corrupted or any error occurs (errors are logged, not thrown)
     */
    Serializable readObject(long pos) {
        try {
            byte[] data = null;
            synchronized (this) {
                raf.seek(pos);
                int datalen = raf.readInt();
                // A valid record can be neither negative in size nor extend
                // past the end of the file.
                long remaining = raf.length() - raf.getFilePointer();
                if (datalen >= 0 && datalen <= remaining) {
                    data = new byte[datalen];
                    raf.readFully(data);
                }
            }
            if (data == null) {
                log.logIt("The datFile is corrupted");
                return null;
            }
            ObjectInputStream ois =
                new ObjectInputStream(new ByteArrayInputStream(data));
            try {
                return (Serializable) ois.readObject();
            } finally {
                ois.close();
            }
        } catch (Exception e) {
            log.logEx(e, "-- " + raf);
        }
        return null;
    }

    /**
     * Appends byte array to the Disk.
     *
     * @param data the record payload
     * @return <code>true</code> if the record was written
     */
    boolean append(byte[] data) {
        try {
            synchronized (this) {
                return write(data, raf.length());
            }
        } catch (IOException ex) {
            log.logEx(ex, "-- append failed: " + filepath);
        }
        return false;
    }

    /**
     * Writes the given byte array to the Disk at the specified position.
     *
     * @param data the record payload
     * @param pos  offset at which the length prefix is written
     * @return <code>true</code> if the record was written
     */
    boolean write(byte[] data, long pos) {
        if (debug) {
            log.debug("write> pos=" + pos);
            log.debug(raf + " -- data.length = " + data.length);
        }
        try {
            synchronized (this) {
                raf.seek(pos);
                raf.writeInt(data.length);
                raf.write(data);
            }
            return true;
        } catch (IOException ex) {
            log.logEx(ex, "-- write failed at pos " + pos + ": " + filepath);
        }
        return false;
    }

    /**
     * Serializes the object and writes it at the specified position.
     *
     * @param obj the object to store
     * @param pos offset at which the record is written
     * @return <code>true</code> if the record was written
     */
    boolean writeObject(Serializable obj, long pos) {
        try {
            return write(serialize(obj), pos);
        } catch (IOException ex) {
            log.logEx(ex, "-- writeObject failed at pos " + pos + ": " + filepath);
        }
        return false;
    }

    /**
     * Serializes the element and appends it to the end of the file.
     *
     * @param obj the element to store
     * @return a descriptor initialised with the record's position and data,
     *         or <code>null</code> if the element could not be written
     */
    DiskElementDescriptor appendObject(CacheElement obj) {
        try {
            DiskElementDescriptor ded = new DiskElementDescriptor();
            byte[] data = serialize(obj);
            boolean success;
            // Length lookup and write happen under one lock so that the
            // position cannot move between the two.
            synchronized (this) {
                long pos = raf.length();
                ded.init(pos, data);
                success = write(data, pos);
            }
            return success ? ded : null;
        } catch (IOException ex) {
            log.logEx(ex, "-- appendObject failed: " + filepath);
        }
        return null;
    }

    /** Returns the raf length. */
    long length() throws IOException {
        synchronized (this) {
            return raf.length();
        }
    }

    /** Closes the raf. */
    synchronized void close() throws IOException {
        raf.close();
    }

    /**
     * Sets the raf to empty by closing it, deleting the file and reopening it.
     *
     * @throws IOException if the file cannot be closed or reopened
     * @throws IllegalStateException if the file still cannot be deleted after
     *         all attempts
     */
    synchronized void reset() throws IOException {
        raf.close();
        File f = new File(filepath);
        boolean interrupted = false;
        int i = 0;
        try {
            // A file that is already gone needs no deleting; without the
            // exists() check every attempt would fail and reset() would throw.
            for (; i < MAX_DELETE_ATTEMPTS && f.exists() && !f.delete(); i++) {
                try {
                    Thread.sleep(DELETE_RETRY_MILLIS);
                } catch (InterruptedException ex) {
                    // Keep retrying, but remember the request so the caller
                    // still sees it.
                    interrupted = true;
                }
                log.warn("Failed to delete " + f.getName() + " " + i);
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
        if (i == MAX_DELETE_ATTEMPTS) {
            IllegalStateException ex =
                new IllegalStateException("Failed to delete " + f.getName());
            log.error(ex);
            throw ex;
        }
        raf = new RandomAccessFile(filepath, "rw");
    }

    /**
     * Returns the serialized form of the given object in a byte array.
     *
     * @param obj the object to serialize
     * @return the bytes produced by Java serialization
     * @throws IOException if serialization fails
     */
    static byte[] serialize(Serializable obj) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(baos);
        try {
            oos.writeObject(obj);
        } finally {
            oos.close();
        }
        return baos.toByteArray();
    }
}
1.1
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/disk/DiskCache.java
Index: DiskCache.java
===================================================================
package org.apache.stratum.jcs.auxiliary.disk;
import java.io.*;
import java.util.*;
import java.sql.*;
import org.apache.stratum.jcs.access.*;
import org.apache.stratum.jcs.access.exception.*;
import org.apache.stratum.jcs.auxiliary.disk.*;
import org.apache.stratum.jcs.auxiliary.disk.behavior.*;
import org.apache.stratum.jcs.engine.control.*;
import org.apache.stratum.jcs.engine.control.Cache;
import org.apache.stratum.jcs.engine.behavior.*;
import org.apache.stratum.jcs.engine.*;
import org.apache.stratum.jcs.utils.data.*;
import org.apache.stratum.jcs.utils.log.*;
////////////////////////////////////////////////////////////////////////
/**
 * Disk backed cache region.  Element data is kept in a ".data" file, and an
 * in-memory map from key to {@link DiskElementDescriptor} records where each
 * element lives; the map is persisted to a ".key" file by {@link #saveKeys()}.
 *
 * @author Aaron Smuts
 * @version 1.0
 *
 * @author Hanson Char
 * @version 1.1
 * <br><br>Changes in version 1.1:
 * <ol>
 * <li>replace synchronization mechanism with read/write lock manager;
 * <li>replace the use of DiskIO with Disk;
 * <li>avoid disk write if no change to data value;
 * <li>reduce disk fragmentation by re-using disk locations when possible;
 * </ol>
 *
 * @author Aaron Smuts
 * @version 1.2
 * Made the disk cache attribute driven.
 * Uses common ICache interface and ICacheAttributes.
 *
 */
public class DiskCache implements ICache, Serializable {

    Logger log;

    private static int numCreated = 0;
    private int numInstances = 0;

    // trivial debugging that should be removed by the compiler
    private static final boolean debug = false;//true;
    private static final boolean debugR = false;//true;
    private static final boolean debugPut = false;//true;
    private static final boolean debugGet = false;//true;

    private final static DiskLockManager locker = DiskLockManager.getInstance();

    private String fileName;
    private String cacheName;
    private Disk dataFile;
    private Disk keyFile;
    private HashMap keyHash; // not synchronized to maximize concurrency.
    public boolean isAlive = false;
    private File rafDir;
    IDiskCacheAttributes cattr;

    // not used
    private String source_id = "org.apache.stratum.jcs.auxiliary.disk.DiskCache";

    // disk cache buffer, need to remove key from buffer on update, if its there
    DiskCacheNoWaitBuffer buffer;

    /** Returns the identifier of this cache as an event source. */
    public Serializable getSourceId() {
        return this.source_id;
    }

    /**
     * Creates the cache for the region described by the attributes.  This is
     * the constructor that should be used.
     *
     * @param buffer the buffer whose purgatory is cleaned up on update
     * @param cattr  supplies the cache name and disk path
     */
    public DiskCache(DiskCacheNoWaitBuffer buffer, IDiskCacheAttributes cattr) {
        this(cattr.getCacheName(), cattr.getDiskPath());
        this.cattr = cattr;
        this.buffer = buffer;
    }

    /**
     * Creates the cache using the "diskPath" property of /cache.properties.
     *
     * @param cacheName name of the region, also used as the file name stem
     */
    protected DiskCache(String cacheName) {
        this(cacheName, null);
    }

    /**
     * Creates the cache and opens (or creates) its data and key files.
     * Failures while opening the files are logged and leave the cache not
     * alive rather than being thrown.
     *
     * @param cacheName name of the region, also used as the file name stem
     * @param rafroot   directory for the files; when <code>null</code> the
     *                  "diskPath" property of /cache.properties is used
     * @throws IllegalStateException if no disk path can be determined
     */
    protected DiskCache(String cacheName, String rafroot) {
        // The logger must exist before anything that may need to report an
        // error; it used to be assigned only after the property lookup.
        log = LoggerManager.getLogger(this);
        numInstances++;
        this.fileName = cacheName;
        this.cacheName = cacheName;
        // Never leave the key map null, even if opening the files fails.
        keyHash = new HashMap();

        if (rafroot == null) {
            try {
                PropertyGroups pg = new PropertyGroups("/cache.properties");
                rafroot = pg.getProperty("diskPath");
            } catch (Exception e) {
                log.error(e);
            }
        }
        if (rafroot == null) {
            IllegalStateException ex = new IllegalStateException(
                "No disk path configured for disk cache " + cacheName);
            log.error(ex);
            throw ex;
        }

        rafDir = new File(rafroot);
        rafDir.mkdirs();
        log.info("rafroot=" + rafroot);

        try {
            dataFile = new Disk(new File(rafDir, fileName + ".data"));
            keyFile = new Disk(new File(rafDir, fileName + ".key"));
            if (keyFile.length() > 0) {
                loadKeys();
                // Data without keys is unreachable, so start clean.
                if (keyHash.size() == 0) {
                    dataFile.reset();
                }
            } else {
                keyHash = new HashMap();
                if (dataFile.length() > 0) {
                    dataFile.reset();
                }
            }
            isAlive = true;
        } catch (FileNotFoundException e) {
            log.logEx(e);
        } catch (Exception e) {
            log.logEx(e, fileName);
        }
    } // end constructor

    /**
     * Loads the key map from the key file, falling back to an empty map when
     * nothing can be read.
     */
    public void loadKeys() {
        locker.writeLock();
        try {
            keyHash = (HashMap) keyFile.readObject(0);
            if (keyHash == null) {
                keyHash = new HashMap();
            }
            if (debug) {
                log.logIt("LOADKEYS " + fileName + " -- keyHash.size() = " + keyHash.size());
            }
        } catch (Exception e) {
            log.logEx(e, fileName);
        } finally {
            locker.done(); // release write lock.
        }
    } // end loadKeys

    /**
     * Empties the key file and, if there are any keys, writes the key map to
     * it.  Errors are logged, not thrown.
     */
    public void saveKeys() {
        try {
            if (debug) {
                log.logIt("SAVEKEYS " + fileName + " -- keyHash.size() = " + keyHash.size());
            }
            locker.writeLock();
            try {
                keyFile.reset();
                if (keyHash.size() > 0) {
                    keyFile.writeObject(keyHash, 0);
                }
            } finally {
                locker.done(); // release write lock.
            }
        } catch (Exception e) {
            log.logEx(e);
        }
    } // end saveKeys

    /** Same as {@link #put(Serializable, Serializable)}. */
    public void add(Serializable key, Serializable value) throws IOException {
        put(key, value);
    }

    /** Same as {@link #put(Serializable, Serializable)}; the multicast flag is ignored. */
    public void put(Serializable key, Serializable value, boolean multicast)
        throws IOException {
        put(key, value);
    }

    /** Stores the value under the key with no attributes. */
    public void put(Serializable key, Serializable value) throws IOException {
        put(key, value, null);
    }

    /**
     * Wraps key, value and attributes in a cache element and stores it.
     *
     * @param key   the element key
     * @param value the element value
     * @param attr  the element attributes, may be <code>null</code>
     */
    public void put(Serializable key, Serializable value, Attributes attr)
        throws IOException {
        CacheElement ce = new CacheElement(cacheName, key, value);
        ce.setAttributes(attr);
        update(ce);
    }

    /**
     * Writes the element to disk.  A {@link PurgatoryElement} is unwrapped
     * first and dropped if it is no longer spoolable.  Does nothing when the
     * cache is not alive.  Errors are logged, not thrown.
     *
     * @param ce the element (or purgatory wrapper) to store
     */
    public void update(ICacheElement ce) throws IOException {
        if (debug) {
            p("update");
        }
        if (!isAlive) {
            return;
        }

        if (ce instanceof PurgatoryElement) {
            PurgatoryElement pe = (PurgatoryElement) ce;
            ce = pe.ice;
            if (!pe.isSpoolable) {
                if (debug) {
                    p("pe is not spoolable");
                }
                // it has been plucked from purgatory
                return;
            }
        }

        // remove item from purgatory since we are putting it on disk
        // assume that the disk cache will never break
        // disk breakage is akin to an out of memory exception
        // (the buffer is absent when the cache was built without one)
        if (buffer != null) {
            buffer.purgatory.remove(ce.getKey());
        }
        if (debug) {
            p("putting " + ce.getKey() + " on disk, removing from purgatory");
        }

        DiskElementDescriptor ded = null;
        try {
            ded = new DiskElementDescriptor();
            byte[] data = Disk.serialize(ce);

            // make sure this only locks for one particular cache region
            locker.writeLock();
            try {
                if (!isAlive) {
                    return;
                }
                // Presume it's an append.  The end-of-file position must be
                // read under the write lock, otherwise two writers can pick
                // the same offset and overwrite each other.
                ded.init(dataFile.length(), data);
                DiskElementDescriptor old =
                    (DiskElementDescriptor) keyHash.put(ce.getKey(), ded);
                // Item with the same key already exists in file.
                // Try to reuse the location if possible.
                if (old != null && ded.len <= old.len) {
                    ded.pos = old.pos;
                }
                dataFile.write(data, ded.pos);
            } finally {
                locker.done(); // release write lock.
            }
            if (log.logLevel >= log.DEBUG) {
                log.debug(fileName + " -- put " + ce.getKey() + " on disk at pos " + ded.pos
                          + " size = " + ded.len);
            }
        } catch (ConcurrentModificationException cme) {
            // do nothing, this means it has gone back to memory mid serialization
        } catch (Exception e) {
            log.logEx(e, "cacheName = " + cacheName + ", ce.getKey() = " + ce.getKey());
        }
    }

    /** Returns the object stored under the key, or <code>null</code>. */
    public Serializable get(Serializable key) {
        return get(key, true, true);
    }

    /**
     * Returns the object stored under the key, or <code>null</code>.  The
     * container flag currently has no effect on the result.
     */
    public Serializable get(Serializable key, boolean container) {
        return get(key, container, true);
    }

    /**
     * Looks the key up in the key map and reads the object from the data file.
     *
     * @param key       the element key
     * @param container not used by this implementation
     * @param lock      whether to take a read lock for the duration of the read
     * @return the stored object, or <code>null</code> if the key is unknown,
     *         the cache is not alive, or an error occurs (errors are logged)
     */
    private Serializable get(Serializable key, boolean container, final boolean lock) {
        if (debugGet) {
            p("getting " + key + " from disk");
        }
        if (!isAlive) {
            return null;
        }

        Serializable obj = null;
        if (lock) {
            locker.readLock();
        }
        try {
            if (!isAlive) {
                return null;
            }
            DiskElementDescriptor ded = (DiskElementDescriptor) keyHash.get(key);
            if (ded != null) {
                if (debugGet) {
                    p("found " + key + " on disk");
                }
                obj = dataFile.readObject(ded.pos);
            }
        } catch (Exception e) {
            log.logEx(e, "cacheName = " + cacheName + ", key = " + key);
        } finally {
            if (lock) {
                locker.done();
            }
        }
        return obj;
    }

    /**
     * Returns true if the removal was succesful; or false if there is nothing
     * to remove.  A String key ending with the name component delimiter
     * removes every String key that starts with it.
     * Current implementation always result in a disk orphan.
     */
    public boolean remove(Serializable key) {
        locker.writeLock();
        try {
            if (key instanceof String
                && key.toString().endsWith(NAME_COMPONENT_DELIMITER)) {
                // remove all keys of the same name group.
                String prefix = key.toString();
                boolean removed = false;
                for (Iterator itr = keyHash.entrySet().iterator(); itr.hasNext();) {
                    Map.Entry entry = (Map.Entry) itr.next();
                    Object k = entry.getKey();
                    if (k instanceof String && k.toString().startsWith(prefix)) {
                        itr.remove();
                        removed = true;
                    }
                }
                return removed;
            }
            // remove single item.
            return keyHash.remove(key) != null;
        } catch (Exception e) {
            log.logEx(e);
            // NOTE(review): reset() takes the write lock again while it is
            // still held here -- confirm DiskLockManager is reentrant.
            reset();
        } finally {
            locker.done(); // release write lock.
        }
        return false;
    }

    /** Removes everything by resetting the cache files and the key map. */
    public void removeAll() {
        try {
            reset();
        } catch (Exception e) {
            // reset() already handles its own I/O failures; retrying it here
            // could only fail the same way, so just report.
            log.logEx(e);
        }
    } // end removeAll

    /**
     * Handles errors by last resort: closes and deletes both files, reopens
     * them empty and starts a new key map.  Also used by {@link #removeAll()}.
     */
    public void reset() {
        log.logIt("Reseting cache");
        locker.writeLock();
        try {
            // Each file is handled on its own so that a failure on the data
            // file does not leave the key file untouched.
            closeAndDelete(dataFile, ".data");
            closeAndDelete(keyFile, ".key");
            try {
                dataFile = new Disk(new File(rafDir, fileName + ".data"));
                keyFile = new Disk(new File(rafDir, fileName + ".key"));
                keyHash = new HashMap();
            } catch (Exception e) {
                log.logEx(e, " -- IN RESET");
            }
        } finally {
            locker.done(); // release write lock.
        }
    } // end reset

    /** Closes the disk (if any) and deletes the cache file with the given suffix. */
    private void closeAndDelete(Disk disk, String suffix) {
        try {
            if (disk != null) {
                disk.close();
            }
        } catch (Exception e) {
            log.logEx(e);
        }
        File file = new File(rafDir, fileName + suffix);
        if (file.exists() && !file.delete()) {
            log.warn("Failed to delete " + file.getName());
        }
    }

    /** Returns a short description of this cache. */
    public String getStats() {
        return "fileName = " + fileName + ", numInstances = " + numInstances;
    }

    /**
     * Optimizes the data file, saves the keys and closes the files.  Should
     * be called by the cache manager, since it knows how many are checked out.
     * The cache is not alive afterwards.
     */
    public void dispose() {
        if (!isAlive) {
            log.logIt("is not alive and close() was called -- " + fileName);
            return;
        }
        locker.writeLock();
        try {
            if (!isAlive) {
                log.logIt("is not alive and close() was called -- " + fileName);
                return;
            }
            try {
                optimizeFile();
            } catch (Exception e) {
                log.logEx(e, "-- " + fileName);
            }
            try {
                numInstances--;
                if (numInstances == 0) {
                    p("dispose -- Closing files -- in close -- " + fileName);
                    log.warn("dispose -- Closing files -- in close -- " + fileName);
                    dataFile.close();
                    dataFile = null;
                    keyFile.close();
                    keyFile = null;
                }
            } catch (Exception e) {
                log.logEx(e, "-- " + fileName);
            }
        } finally {
            isAlive = false;
            locker.done(); // release write lock;
        }
    } // end dispose

    /**
     * Rewrites the data file without gaps: every reachable element is copied
     * to a temp file, which then replaces the data file, and the new key map
     * is saved.
     * Note: synchronization currently managed by the only caller method - dispose.
     */
    private void optimizeFile() {
        try {
            // Migrate from keyHash to keyHashTemp in memory,
            // and from dataFile to dataFileTemp on disk.
            HashMap keyHashTemp = new HashMap();
            Disk dataFileTemp = new Disk(new File(rafDir, fileName + "Temp.data"));
            // A temp file left behind by an earlier run would otherwise be
            // carried into the optimized file.
            if (dataFileTemp.length() > 0) {
                dataFileTemp.reset();
            }
            if (log.logLevel >= log.INFO) {
                log.info("optomizing file keyHash.size()=" + keyHash.size());
            }

            Iterator itr = keyHash.keySet().iterator();
            while (itr.hasNext()) {
                Serializable key = (Serializable) itr.next();
                // Must not have a read-lock; Else results in deadlock.
                CacheElement tempDe = (CacheElement) get(key, true, false);
                if (tempDe == null) {
                    // Unreadable element: drop it instead of storing a null.
                    log.warn(fileName + " -- could not read " + key
                             + ", dropping it from the optimized file");
                    continue;
                }
                try {
                    DiskElementDescriptor de = dataFileTemp.appendObject(tempDe);
                    if (de == null) {
                        log.warn(fileName + " -- could not write " + key
                                 + " to temp disk cache, dropping it");
                        continue;
                    }
                    if (log.logLevel >= log.DEBUG) {
                        log.debug(fileName + " -- Put " + key + " on temp disk cache");
                    }
                    keyHashTemp.put(key, de);
                } catch (Exception e) {
                    log.logEx(e, fileName + " -- cacheName = " + cacheName + ", key = "
                              + key);
                }
            } // end while

            if (log.logLevel >= log.DEBUG) {
                log.debug(fileName + " -- keyHashTemp.size() = " + keyHashTemp.size()
                          + " / keyHash.size() = " + keyHash.size());
            }

            // Make dataFileTemp to become dataFile on disk.
            dataFileTemp.close();
            dataFile.close();
            File oldData = new File(rafDir, fileName + ".data");
            if (oldData.exists()) {
                if (debugR) {
                    log.logIt(fileName + " -- oldData.length() = " + oldData.length());
                }
                oldData.delete();
            }
            File newData = new File(rafDir, fileName + "Temp.data");
            File newFileName = new File(rafDir, fileName + ".data");
            if (newData.exists()) {
                if (debugR) {
                    log.logIt(fileName + " -- newData.length() = " + newData.length());
                }
                if (!newData.renameTo(newFileName)) {
                    log.warn(fileName + " -- failed to rename " + newData.getName()
                             + " to " + newFileName.getName());
                }
            }
            keyHash = keyHashTemp;
            // saveKeys() empties the key file itself before writing.
            saveKeys();
        } catch (Exception e) {
            log.logEx(e, "-- " + fileName);
        }
    } // end optimizeFile

    /** Returns the cache status. */
    public int getStatus() {
        return isAlive ? STATUS_ALIVE : STATUS_DISPOSED;
    }

    /** Returns the current cache size. */
    public int getSize() {
        return keyHash.size();
    }

    /** Returns the disk cache type constant. */
    public int getCacheType() {
        return DISK_CACHE;
    }

    /** For debugging. */
    public void dump() {
        log.debug("keyHash.size()=" + keyHash.size());
        for (Iterator itr = keyHash.entrySet().iterator(); itr.hasNext();) {
            Map.Entry e = (Map.Entry) itr.next();
            Serializable key = (Serializable) e.getKey();
            DiskElementDescriptor ded = (DiskElementDescriptor) e.getValue();
            Serializable val = get(key);
            log.debug("disk dump> key=" + key + ", val=" + val + ", pos=" + ded.pos);
        }
    }

    /** Prints a debug message to standard out. */
    public void p(String s) {
        System.out.println("DiskCache: " + s);
    }

    /** Returns cache name, ha */
    public String getCacheName() {
        return cacheName;
    }
} // end class
1.1
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/disk/DiskCacheAttributes.java
Index: DiskCacheAttributes.java
===================================================================
package org.apache.stratum.jcs.auxiliary.disk;
import org.apache.stratum.jcs.auxiliary.disk.behavior.IDiskCacheAttributes;
import org.apache.stratum.jcs.auxiliary.behavior.IAuxiliaryCacheAttributes;
/**
 * Configuration for a disk cache region: the cache name, the name of the
 * auxiliary and the directory the cache files are kept in.
 *
 * Copyright: Copyright (c) 2001
 * @author "Aaron Smuts"
 * @version 1.0
 */
public class DiskCacheAttributes implements IDiskCacheAttributes {

    private String cacheName;
    private String name;
    private String diskPath;

    /** Creates attributes with every value unset. */
    public DiskCacheAttributes() {
    }

    /** Sets the directory the cache files are kept in. */
    public void setDiskPath(String path) {
        this.diskPath = path;
    }

    /** Returns the directory the cache files are kept in. */
    public String getDiskPath() {
        return this.diskPath;
    }

    /** Sets the name of the cache region. */
    public void setCacheName(String s) {
        this.cacheName = s;
    }

    /** Returns the name of the cache region. */
    public String getCacheName() {
        return this.cacheName;
    }

    /** Returns the name of the auxiliary. */
    public String getName() {
        return this.name;
    }

    /** Sets the name of the auxiliary. */
    public void setName(String name) {
        this.name = name;
    }

    /**
     * Returns an independent copy of these attributes.
     * <p>
     * The copy is made with <code>clone()</code> when that is supported.
     * Otherwise a new instance is filled in field by field; returning
     * <code>this</code> in that case would let a caller that renames its
     * "copy" silently rename these attributes as well.
     *
     * @return a copy that can be modified without affecting this instance
     */
    public IAuxiliaryCacheAttributes copy() {
        try {
            return (IAuxiliaryCacheAttributes) this.clone();
        } catch (Exception e) {
            // Cloning is not available for this class; copy by hand below.
        }
        DiskCacheAttributes duplicate = new DiskCacheAttributes();
        duplicate.cacheName = this.cacheName;
        duplicate.name = this.name;
        duplicate.diskPath = this.diskPath;
        return duplicate;
    }

    /** Returns a description containing the disk path. */
    public String toString() {
        StringBuffer str = new StringBuffer();
        str.append("diskPath = " + diskPath);
        return str.toString();
    }
}
1.1
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/disk/DiskCacheFactory.java
Index: DiskCacheFactory.java
===================================================================
package org.apache.stratum.jcs.auxiliary.disk;
import org.apache.stratum.jcs.auxiliary.behavior.IAuxiliaryCacheFactory;
import org.apache.stratum.jcs.engine.behavior.ICache;
import org.apache.stratum.jcs.auxiliary.behavior.IAuxiliaryCacheAttributes;
/**
* Title:
* Description:
* Copyright: Copyright (c) 2001
* Company:
* @author Aaron Smuts
* @version 1.0
*/
import org.apache.stratum.jcs.auxiliary.behavior.*;
import org.apache.stratum.jcs.auxiliary.disk.behavior.*;
import org.apache.stratum.jcs.utils.log.*;
///////////////////////////////////////////////////////////////
/**
 * Factory that hands out disk caches obtained from the
 * {@link DiskCacheManager}.
 */
public class DiskCacheFactory implements IAuxiliaryCacheFactory {

    private static Logger log = LoggerManager.getLogger( DiskCacheFactory.class );

    // The name is held in a static field, so it is shared by all factories.
    private static String name;

    /** Creates the factory. */
    public DiskCacheFactory() {
    }

    /**
     * Returns the disk cache for the region described by the attributes.
     *
     * @param iaca the attributes; must be an IDiskCacheAttributes
     * @return the cache supplied by the disk cache manager
     */
    public ICache createCache(IAuxiliaryCacheAttributes iaca) {
        final IDiskCacheAttributes diskAttributes = (IDiskCacheAttributes) iaca;
        final DiskCacheManager manager = DiskCacheManager.getInstance(diskAttributes);
        return manager.getCache(diskAttributes);
    }

    /** Returns the factory name. */
    public String getName() {
        return name;
    }

    /** Sets the factory name. */
    public void setName(String name) {
        DiskCacheFactory.name = name;
    }
}
1.1
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/disk/DiskCacheManager.java
Index: DiskCacheManager.java
===================================================================
package org.apache.stratum.jcs.auxiliary.disk;
import org.apache.stratum.jcs.auxiliary.disk.*;
import java.io.*;
import java.util.*;
import java.sql.*;
import org.apache.stratum.jcs.auxiliary.disk.*;
import org.apache.stratum.jcs.auxiliary.disk.behavior.*;
import org.apache.stratum.jcs.engine.behavior.*;
import org.apache.stratum.jcs.utils.log.*;
/////////////////////////////////////////////
public class DiskCacheManager implements ICacheManager {
private static int clients;
private static Hashtable caches = new Hashtable();
private static Logger log;
private static final boolean debug = false; //true;
private static final boolean pOut = false; //true;
private static DiskCacheManager instance;
private static IDiskCacheAttributes defaultCattr;
///////////////////////////////////////////////////////////////////
private DiskCacheManager( IDiskCacheAttributes cattr ) {
log = LoggerManager.getLogger( this );
this.defaultCattr = cattr;
}
////////////////////////////////////////
public IDiskCacheAttributes getDefaultCattr() {
return this.defaultCattr;
}
///////////////////////////////////////////////////////////////////
public static DiskCacheManager getInstance( IDiskCacheAttributes cattr ) {
if (instance == null) {
synchronized(DiskCacheManager.class) {
if (instance == null) {
instance = new DiskCacheManager( cattr );
}
}
}
if ( debug ) {
log.logIt( "Manager stats : " + instance.getStats() + "<br> -- in
getInstance()" );
}
clients++;
return instance;
}
/////////////////////////////////////////////////////////
public ICache getCache( String cacheName ) {
IDiskCacheAttributes cattr = (IDiskCacheAttributes)defaultCattr.copy();
cattr.setCacheName( cacheName );
return getCache( cattr );
}
//////////////////////////////////////////////////////////
public ICache getCache( IDiskCacheAttributes cattr ) {
ICache raf=null;
p( "cacheName = " + cattr.getCacheName() );
synchronized(caches) {
raf = (ICache)caches.get( cattr.getCacheName() );
if (raf == null) {
// make use cattr
//raf = new DiskCache( cattr.getCacheName(), cattr.getDiskPath() );
raf = new DiskCacheNoWaitBuffer( cattr );
caches.put( cattr.getCacheName(), raf );
}
}
if ( debug ) {
log.logIt( "Manager stats : " + instance.getStats() );
}
return raf;
}
////////////////////////////////////////////////////////////////
public void freeCache( String name ) {
DiskCache raf = (DiskCache)caches.get(name);
if (raf != null) {
raf.dispose();
}
}
// Don't care if there is a concurrency failure ?
public String getStats(){
StringBuffer stats = new StringBuffer();
Enumeration allCaches = caches.elements();
while (allCaches.hasMoreElements()) {
DiskCache raf = (DiskCache)allCaches.nextElement();
if (raf != null) {
stats.append( "<br> " + raf.getStats() );
}
}
return stats.toString();
}
///////////////////////////////////////
public int getCacheType() {
return DISK_CACHE;
}
/////////////////////////////////////////////////////////////////
public void release() {
// Wait until called by the last client
if (--clients != 0) {
return;
}
synchronized(caches) {
Enumeration allCaches = caches.elements();
while (allCaches.hasMoreElements()) {
DiskCache raf = (DiskCache)allCaches.nextElement();
if (raf != null) {
raf.dispose();
}
}
}
} //end release()
/////////////////////////////////////////////
public void p( String s ) {
if ( log.logLevel >= log.DEBUG ) {
log.debug( s );
}
if ( pOut ) {
System.out.println( "DiskCacheManager: " + s );
}
}
} // end class
1.1
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/disk/DiskCacheNoWaitBuffer.java
Index: DiskCacheNoWaitBuffer.java
===================================================================
package org.apache.stratum.jcs.auxiliary.disk;
import java.io.*;
import java.rmi.*;
import java.util.*;
import org.apache.stratum.jcs.engine.control.*;
import org.apache.stratum.jcs.engine.*;
import org.apache.stratum.jcs.access.exception.*;
import org.apache.stratum.jcs.engine.behavior.*;
import org.apache.stratum.jcs.auxiliary.disk.behavior.*;
import org.apache.stratum.jcs.utils.log.*;
/**
* Store recent arivals in a temporary queue.
* Used to queue up update requests to the underlying cache.
* These requests will be processed in their order of arrival
* via the cache event queue processor.
*
*
* Note: There is a a problem lurking in all queued distributed
* systems, where repopulzation of memory can occur dispite a removal
* if the tow arrive in different order than issued. The possible solution
* to never delete, but only invlidate. The cache can act ina cvs like mode,
* enforcing versions of element, checking to see if a newer version exists
* adding. So a remote put could amke sure a new version hasn't arrived.
* Invalidaions would have version. We could avoid spooling these since it will
* have been a while. This will be called garuanteed mode and can be added
* to any listener.
*
*
*/
public class DiskCacheNoWaitBuffer implements ICache {
private static final boolean debug = false;//true;
// accessed by disk cache
protected Hashtable purgatory = new Hashtable();
private static final boolean debugPH = true;
private int purgHits = 0;
private IDiskCacheAttributes cattr;
private DiskCache cache;
private ICacheEventQueue q;
private transient Logger log = LoggerManager.getInstance().getLogger(this);
private String source_id =
"org.apache.stratum.jcs.auxiliary.disk.DiskCacheNoWaitBuffer";
///////////////////////////////////////////////////
public Serializable getSourceId() {
return this.source_id;
}
/**
* Constructs with the given disk cache,
* and fires up an event queue for aysnchronous processing.
*/
public DiskCacheNoWaitBuffer(IDiskCacheAttributes cattr) {
cache = new DiskCache( this, cattr );
this.cattr = cattr;
this.q = new CacheEventQueue(new CacheAdaptor(cache), CacheInfo.listenerId,
cache.getCacheName());
// need each no wait to handle each of its real updates and removes, since
there may
// be more than one per cache? alternativve is to have the cache
// perform updates using a different method that spcifies the listener
//this.q = new CacheEventQueue(new CacheAdaptor(this),
DiskCacheInfo.listenerId, cache.getCacheName());
if (cache.getStatus() == cache.STATUS_ERROR) {
log.error( "destroying queue" );
q.destroy();
}
}
/** Adds a put request to the disk cache. */
public void put(Serializable key, Serializable value ) throws IOException {
put( key, value, null );
}
public void put(Serializable key, Serializable value, Attributes attr ) throws
IOException {
try {
CacheElement ce = new CacheElement( cache.getCacheName(), key, value );
ce.setAttributes( attr );
update( ce );
} catch(IOException ex) {
log.error(ex);
q.destroy();
}
}
public void update( ICacheElement ce ) throws IOException {
try {
if ( debug ) {
p( "putting in purgatory" );
}
PurgatoryElement pe = new PurgatoryElement( ce );
pe.isSpoolable = true;
q.addPutEvent( (ICacheElement)pe );
//q.addPutEvent( ce );
/*
// may be too slow
IDiskElement ide = new DiskElement( ce );
ide.setAttributes( ce.getAttributes() );
purgatory.put( ide.getKey(), ide );
//CacheElement ice = new CacheElement(ce.getCacheName(), ce.getKey(),
ce.getVal() );
//ice.setAttributes( ce.getAttributes() );
ide.setIsSpoolable( true );
q.addPutEvent( ide );
*/
} catch(IOException ex) {
log.error(ex);
// should we destroy purgatory. it will swell
q.destroy();
}
}
/** Synchronously reads from the disk cache. */
public Serializable get(Serializable key) {
return get( key, true );
}
public Serializable get(Serializable key, boolean container ) {
//IDiskElement ide = (IDiskElement)purgatory.get( key );
PurgatoryElement pe = (PurgatoryElement)purgatory.get(key);
//if ( ide != null ) {
if ( pe != null ) {
purgHits++;
if( debugPH ) {
if ( purgHits % 100 == 0 ) {
p( "purgatory hits = " + purgHits );
}
}
//ide.setIsSpoolable( false );
pe.isSpoolable = false;
if ( debug ) {
p( "found in purgatory" );
}
if ( container ) {
purgatory.remove( key );
//return (ICacheElement)ide;
return pe.ice;
} else {
purgatory.remove( key );
//return (Serializable)ide.getVal();
return (Serializable)pe.ice.getVal();
}
}
try {
return cache.get(key);
} catch(Exception ex) {
q.destroy();
// not sure we should do this. What about purgatory?
// must assume that we will not loose disk access
// can make a repairer, but it complicates purgatory.
}
return null;
}
/** Adds a remove request to the disk cache. */
public boolean remove(Serializable key) {
purgatory.remove( key );
try {
q.addRemoveEvent(key);
} catch(IOException ex) {
log.error(ex);
q.destroy();
}
return false;
}
/** Adds a removeAll request to the disk cache. */
public void removeAll() {
Hashtable temp = purgatory;
purgatory = new Hashtable();
temp = null;
try {
q.addRemoveAllEvent();
} catch(IOException ex) {
log.error(ex);
q.destroy();
}
}
/** Adds a dispose request to the disk cache. */
public void dispose() {
cache.dispose();
// may loose the end of the queue, need to be more graceful
q.destroy();
/*
try {
q.addDisposeEvent();
} catch(IOException ex) {
log.error(ex);
q.destroy();
}
*/
}
/** No disk invokation. */
public String getStats() {
return cache.getStats();
}
/** No disk invokation. */
public int getSize() {
return cache.getSize();
}
/** No disk invokation. */
public int getCacheType() {
return cache.getCacheType();
}
/**
* Returns the asyn cache status.
* An error status indicates either the disk connection is not available,
* or the asyn queue has been unexpectedly destroyed.
* No disk invokation.
*/
public int getStatus() {
return q.isAlive() ? cache.getStatus() : cache.STATUS_ERROR;
}
public String getCacheName() {
return cache.getCacheName();
}
/** NOT USED NOW
* Replaces the disk cache service handle with the given handle and
* reset the event queue by starting up a new instance.
*/
public void fixCache(IDiskCacheService disk) {
//cache.fixCache(disk);
resetEventQ();
return;
}
/**
 * Resets the event queue: destroys the current queue if it is still alive,
 * then installs a new queue built around the same underlying cache.
 */
public void resetEventQ() {
    if (q.isAlive()) {
        q.destroy();
    }

    // Build the adaptor first so the constructor arguments are still
    // evaluated in their original order.
    final CacheAdaptor adaptor = new CacheAdaptor(cache);
    this.q = new CacheEventQueue(adaptor, CacheInfo.listenerId, cache.getCacheName());
}
/**
 * Describes this buffer as a fixed prefix followed by the underlying
 * cache's own string form.
 *
 * @return <code>"DiskCacheNoWaitBuffer: "</code> plus <code>cache.toString()</code>
 */
public String toString() {
    final String description = cache.toString();
    return "DiskCacheNoWaitBuffer: " + description;
}
/**
 * Prints a message to standard output, prefixed with this class's name.
 *
 * @param s message to print
 */
private void p(String s) {
    final String line = "DiskCacheNoWaitBuffer:" + s;
    System.out.println(line);
}
}
1.1
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/disk/DiskDumper.java
Index: DiskDumper.java
===================================================================
package org.apache.stratum.jcs.auxiliary.disk;
import org.apache.stratum.jcs.auxiliary.disk.*;
/**
 * Command-line utility used to dump out a disk cache from disk for
 * debugging.
 */
public class DiskDumper {

    /**
     * Dumps the disk cache named by the single command-line argument.
     * The name is passed as both constructor arguments of
     * <code>DiskCache</code>. Prints a usage message and exits with a
     * non-zero status when the argument count is wrong; exits with status
     * 0 after a dump.
     *
     * @param args exactly one element: the cache name
     */
    public static void main(String[] args) {
        if (args.length != 1) {
            // Name the class that actually exists, and report the misuse
            // through the exit status so calling scripts can detect it.
            p("Usage: java org.apache.stratum.jcs.auxiliary.disk.DiskDumper <cache_name>");
            System.exit(1);
        }
        final DiskCache rc = new DiskCache(args[0], args[0]);
        rc.dump();
        System.exit(0);
    }

    /**
     * Prints a prefixed message to standard output.
     *
     * @param s message to print
     */
    private static void p(String s) {
        System.out.println("DiskCache:" + s);
    }
}
1.1
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/disk/DiskElement.java
Index: DiskElement.java
===================================================================
package org.apache.stratum.jcs.auxiliary.disk;
import java.io.*;
import org.apache.stratum.jcs.engine.*;
import org.apache.stratum.jcs.engine.behavior.*;
import org.apache.stratum.jcs.auxiliary.disk.*;
import org.apache.stratum.jcs.auxiliary.disk.behavior.*;
////////////////////////////////////////////////////////////////
/**
 * Cache element that additionally carries a "spoolable" flag, for entries
 * stored on disk and put in purgatory.
 * Per the original author's description, the flag tells the queue whether
 * the element is ready to be spooled.
 *
 * @deprecated see PurgatoryElement
 */
class DiskElement extends CacheElement implements IDiskElement, Serializable {

    /** Spoolable flag; starts out <code>false</code>. */
    private boolean isSpoolable;

    /**
     * Creates an element from its individual parts.
     *
     * @param cacheName name of the cache the element belongs to
     * @param key       key of the element
     * @param val       value of the element
     */
    public DiskElement(String cacheName, Serializable key, Serializable val) {
        super(cacheName, key, val);
    }

    /**
     * Creates an element carrying the cache name, key and value of an
     * existing cache element.
     *
     * @param ice element to take the cache name, key and value from
     */
    public DiskElement(ICacheElement ice) {
        this(ice.getCacheName(), ice.getKey(), ice.getVal());
    }

    /**
     * Lets the queue know whether the element is ready to be spooled.
     *
     * @return the current spoolable flag
     */
    public boolean getIsSpoolable() {
        return this.isSpoolable;
    }

    /**
     * Sets the spoolable flag.
     *
     * @param isSpoolable new value of the flag
     */
    public void setIsSpoolable(boolean isSpoolable) {
        this.isSpoolable = isSpoolable;
    }
}
1.1
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/disk/DiskElementDescriptor.java
Index: DiskElementDescriptor.java
===================================================================
package org.apache.stratum.jcs.auxiliary.disk;
import java.io.*;
import org.apache.stratum.jcs.engine.*;
import org.apache.stratum.jcs.auxiliary.disk.*;
/**
 * Locates one cache data entry on disk. Per the original author's
 * description, descriptors are saved on shutdown and loaded into memory on
 * startup.
 */
public class DiskElementDescriptor implements Serializable {

    /** Position of the cache data entry on disk. */
    long pos;

    /** Number of bytes the serialized form of the cache data takes. */
    public int len;

    /** Creates a descriptor with position and length both zero. */
    public DiskElementDescriptor() {
    }

    /**
     * Records where an entry lives and how long it is.
     *
     * @param pos  position of the entry on disk
     * @param data serialized entry; only its length is stored
     */
    public void init(long pos, byte[] data) {
        this.pos = pos;
        final int byteCount = data.length;
        this.len = byteCount;
        // this.hash = hashCode(data);
    }
}
1.1
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/disk/DiskLockManager.java
Index: DiskLockManager.java
===================================================================
package org.apache.stratum.jcs.auxiliary.disk;
import org.apache.stratum.jcs.auxiliary.disk.*;
import org.apache.stratum.jcs.utils.reuse.*;
import java.util.*;
/**
 * Read/write lock manager for Disk. A single shared instance is exposed
 * through {@link #getInstance()}, and every lock operation uses the one
 * lock name {@link #Disk}.
 */
class DiskLockManager extends ReadWriteLockManager {

    /**
     * Name under which all disk locks are taken.
     *
     * @TODO might need to have this lock for only one cache at a time
     * might want to lock on a Diskcache instance
     */
    public static final String Disk = "Disk";

    /**
     * The shared instance. It is created during class initialization, which
     * the JVM performs exactly once and publishes safely to all threads.
     * This replaces a lazy double-checked-locking scheme on a non-volatile
     * field, which could let a thread observe a partially constructed
     * instance.
     */
    private static final DiskLockManager instance = new DiskLockManager();

    boolean debug = false;

    /** Table of locks handed to the superclass through {@link #getLocks()}. */
    private final Hashtable ht = new Hashtable();

    /** Not instantiable from outside; use {@link #getInstance()}. */
    private DiskLockManager() {
    }

    /**
     * Returns the shared lock manager.
     *
     * @return the single DiskLockManager instance
     */
    static DiskLockManager getInstance() {
        return instance;
    }

    /**
     * Returns the table in which this manager keeps its locks.
     *
     * @return the lock table
     */
    protected Hashtable getLocks() {
        return ht;
    }

    /**
     * Acquires the read lock named {@link #Disk}.
     *
     * @throws IllegalStateException if the thread is interrupted while
     *         acquiring the lock
     */
    void readLock() {
        try {
            readLock(Disk);
        } catch (InterruptedException ex) {
            // should never happen.
            throw diskLockInterrupted(ex);
        }
    }

    /**
     * Acquires the write lock named {@link #Disk}.
     *
     * @throws IllegalStateException if the thread is interrupted while
     *         acquiring the lock
     */
    void writeLock() {
        try {
            writeLock(Disk);
        } catch (InterruptedException ex) {
            // should never happen.
            throw diskLockInterrupted(ex);
        }
    }

    /** Releases the lock named {@link #Disk}. */
    void done() {
        done(Disk);
    }

    /**
     * Shared handling for an interrupted lock attempt: restores the
     * thread's interrupt flag so code further up the stack can still see
     * the interruption, prints the stack trace, and builds the exception
     * for the caller to throw.
     *
     * @param ex the interruption that aborted the lock attempt
     * @return an IllegalStateException carrying the same message
     */
    private static IllegalStateException diskLockInterrupted(InterruptedException ex) {
        Thread.currentThread().interrupt();
        ex.printStackTrace();
        return new IllegalStateException(ex.getMessage());
    }
}
1.1
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/disk/PurgatoryElement.java
Index: PurgatoryElement.java
===================================================================
package org.apache.stratum.jcs.auxiliary.disk;
import java.io.Serializable;
import org.apache.stratum.jcs.engine.behavior.*;
import org.apache.stratum.jcs.engine.*;
import org.apache.stratum.jcs.engine.control.*;
/**
 * Quickly creatable wrapper for cache elements in purgatory. It holds the
 * wrapped element together with a "spoolable" flag and forwards every
 * {@link ICacheElement} method to the wrapped element.
 * <p>
 * Copyright: Copyright (c) 2001
 *
 * @author Aaron Smuts
 * @version 1.0
 */
public class PurgatoryElement implements ICacheElement, Serializable {

    // Original author's note: speed matters here and the accessor calls are
    // unnecessary, hence the fields are protected.

    /** Spoolable flag; starts out <code>false</code>. */
    protected boolean isSpoolable;

    /** The wrapped cache element. */
    protected ICacheElement ice;

    /**
     * Wraps the given cache element.
     *
     * @param ice element to wrap
     */
    public PurgatoryElement(ICacheElement ice) {
        this.ice = ice;
    }

    /**
     * Lets the queue know whether the element is ready to be spooled.
     *
     * @return the current spoolable flag
     */
    public boolean getIsSpoolable() {
        return this.isSpoolable;
    }

    /**
     * Sets the spoolable flag.
     *
     * @param isSpoolable new value of the flag
     */
    public void setIsSpoolable(boolean isSpoolable) {
        this.isSpoolable = isSpoolable;
    }

    // ---- ICacheElement methods: all forwarded to the wrapped element ----

    /** @return the wrapped element's cache name */
    public String getCacheName() {
        return this.ice.getCacheName();
    }

    /** @return the wrapped element's key */
    public Serializable getKey() {
        return this.ice.getKey();
    }

    /** @return the wrapped element's value */
    public Serializable getVal() {
        return this.ice.getVal();
    }

    /** @return the wrapped element's attributes */
    public Attributes getAttributes() {
        return this.ice.getAttributes();
    }

    /**
     * Sets the attributes on the wrapped element.
     *
     * @param attr attributes to set
     */
    public void setAttributes(IAttributes attr) {
        this.ice.setAttributes(attr);
    }

    /** @return the wrapped element's creation time */
    public long getCreateTime() {
        return this.ice.getCreateTime();
    }
}
--
To unsubscribe, e-mail: <mailto:[EMAIL PROTECTED]>
For additional commands, e-mail: <mailto:[EMAIL PROTECTED]>