asmuts      02/01/14 22:20:03

  Added:       src/java/org/apache/stratum/jcs/auxiliary/disk Disk.java
                        DiskCache.java DiskCacheAttributes.java
                        DiskCacheFactory.java DiskCacheManager.java
                        DiskCacheNoWaitBuffer.java DiskDumper.java
                        DiskElement.java DiskElementDescriptor.java
                        DiskLockManager.java PurgatoryElement.java
  Log:
  Default disk cache implementation.
  Memory key storage; needs real-time defragmentation and a disk key swap.
  Stores keys and defragments on shutdown.
  Quick purgatory for incoming items.
  
  Revision  Changes    Path
  1.1                  jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/disk/Disk.java
  
  Index: Disk.java
  ===================================================================
  package org.apache.stratum.jcs.auxiliary.disk;
  
  import java.io.*;
  
  import org.apache.stratum.jcs.engine.*;
  import org.apache.stratum.jcs.utils.log.*;
  
  
  /**
   * Provides thread-safe access to the underlying random access file.
   */
  class Disk {
  
    private final Logger log;
    private final String filepath;
  
    private RandomAccessFile raf;
    private static boolean debug = false;
  
    //////////////////////////////////////////////////////
    Disk (File file) throws FileNotFoundException {
      log = LoggerManager.getLogger(this);
      this.filepath = file.getAbsolutePath();
      raf = new RandomAccessFile(filepath, "rw");
    }
  
    //////////////////////////////////////////////
    Serializable readObject (long pos) {
      byte[] data = null;
      boolean corrupted = false;
      try {
        synchronized (this) {
          raf.seek(pos);
          int datalen = raf.readInt();
          if (datalen < 0 || datalen > raf.length()) {
            corrupted = true;
          }
          else {
            raf.readFully(data = new byte[datalen]);
          }
        }
        if (corrupted) {
          log.logIt("The datFile is corrupted");
          //reset();
          return  null;
        }
        ByteArrayInputStream bais = new ByteArrayInputStream(data);
        BufferedInputStream bis = new BufferedInputStream( bais );
        ObjectInputStream ois = new ObjectInputStream(bis);
        try {
          return  (Serializable)ois.readObject();
        } finally {
          ois.close();
        }
      } catch (Exception e) {
        log.logEx(e, "-- " + raf);
      }
      return  null;
    }
  
    /** Appends byte array to the Disk. */
    boolean append (byte[] data) {
      try {
        synchronized (this) {
          return  write(data, raf.length());
        }
      } catch (IOException ex) {
        ex.printStackTrace();
      }
      return  false;
    }
  
    /** Writes the given byte array to the Disk at the specified position. */
    boolean write (byte[] data, long pos) {
      if (debug) {
        log.debug("write> pos=" + pos);
        log.debug(raf + " -- data.length = " + data.length);
      }
      try {
        synchronized (this) {
          raf.seek(pos);
          raf.writeInt(data.length);
          raf.write(data);
        }
        return  true;
      } catch (IOException ex) {
        ex.printStackTrace();
      }
      return  false;
    }
  
    ////////////////////////////////////////////////////////
    boolean writeObject (Serializable obj, long pos) {
      try {
        return  write(serialize(obj), pos);
      } catch (IOException ex) {
        ex.printStackTrace();
      }
      return  false;
    }
  
    //////////////////////////////////////////////////////////
    DiskElementDescriptor appendObject ( CacheElement obj) {
      long pos = -1;
      boolean success = false;
      try {
  
        DiskElementDescriptor ded = new DiskElementDescriptor();
        byte[] data = serialize(obj);
  
        synchronized (this) {
          pos = raf.length();
          ded.init( pos, data );
          success = write(data, pos);
        }
        //return  success ? new DiskElement(pos, data) : null;
        return  success ? ded : null;
  
      } catch (IOException ex) {
        ex.printStackTrace();
      }
      return  null;
    }
  
    /////////////////////////////////////////////////////////
    /** Returns the raf length. */
    long length () throws IOException {
      synchronized (this) {
        return  raf.length();
      }
    }
  
    //////////////////////////////////////////////////////////
    /** Closes the raf. */
    synchronized void close () throws IOException {
      raf.close();
      return;
    }
  
    //////////////////////////////////////////////////////////
    /** Sets the raf to empty. */
    synchronized void reset () throws IOException {
      raf.close();
      File f = new File(filepath);
      int i = 0;
      for (; i < 10 && !f.delete(); i++) {
        try {
          Thread.currentThread().sleep(1000);
        } catch (InterruptedException ex) {}
          log.warn("Failed to delete " + f.getName() + " " + i);
      }
      if (i == 10) {
        IllegalStateException ex = new IllegalStateException("Failed to delete " + 
f.getName());
        log.error(ex);
        throw ex;
      }
      raf = new RandomAccessFile(filepath, "rw");
      return;
    }
  
    /////////////////////////////////////////////////////////////////////////
    /** Returns the serialized form of the given object in a byte array. */
    static byte[] serialize (Serializable obj) throws IOException {
      ByteArrayOutputStream baos = new ByteArrayOutputStream();
      ObjectOutputStream oos = new ObjectOutputStream(baos);
      try {
        oos.writeObject(obj);
      } finally {
        oos.close();
      }
      return  baos.toByteArray();
    }
  }
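
  A note on the record format: Disk uses a simple length-prefixed layout.
  Each write stores a four-byte length followed by the serialized bytes, and
  appends go at the current end of file.  A minimal, self-contained sketch of
  the same scheme (the class name and temp file below are hypothetical, for
  illustration only):

  import java.io.*;

  // Sketch of the length-prefixed record format used by Disk:
  // [4-byte length][payload bytes], appended at the end of the file.
  public class RecordFormatDemo {
    public static void main(String[] args) throws IOException {
      File f = File.createTempFile("record-demo", ".data");
      RandomAccessFile raf = new RandomAccessFile(f, "rw");
      byte[] payload = "hello disk cache".getBytes();

      long pos = raf.length();      // append position, as Disk.append does
      raf.seek(pos);
      raf.writeInt(payload.length); // length header
      raf.write(payload);           // payload bytes

      raf.seek(pos);                // read it back, as Disk.readObject does
      int len = raf.readInt();
      byte[] back = new byte[len];
      raf.readFully(back);
      System.out.println(new String(back)); // prints: hello disk cache
      raf.close();
    }
  }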
  
  
  
  
  
  
  1.1                  jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/disk/DiskCache.java
  
  Index: DiskCache.java
  ===================================================================
  package org.apache.stratum.jcs.auxiliary.disk;
  
  import java.io.*;
  import java.util.*;
  
  import org.apache.stratum.jcs.access.*;
  import org.apache.stratum.jcs.access.exception.*;
  import org.apache.stratum.jcs.auxiliary.disk.behavior.*;
  import org.apache.stratum.jcs.engine.control.*;
  import org.apache.stratum.jcs.engine.control.Cache;
  import org.apache.stratum.jcs.engine.behavior.*;
  import org.apache.stratum.jcs.engine.*;
  
  import org.apache.stratum.jcs.utils.data.*;
  import org.apache.stratum.jcs.utils.log.*;
  
  
  
  ////////////////////////////////////////////////////////////////////////
  /**
   *  @author Aaron Smuts
   *  @version 1.0
   *
   *  @author Hanson Char
   *  @version 1.1
   *  <br><br>Changes in version 1.1:
   *  <ol>
   *    <li>replace synchronization mechanism with read/write lock manager;
   *    <li>replace the use of DiskIO with Disk;
   *    <li>avoid disk write if no change to data value;
   *    <li>reduce disk fragmentation by re-using disk locations when possible;
   *  </ol>
   *
   *  @author Aaron Smuts
   *  @version 1.2
   *  Made the disk cache attribute driven.
   *  Uses common ICache interface and ICacheAttributes.
   *
   */
  public class DiskCache implements ICache, Serializable {
  
    Logger log;
    private static int numCreated = 0;
    private int numInstances = 0;
  
    // trivial debugging that should be removed by the compiler
    private static final boolean debug =  false;//true;
    private static final boolean debugR =  false;//true;
    private static final boolean debugPut = false;//true;
    private static final boolean debugGet = false;//true;
  
    private final static DiskLockManager locker = DiskLockManager.getInstance();
    private String fileName;
    private String cacheName;
    private Disk dataFile;
    private Disk keyFile;
    private HashMap keyHash;      // not synchronized to maximize concurrency.
    public boolean isAlive = false;
    private File rafDir;
  
    IDiskCacheAttributes cattr;
  
    // not used
    private String source_id = "org.apache.stratum.jcs.auxiliary.disk.DiskCache";
  
    // disk cache buffer, need to remove key from buffer on update, if its there
    DiskCacheNoWaitBuffer buffer;
  
    ///////////////////////////////////////////////////
    public Serializable getSourceId() {
      return this.source_id;
    }
  
    ////////////////////////////////////////////////////////////////////
    // should use this method
    public DiskCache ( DiskCacheNoWaitBuffer buffer, IDiskCacheAttributes cattr ) {
      this( cattr.getCacheName(), cattr.getDiskPath() );
      this.cattr = cattr;
      this.buffer = buffer;
    }
    protected DiskCache ( String cacheName ) {
      this( cacheName, null);
    }
    //////////////////////////////////////////////////////////////////////
    protected DiskCache ( String cacheName, String rafroot ) {
      numInstances++;
      this.fileName = cacheName;
      this.cacheName = cacheName;
      // Initialize the logger first; it is used in the catch block below.
      log = LoggerManager.getLogger(this);
  
      //String rafroot = cattr.getDiskPath();
      if ( rafroot == null ) {
        try {
          PropertyGroups pg = new PropertyGroups( "/cache.properties" );
          rafroot = pg.getProperty( "diskPath" );
        } catch( Exception e ) {
          log.error( e );
        }
      }
      rafDir = new File(rafroot);
      rafDir.mkdirs();
      log.info("rafroot=" + rafroot);
  
      try {
        dataFile = new Disk(new File(rafDir, fileName + ".data"));
        keyFile = new Disk(new File(rafDir, fileName + ".key"));
        if (keyFile.length() > 0) {
          loadKeys();
          if (keyHash.size() == 0) {
            dataFile.reset();
          }
        }
        else {
          keyHash = new HashMap();
          if (dataFile.length() > 0) {
            dataFile.reset();
          }
        }
        isAlive = true;
      } catch (FileNotFoundException e) {
        log.logEx(e);
      } catch (Exception e) {
        log.logEx(e, fileName);
      }
    }             // end constructor
  
    ////////////////////////////////////////////////////////////////////
    public void loadKeys () {
      locker.writeLock();
      try {
        keyHash = (HashMap)keyFile.readObject(0);
        if (keyHash == null) {
          keyHash = new HashMap();
        }
        if (debug) {
          log.logIt("LOADKEYS " + fileName + " -- keyHash.size() = " + keyHash.size());
        }
      } catch (Exception e) {
        log.logEx(e, fileName);
      } finally {
        locker.done();            // release write lock.
      }
    }             // end loadKeys
  
    ////////////////////////////////////////////////////////////////////
    public void saveKeys () {
      try {
        if (debug) {
          log.logIt("SAVEKEYS " + fileName + " -- keyHash.size() = " + keyHash.size());
        }
        locker.writeLock();
        try {
          keyFile.reset();
          if (keyHash.size() > 0) {
            keyFile.writeObject(keyHash, 0);
          }
        } finally {
          locker.done();          // release write lock.
        }
      } catch (Exception e) {
        log.logEx(e);
      }
    }             // end saveKeys
  
    ////////////////////////////////////////////////////////
    public void add (Serializable key, Serializable value)   throws IOException {
      put(key, value);
    }
  
    // ignore the multicast field.
    public void put (Serializable key, Serializable value, boolean multicast) throws IOException {
      put(key, value);
    }
  
    public void put (Serializable key, Serializable value)  throws IOException {
      put( key, value, null );
    }
    public void put (Serializable key, Serializable value, Attributes attr) throws IOException {
      CacheElement ce = new CacheElement( cacheName, key, value );
      ce.setAttributes( attr );
      update( ce );
    }
    ///////////////////////////////////////////////////////////
    public void update( ICacheElement ce ) throws IOException {
  
      if ( debug ) {
        p( "update" );
      }
  
      if (!isAlive) {
        return;
      }
  
      if ( ce instanceof PurgatoryElement ) {
        PurgatoryElement pe = (PurgatoryElement)ce;
        ce = pe.ice;
        if ( !pe.isSpoolable ) {
          if ( debug ) {
            p( "pe is not spoolable" );
          }
          // it has been plucked from purgatory
          return;
        }
      }
      /*
      if ( ce instanceof IDiskElement ) {
        IDiskElement ide = (IDiskElement)ce;
        if ( !ide.getIsSpoolable() ) {
          // it has been plucked from purgatory
          return;
        }
      }
      */
      // remove item from purgatory since we are putting it on disk
      // assume that the disk cache will never break
      // disk breakage is akin to an out of memory exception
      buffer.purgatory.remove( ce.getKey() );
      if ( debug ) {
        p( "putting " + ce.getKey() + " on disk, removing from purgatory" );
      }
  
      //System.out.println( "putting " + ce );
  
      DiskElementDescriptor ded = null;
      try {
  
        ded = new DiskElementDescriptor();
        byte[] data = Disk.serialize( ce );
        ded.init( dataFile.length(), data );
  
        // make sure this only locks for one particular cache region
        locker.writeLock();
  
        try {
          if (!isAlive) {
            return;
          }
          // Presume it's an append.
          //DiskElement re = new DiskElement( cacheName, key, value );
          //re.init( dataFile.length(), data );
  
          DiskElementDescriptor old = (DiskElementDescriptor)keyHash.put(ce.getKey(), ded);
  
          // Item with the same key already exists in file.
          // Try to reuse the location if possible.
          if (old != null && ded.len <= old.len) {
            ded.pos = old.pos;
          }
          dataFile.write(data, ded.pos);
          /*
           // Only need to write if the item with the same key and value
           // does not already exist in the file.
           if (re.equals(old)) {
           re.pos = old.pos;
           }
           else {
           // Item with the same key but different value already exists in file.
           // Try to reuse the location if possible.
           if (old != null && re.len <= old.len) {
           re.pos = old.pos;
           }
           dataFile.write(data, re.pos);
           }
           */
  
        } finally {
          locker.done();          // release write lock.
        }
        if ( log.logLevel >= log.DEBUG ) {
          log.debug(fileName + " -- put " + ce.getKey() + " on disk at pos " + ded.pos + " size = " + ded.len);
        }
      } catch( ConcurrentModificationException cme ) {
        // do nothing, this means it has gone back to memory mid serialization
      } catch (Exception e) {
        log.logEx(e, "cacheName = " + cacheName + ", ce.getKey() = " + ce.getKey());
        //reset();
      }
      return;
    }
  
    ///////////////////////////////////////////////////////////////
    public Serializable get (Serializable key) {
      return  get(key, true, true);
    }
  
    ///////////////////////////////////////////////////////////////
    public Serializable get (Serializable key, boolean container ) {
      return  get(key, true, true);
    }
  
    //////////////////////////////////////////////////////////////////////
    private Serializable get (Serializable key, boolean container, final boolean lock) {
  
      if ( debugGet ) {
        p( "getting " + key + " from disk" );
      }
  
      if (!isAlive) {
        return  null;
      }
  
      Serializable obj = null;
      if (lock) {
        locker.readLock();
      }
      try {
        if (!isAlive) {
          return  null;
        }
        DiskElementDescriptor ded = (DiskElementDescriptor)keyHash.get(key);
        if (ded != null) {
          if ( debugGet ) {
            p( "found " + key + " on disk" );
          }
          obj = dataFile.readObject(ded.pos);
        }
        //System.out.println( "got element = " + (CacheElement)obj);
      } catch (Exception e) {
        log.logEx(e, "cacheName = " + cacheName + ", key = " + key);
      } finally {
        if (lock) {
          locker.done();
        }
      }
  
      return  obj;
    }
  
    /**
     * Returns true if the removal was successful, or false if there is nothing to remove.
     * The current implementation always results in a disk orphan.
     */
    public boolean remove (Serializable key) {
      locker.writeLock();
      try {
  
        if (key instanceof String && 
key.toString().endsWith(NAME_COMPONENT_DELIMITER)) {
          // remove all keys of the same name group.
          boolean removed = false;
          for (Iterator itr = keyHash.entrySet().iterator(); itr.hasNext();) {
            Map.Entry entry = (Map.Entry)itr.next();
            Object k = entry.getKey();
            if (k instanceof String && k.toString().startsWith(key.toString())) {
              itr.remove();
              removed = true;
            }
          }
          return removed;
        }
        // remove single item.
        return  keyHash.remove(key) != null;
      } catch (Exception e) {
        log.logEx(e);
        reset();
      } finally {
        locker.done();            // release write lock.
      }
      return  false;
    }
  
    /////////////////////////////////////////////
    public void removeAll () {
      try {
        reset();
      } catch (Exception e) {
        log.logEx(e);
        reset();
      }
    }             // end removeAll
  
    ////////////////////////////////////////////////////////////////
    // handle error by last resort, force content update, or removeall
    public void reset () {
      log.logIt("Reseting cache");
      locker.writeLock();
      try {
        try {
          dataFile.close();
          File file = new File(rafDir, fileName + ".data");
          file.delete();
          keyFile.close();
          File file2 = new File(rafDir, fileName + ".key");
          file2.delete();
        } catch (Exception e) {
          log.logEx(e);
        }
        try {
          dataFile = new Disk(new File(rafDir, fileName + ".data"));
          keyFile = new Disk(new File(rafDir, fileName + ".key"));
          keyHash = new HashMap();
        } catch (IOException e) {
          log.logEx(e, " -- IN RESET");
        } catch (Exception e) {
          log.logEx(e, " -- IN RESET");
        }
      } finally {
        locker.done();            // release write lock.
      }
    }             // end reset
  
    ////////////////////////////////////////////////////////////////////////////
    public String getStats () {
      return  "fileName = " + fileName + ", numInstances = " + numInstances;
    }
  
    /////////////////////////////////////////////////////////////
    // should be called by the cache manager, since it knows how
    // many instances are checked out
    public void dispose () {
      if (!isAlive) {
        log.logIt("is not alive and dispose() was called -- " + fileName);
        return;
      }
      locker.writeLock();
      try {
        if (!isAlive) {
          log.logIt("is not alive and dispose() was called -- " + fileName);
          return;
        }
        try {
          optimizeFile();
        } catch (Exception e) {
          log.logEx(e, "-- " + fileName);
        }
        try {
          numInstances--;
          if (numInstances == 0) {
            p( "dispose -- Closing files -- in close -- " + fileName );
            log.warn("dispose -- Closing files -- in close -- " + fileName);
            dataFile.close();
            dataFile = null;
            keyFile.close();
            keyFile = null;
          }
        } catch (Exception e) {
          log.logEx(e, "-- " + fileName);
        }
      } finally {
        isAlive = false;
        locker.done();            // release write lock;
      }
    }             // end dispose
  
    /** Note: synchronization currently managed by the only caller method - dispose. */
    private void optimizeFile () {
      try {
        // Migrate from keyHash to keyHashTemp in memory,
        // and from dataFile to dataFileTemp on disk.
        HashMap keyHashTemp = new HashMap();
        Disk dataFileTemp = new Disk(new File(rafDir, fileName + "Temp.data"));
        if ( log.logLevel >= log.INFO ) {
          log.info("optimizing file keyHash.size()=" + keyHash.size());
        }
        Iterator itr = keyHash.keySet().iterator();
        while (itr.hasNext()) {
          Serializable key = (Serializable)itr.next();
  
          // Must not hold a read lock here; else it results in deadlock.
          CacheElement tempDe = (CacheElement)get(key, true, false);
          try {
            DiskElementDescriptor de = dataFileTemp.appendObject( tempDe );
            if ( log.logLevel >= log.DEBUG ) {
              log.debug(fileName + " -- Put " + key + " on temp disk cache");
            }
            keyHashTemp.put(key, de);
          } catch (Exception e) {
            log.logEx(e, fileName + " -- cacheName = " + cacheName + ", key = "
                + key);
          }
        }         // end while
        if ( log.logLevel >= log.DEBUG ) {
          log.debug(fileName + " -- keyHashTemp.size() =  " + keyHashTemp.size()
              + " / keyHash.size() =  " + keyHash.size());
        }
        // Make dataFileTemp to become dataFile on disk.
        dataFileTemp.close();
        dataFile.close();
        File oldData = new File(rafDir, fileName + ".data");
        if (oldData.exists()) {
          if (debugR) {
            log.logIt(fileName + " -- oldData.length() = " + oldData.length());
          }
          oldData.delete();
        }
        File newData = new File(rafDir, fileName + "Temp.data");
        File newFileName = new File(rafDir, fileName + ".data");
        if (newData.exists()) {
          if (debugR) {
            log.logIt(fileName + " -- newData.length() = " + newData.length());
          }
          newData.renameTo(newFileName);
        }
        keyHash = keyHashTemp;
        keyFile.reset();
        saveKeys();
      } catch (Exception e) {
        log.logEx(e, "-- " + fileName);
      }
    }             // end optimizeFile
  
    /** Returns the cache status. */
    public int getStatus () {
      return  isAlive ? STATUS_ALIVE : STATUS_DISPOSED;
    }
  
    /** Returns the current cache size. */
    public int getSize () {
      return  keyHash.size();
    }
  
    public int getCacheType () {
      return  DISK_CACHE;
    }
  
    /** For debugging. */
    public void dump () {
      log.debug("keyHash.size()=" + keyHash.size());
      for (Iterator itr = keyHash.entrySet().iterator(); itr.hasNext();) {
        Map.Entry e = (Map.Entry)itr.next();
        Serializable key = (Serializable)e.getKey();
        DiskElementDescriptor ded = (DiskElementDescriptor)e.getValue();
        Serializable val = get(key);
        log.debug("disk dump> key=" + key + ", val=" + val + ", pos=" + ded.pos);
      }
    }
  
    //////////////
    public void p( String s ) {
      System.out.println( "DiskCache: " + s );
    }
  
  
    /** Returns the cache name. */
    public String getCacheName () {
      return  cacheName;
    }
  }               // end class
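
  The interesting policy in update() above is slot reuse: when an item is
  rewritten and its new serialized form fits in the slot its key already
  occupies (ded.len <= old.len), the old position is reused; otherwise the
  record is appended, orphaning the old bytes until optimizeFile() defragments
  at dispose time.  A minimal sketch of just that decision (the class, method,
  and variable names are hypothetical):

  // Sketch of the slot-reuse decision in DiskCache.update.
  public class SlotReuseDemo {
    // oldPos < 0 means the key has no existing slot.
    // Reuse the key's old slot when the new record fits, else append.
    static long choosePosition(long oldPos, int oldLen, int newLen, long fileLen) {
      if (oldPos >= 0 && newLen <= oldLen) {
        return oldPos;   // overwrite in place; the file does not grow
      }
      return fileLen;    // append; the old bytes become a disk orphan
    }

    public static void main(String[] args) {
      System.out.println(choosePosition(100, 64, 48, 5000));  // 100  (slot reused)
      System.out.println(choosePosition(100, 64, 80, 5000));  // 5000 (appended)
      System.out.println(choosePosition(-1,  0, 32, 5000));   // 5000 (new key)
    }
  }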
  
  
  
  
  
  
  1.1                  jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/disk/DiskCacheAttributes.java
  
  Index: DiskCacheAttributes.java
  ===================================================================
  package org.apache.stratum.jcs.auxiliary.disk;
  
  import org.apache.stratum.jcs.auxiliary.disk.behavior.IDiskCacheAttributes;
  import org.apache.stratum.jcs.auxiliary.behavior.IAuxiliaryCacheAttributes;
  
  /**
   * Attribute bean for configuring the disk cache.
   * Copyright (c) 2001
   * @author Aaron Smuts
   * @version 1.0
   */
  
  //////////////////////////////////////////////////
  public class DiskCacheAttributes implements IDiskCacheAttributes, Cloneable {
  
    private String cacheName;
    private String name;
  
    private String diskPath;
  
    /////////////////////////////////////////
    public DiskCacheAttributes() {
    }
  
    ////////////////////////////////////
    public void setDiskPath( String path ) {
      this.diskPath = path;
    }
  
    ////////////////////////////////////
    public String getDiskPath( ) {
      return this.diskPath;
    }
  
    ////////////////////////////////////////////////////
    public void setCacheName( String s ) {
      this.cacheName = s;
    }
    public String getCacheName( ) {
      return this.cacheName;
    }
  
    /////////////////////////////////////////////////////////////////////
    public String getName() {
      return this.name;
    }
    public void setName( String name ) {
      this.name = name;
    }
  
    /////////////////////////////////////////////////
    public IAuxiliaryCacheAttributes copy() {
      try {
        return (IAuxiliaryCacheAttributes)this.clone();
      } catch( Exception e ) {
        // clone failed; fall back to sharing this instance.
      }
      return (IAuxiliaryCacheAttributes)this;
    }
  
    ///////////////////////////////////////////////////////////////
    public String toString() {
      StringBuffer str = new StringBuffer();
      str.append( "diskPath = " + diskPath );
      return str.toString();
    }
  
  
  }
  
  
  
  1.1                  jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/disk/DiskCacheFactory.java
  
  Index: DiskCacheFactory.java
  ===================================================================
  package org.apache.stratum.jcs.auxiliary.disk;
  
  import org.apache.stratum.jcs.auxiliary.behavior.*;
  import org.apache.stratum.jcs.auxiliary.disk.behavior.*;
  import org.apache.stratum.jcs.engine.behavior.ICache;
  import org.apache.stratum.jcs.utils.log.*;
  
  /**
   * Factory for attribute-driven disk caches.
   * Copyright (c) 2001
   * @author Aaron Smuts
   * @version 1.0
   */
  
  ///////////////////////////////////////////////////////////////
  public class DiskCacheFactory implements IAuxiliaryCacheFactory {
  
    private static Logger log = LoggerManager.getLogger( DiskCacheFactory.class );
  
    private String name;
  
    ///////////////////////////////////////////////
    public DiskCacheFactory() {
    }
  
    ////////////////////////////////////////////////////////////////////
    public ICache createCache(IAuxiliaryCacheAttributes iaca) {
      IDiskCacheAttributes idca = (IDiskCacheAttributes)iaca;
      DiskCacheManager dcm = DiskCacheManager.getInstance( idca );
      return dcm.getCache( idca );
    }
  
    /////////////////////////////////////////////////////////////////////
    public String getName() {
      return this.name;
    }
    public void setName( String name ) {
      this.name = name;
    }
  }
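
  Assuming the surrounding JCS classes compile as shown, wiring the attribute
  bean, factory, and manager together would look roughly like this; the region
  name and disk path are placeholder values:

  import org.apache.stratum.jcs.auxiliary.disk.DiskCacheAttributes;
  import org.apache.stratum.jcs.auxiliary.disk.DiskCacheFactory;
  import org.apache.stratum.jcs.engine.behavior.ICache;

  // Usage sketch: obtain an attribute-driven disk cache via the factory.
  public class DiskCacheUsage {
    public static void main(String[] args) throws Exception {
      DiskCacheAttributes cattr = new DiskCacheAttributes();
      cattr.setCacheName("testRegion");  // placeholder region name
      cattr.setDiskPath("/tmp/jcs");     // placeholder path for the .data/.key files

      // The factory delegates to DiskCacheManager, which hands back a
      // DiskCacheNoWaitBuffer fronting the actual DiskCache.
      ICache cache = new DiskCacheFactory().createCache(cattr);
      cache.put("key", "value");         // queued through the no-wait buffer
      System.out.println(cache.get("key"));
    }
  }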
  
  
  
  1.1                  jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/disk/DiskCacheManager.java
  
  Index: DiskCacheManager.java
  ===================================================================
  package org.apache.stratum.jcs.auxiliary.disk;
  
  import java.io.*;
  import java.util.*;
  
  import org.apache.stratum.jcs.auxiliary.disk.behavior.*;
  import org.apache.stratum.jcs.engine.behavior.*;
  import org.apache.stratum.jcs.utils.log.*;
  
  /////////////////////////////////////////////
  public class DiskCacheManager implements ICacheManager {
  
    private static int clients;
  
    private static Hashtable caches = new Hashtable();
  
    private static Logger log;
    private static final boolean debug = false; //true;
    private static final boolean pOut = false; //true;
  
    private static DiskCacheManager instance;
  
    private static IDiskCacheAttributes defaultCattr;
  
    ///////////////////////////////////////////////////////////////////
    private DiskCacheManager( IDiskCacheAttributes cattr ) {
      log = LoggerManager.getLogger( this );
      defaultCattr = cattr;
    }
  
    ////////////////////////////////////////
    public IDiskCacheAttributes getDefaultCattr() {
      return this.defaultCattr;
    }
  
    ///////////////////////////////////////////////////////////////////
     public static DiskCacheManager getInstance( IDiskCacheAttributes cattr ) {
        if (instance == null) {
          synchronized(DiskCacheManager.class) {
            if (instance == null) {
              instance = new DiskCacheManager( cattr );
            }
          }
        }
        if ( debug ) {
          log.logIt( "Manager stats : " + instance.getStats() + "<br> -- in getInstance()" );
        }
        clients++;
        return instance;
     }
  
     /////////////////////////////////////////////////////////
    public ICache getCache( String cacheName ) {
      IDiskCacheAttributes cattr = (IDiskCacheAttributes)defaultCattr.copy();
      cattr.setCacheName( cacheName );
      return getCache( cattr );
    }
  
    //////////////////////////////////////////////////////////
    public ICache getCache( IDiskCacheAttributes cattr ) {
      ICache raf = null;
  
      p( "cacheName = " + cattr.getCacheName() );
  
      synchronized(caches) {
        raf = (ICache)caches.get( cattr.getCacheName() );
  
        if (raf == null) {
          // make use of cattr
          //raf = new DiskCache( cattr.getCacheName(), cattr.getDiskPath() );
          raf = new DiskCacheNoWaitBuffer( cattr );
          caches.put( cattr.getCacheName(), raf );
        }
      }
      if ( debug ) {
        log.logIt( "Manager stats : " + instance.getStats() );
      }
      return raf;
    }
  
  
  
    ////////////////////////////////////////////////////////////////
     public void freeCache( String name ) {
        DiskCache raf = (DiskCache)caches.get(name);
        if (raf != null) {
           raf.dispose();
        }
     }
  
    // Don't care if there is a concurrency failure ?
    public String getStats(){
      StringBuffer stats = new StringBuffer();
      Enumeration allCaches = caches.elements();
  
      while (allCaches.hasMoreElements()) {
        DiskCache raf = (DiskCache)allCaches.nextElement();
        if (raf != null) {
         stats.append( "<br>&nbsp;&nbsp;&nbsp;" + raf.getStats() );
        }
      }
      return stats.toString();
    }
  
    ///////////////////////////////////////
    public int getCacheType() {
      return DISK_CACHE;
    }
  
  
    /////////////////////////////////////////////////////////////////
    public void release() {
      // Wait until called by the last client
      if (--clients != 0) {
        return;
      }
      synchronized(caches) {
        Enumeration allCaches = caches.elements();
  
        while (allCaches.hasMoreElements()) {
          DiskCache raf = (DiskCache)allCaches.nextElement();
          if (raf != null) {
            raf.dispose();
          }
        }
      }
    } //end release()
  
  
    /////////////////////////////////////////////
    public void p( String s ) {
      if ( log.logLevel >= log.DEBUG ) {
        log.debug( s );
      }
      if ( pOut ) {
        System.out.println( "DiskCacheManager: " + s );
      }
    }
  
  
  } // end class
  
  
  
  1.1                  jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/disk/DiskCacheNoWaitBuffer.java
  
  Index: DiskCacheNoWaitBuffer.java
  ===================================================================
  package org.apache.stratum.jcs.auxiliary.disk;
  
  import java.io.*;
  import java.rmi.*;
  import java.util.*;
  
  import org.apache.stratum.jcs.engine.control.*;
  import org.apache.stratum.jcs.engine.*;
  import org.apache.stratum.jcs.access.exception.*;
  import org.apache.stratum.jcs.engine.behavior.*;
  import org.apache.stratum.jcs.auxiliary.disk.behavior.*;
  import org.apache.stratum.jcs.utils.log.*;
  
  
  /**
   * Stores recent arrivals in a temporary queue.
   * Used to queue up update requests to the underlying cache.
   * These requests will be processed in their order of arrival
   * via the cache event queue processor.
   *
   * Note:  There is a problem lurking in all queued distributed
   * systems, where repopulation of memory can occur despite a removal
   * if the two events arrive in a different order than issued.  The possible
   * solution is to never delete, but only invalidate.  The cache can act in a
   * CVS-like mode, enforcing versions of elements and checking whether a newer
   * version exists before adding, so a remote put could make sure a newer
   * version hasn't arrived.  Invalidations would carry a version.  We could
   * avoid spooling these since it will have been a while.  This will be called
   * guaranteed mode and can be added to any listener.
   */
  public class DiskCacheNoWaitBuffer implements ICache {
  
    private static final boolean debug =  false;//true;
  
    // accessed by disk cache
    protected Hashtable purgatory = new Hashtable();
    private static final boolean debugPH =  true;
    private int purgHits = 0;
  
    private IDiskCacheAttributes cattr;
    private DiskCache cache;
    private ICacheEventQueue q;
    private transient Logger log = LoggerManager.getInstance().getLogger(this);
  
    private String source_id = "org.apache.stratum.jcs.auxiliary.disk.DiskCacheNoWaitBuffer";
  
  
  
    ///////////////////////////////////////////////////
    public Serializable getSourceId() {
      return this.source_id;
    }
  
    /**
     * Constructs with the given disk cache,
     * and fires up an event queue for asynchronous processing.
     */
    public DiskCacheNoWaitBuffer(IDiskCacheAttributes cattr) {
        cache = new DiskCache( this, cattr );
        this.cattr = cattr;
        this.q = new CacheEventQueue(new CacheAdaptor(cache), CacheInfo.listenerId, cache.getCacheName());
  
        // Need each no-wait to handle its own real updates and removes, since there may
        // be more than one per cache?  The alternative is to have the cache
        // perform updates using a different method that specifies the listener.
        //this.q = new CacheEventQueue(new CacheAdaptor(this), DiskCacheInfo.listenerId, cache.getCacheName());
        if (cache.getStatus() == cache.STATUS_ERROR) {
          log.error( "destroying queue" );
          q.destroy();
        }
    }
  
    /** Adds a put request to the disk cache. */
    public void put(Serializable key, Serializable value ) throws IOException {
      put( key, value, null );
    }
    public void put(Serializable key, Serializable value, Attributes attr ) throws IOException {
      try {
        CacheElement ce = new CacheElement( cache.getCacheName(), key, value );
        ce.setAttributes( attr );
        update( ce );
      } catch(IOException ex) {
        log.error(ex);
        q.destroy();
      }
    }
    public void update( ICacheElement ce ) throws IOException {
      try {
        if ( debug ) {
          p( "putting in purgatory" );
        }
  
  
        PurgatoryElement pe = new PurgatoryElement( ce );
        pe.isSpoolable = true;
        q.addPutEvent( (ICacheElement)pe );
  
        //q.addPutEvent( ce );
  
        /*
        // may be too slow
        IDiskElement ide = new DiskElement( ce );
        ide.setAttributes( ce.getAttributes() );
        purgatory.put( ide.getKey(), ide );
        //CacheElement ice = new CacheElement(ce.getCacheName(), ce.getKey(), ce.getVal() );
        //ice.setAttributes( ce.getAttributes() );
        ide.setIsSpoolable( true );
        q.addPutEvent( ide );
        */
  
      } catch(IOException ex) {
        log.error(ex);
        // should we destroy purgatory?  it will swell otherwise
        q.destroy();
      }
    }
  
    /** Synchronously reads from the disk cache. */
    public Serializable get(Serializable key) {
      return get( key, true );
    }
    public Serializable get(Serializable key, boolean container ) {
      //IDiskElement ide = (IDiskElement)purgatory.get( key );
      PurgatoryElement pe = (PurgatoryElement)purgatory.get(key);
      //if ( ide != null ) {
      if ( pe != null ) {
        purgHits++;
        if( debugPH ) {
          if ( purgHits % 100 == 0 ) {
            p( "purgatory hits = " + purgHits );
          }
        }
  
        //ide.setIsSpoolable( false );
        pe.isSpoolable = false;
        if ( debug ) {
          p( "found in purgatory" );
        }
        if ( container ) {
          purgatory.remove( key );
          //return (ICacheElement)ide;
          return pe.ice;
        } else {
          purgatory.remove( key );
          //return (Serializable)ide.getVal();
          return (Serializable)pe.ice.getVal();
        }
      }
      try {
        return cache.get(key);
      } catch(Exception ex) {
        q.destroy();
        // Not sure we should do this.  What about purgatory?
        // We must assume that we will not lose disk access.
        // Could make a repairer, but it complicates purgatory.
      }
      return null;
    }
  
  
    /** Adds a remove request to the disk cache. */
    public boolean remove(Serializable key) {
      purgatory.remove( key );
      try {
        q.addRemoveEvent(key);
      } catch(IOException ex) {
        log.error(ex);
        q.destroy();
      }
      return false;
    }
    /** Adds a removeAll request to the disk cache. */
    public void removeAll() {
      // drop everything parked in purgatory
      purgatory = new Hashtable();
  
      try {
        q.addRemoveAllEvent();
      } catch(IOException ex) {
        log.error(ex);
        q.destroy();
      }
    }
    /** Adds a dispose request to the disk cache. */
    public void dispose() {
      cache.dispose();
      // may lose the end of the queue; need to be more graceful
      q.destroy();
      /*
      try {
        q.addDisposeEvent();
      } catch(IOException ex) {
        log.error(ex);
        q.destroy();
      }
      */
  
    }
    /** No disk invocation. */
    public String getStats() {
      return cache.getStats();
    }
    /** No disk invocation. */
    public int getSize() {
      return cache.getSize();
    }
    /** No disk invocation. */
    public int getCacheType() {
      return cache.getCacheType();
    }
    /**
     * Returns the async cache status.
     * An error status indicates either that the disk connection is not available,
     * or that the async queue has been unexpectedly destroyed.
     * No disk invocation.
     */
    public int getStatus() {
      return q.isAlive() ? cache.getStatus() : cache.STATUS_ERROR;
    }
    public String getCacheName() {
      return cache.getCacheName();
    }
  
    /** NOT USED NOW
     * Replaces the disk cache service handle with the given handle and
     * reset the event queue by starting up a new instance.
     */
    public void fixCache(IDiskCacheService disk) {
      //cache.fixCache(disk);
      resetEventQ();
      return;
    }
    /** Resets the event queue by first destroying the existing one and starting up a new one. */
    public void resetEventQ() {
      if (q.isAlive()) {
        q.destroy();
      }
      this.q = new CacheEventQueue(new CacheAdaptor(cache), CacheInfo.listenerId, cache.getCacheName());
    }
    public String toString() {
      return "DiskCacheNoWaitBuffer: " + cache.toString();
    }
    private void p(String s) {
      System.out.println("DiskCacheNoWaitBuffer:" + s);
    }
  }
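
  The purgatory handshake above is subtle: the buffer wraps each incoming
  element in a PurgatoryElement with isSpoolable set to true and queues it,
  while a get() that finds the item in purgatory flips the flag so the queued
  disk write skips it ("plucked from purgatory").  Note that the insertion
  into the purgatory table itself is still commented out in this revision.  A
  simplified, self-contained model of the intended flow (all names below are
  stand-ins, not the real classes):

  import java.util.Hashtable;

  // Minimal model of the purgatory handshake between the no-wait buffer
  // and DiskCache.update.
  public class PurgatoryDemo {
    static class Element {
      boolean isSpoolable = true;   // set by the buffer before queueing
      final Object value;
      Element(Object value) { this.value = value; }
    }

    static final Hashtable purgatory = new Hashtable();

    // Buffer side: park the element and (conceptually) queue a disk write.
    static void put(Object key, Object value) {
      purgatory.put(key, new Element(value));
    }

    // Buffer side: a read hit plucks the element back out of purgatory,
    // so the pending disk write sees isSpoolable == false and skips it.
    static Object get(Object key) {
      Element e = (Element) purgatory.remove(key);
      if (e == null) {
        return null;   // would fall through to the disk cache
      }
      e.isSpoolable = false;
      return e.value;
    }

    public static void main(String[] args) {
      put("k", "v");
      System.out.println(get("k"));  // v, served from purgatory
      System.out.println(get("k"));  // null, purgatory already drained
    }
  }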
  
  
  
  1.1                  jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/disk/DiskDumper.java
  
  Index: DiskDumper.java
  ===================================================================
  
  package org.apache.stratum.jcs.auxiliary.disk;
  
  
  /** Used to dump out a Disk cache from disk for debugging. */
  public class DiskDumper {
  
    public static void main(String[] args) {
      if (args.length != 1) {
          p("Usage: java org.apache.stratum.jcs.auxiliary.disk.DiskDumper <cache_name>");
          System.exit(1);
      }
      final DiskCache rc = new DiskCache(args[0], args[0]);
      rc.dump();
      System.exit(0);
    }
  
    private static void p(String s) {
      System.out.println("DiskCache:"+s);
    }
  
  }
  
  
  
  1.1                  jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/disk/DiskElement.java
  
  Index: DiskElement.java
  ===================================================================
  package org.apache.stratum.jcs.auxiliary.disk;
  
  import java.io.*;
  
  import org.apache.stratum.jcs.engine.*;
  import org.apache.stratum.jcs.engine.behavior.*;
  import org.apache.stratum.jcs.auxiliary.disk.behavior.*;
  
  ////////////////////////////////////////////////////////////////
  /**
   * Descriptor for each cache data entry stored on disk and put in purgatory.
   * It is used to set the spoolable flag so the queue knows that the element doesn't
   * need to be serialized.
   * @deprecated  see PurgatoryElement
   */
  class DiskElement extends CacheElement implements IDiskElement, Serializable {
  
    private boolean isSpoolable = false;
  
    public DiskElement( ICacheElement ice ) {
      this( ice.getCacheName(), ice.getKey(), ice.getVal() );
    }
    public DiskElement( String cacheName, Serializable key, Serializable val) {
      super( cacheName, key, val );
    }
  
    // lets the queue know that it is ready to be spooled
    public boolean getIsSpoolable() {
      return isSpoolable;
    }
    public void setIsSpoolable( boolean isSpoolable ) {
      this.isSpoolable = isSpoolable;
    }
  
  }
  
  
  
  1.1                  jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/disk/DiskElementDescriptor.java
  
  Index: DiskElementDescriptor.java
  ===================================================================
  package org.apache.stratum.jcs.auxiliary.disk;
  
  import java.io.*;
  
  /**
   * Disk objects are located by descriptor entries.  These are saved on shutdown
   * and loaded into memory on startup.
   */
  public class DiskElementDescriptor implements Serializable {
  
    /** Position of the cache data entry on disk. */
    long pos;
    /** Number of bytes the serialized form of the cache data takes. */
    public int len;
  
    public void init(long pos, byte[] data) {
      this.pos = pos;
      this.len = data.length;
      //    this.hash = hashCode(data);
    }
  
    //////////////////////////////////
    public DiskElementDescriptor() {
    }
  
  }
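
  So the .key file is nothing more than the serialized keyHash: a HashMap from
  cache key to a (pos, len) descriptor, written whole by saveKeys() on shutdown
  and read back by loadKeys() on startup.  A sketch of that round trip using
  in-memory streams in place of the key file (assumes the class sits in
  org.apache.stratum.jcs.auxiliary.disk; the key name and sizes are
  placeholders):

  import java.io.*;
  import java.util.HashMap;

  public class KeyFileDemo {
    public static void main(String[] args) throws Exception {
      HashMap keyHash = new HashMap();
      DiskElementDescriptor ded = new DiskElementDescriptor();
      ded.init(0L, new byte[42]);   // record at position 0, 42 bytes long
      keyHash.put("someKey", ded);

      // saveKeys(): the whole map is serialized to <cacheName>.key
      ByteArrayOutputStream baos = new ByteArrayOutputStream();
      ObjectOutputStream oos = new ObjectOutputStream(baos);
      oos.writeObject(keyHash);
      oos.close();

      // loadKeys(): read the map back on startup
      ObjectInputStream ois = new ObjectInputStream(
          new ByteArrayInputStream(baos.toByteArray()));
      HashMap restored = (HashMap) ois.readObject();
      DiskElementDescriptor back = (DiskElementDescriptor) restored.get("someKey");
      System.out.println("someKey -> len " + back.len);
    }
  }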
  
  
  
  1.1                  jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/disk/DiskLockManager.java
  
  Index: DiskLockManager.java
  ===================================================================
  
  package org.apache.stratum.jcs.auxiliary.disk;
  
  import org.apache.stratum.jcs.utils.reuse.*;
  
  import java.util.*;
  
  /** Read/Write lock manager for Disk. */
  class DiskLockManager extends ReadWriteLockManager {
  
    /**
     * @TODO Might need to have this lock for only one cache at a time;
     *       might want to lock on a DiskCache instance instead.
     */
    public static final String Disk = "Disk";
    private static DiskLockManager instance;
  
    boolean debug = false;
    private final Hashtable ht = new Hashtable();
  
    ///////////////////////////////////////////////
    private DiskLockManager() {
    }
    static DiskLockManager getInstance() {
      if (instance == null) {
        synchronized(DiskLockManager.class) {
          if (instance == null) {
            instance = new DiskLockManager();
          }
        }
      }
      return instance;
    }
  
    ///////////////////////////////////////////
    protected Hashtable getLocks() {
      return ht;
    }
  
    ////////////////////////////////////////////
    void readLock() {
      try {
        readLock(Disk);
      } catch(InterruptedException ex) {
        // should never happen.
        ex.printStackTrace();
        throw new IllegalStateException(ex.getMessage());
      }
    }
  
    ////////////////////////////////////////////
    void writeLock() {
      try {
        writeLock(Disk);
      } catch(InterruptedException ex) {
        // should never happen.
        ex.printStackTrace();
        throw new IllegalStateException(ex.getMessage());
      }
    }
  
    ///////////////////////////////////////////////////
    void done() {
      done(Disk);
    }
  }
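
  ReadWriteLockManager itself is not part of this commit.  To illustrate the
  locking pattern the cache relies on (many concurrent readers, one writer,
  released via done()), here is a minimal, hypothetical stand-in with the same
  surface; it is not the real utils.reuse implementation:

  // Hypothetical stand-in for the read/write lock used by DiskLockManager.
  public class SimpleReadWriteLock {
    private int readers = 0;
    private boolean writing = false;

    public synchronized void readLock() throws InterruptedException {
      while (writing) { wait(); }     // wait out any active writer
      readers++;
    }

    public synchronized void writeLock() throws InterruptedException {
      while (writing || readers > 0) { wait(); }  // wait for exclusive access
      writing = true;
    }

    // Releases whichever lock is held; callers pair the acquire with this
    // in a try/finally, as DiskCache does with locker.done().
    public synchronized void done() {
      if (writing) {
        writing = false;
      } else if (readers > 0) {
        readers--;
      }
      notifyAll();
    }
  }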
  
  
  
  1.1                  jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/disk/PurgatoryElement.java
  
  Index: PurgatoryElement.java
  ===================================================================
  package org.apache.stratum.jcs.auxiliary.disk;
  
  import java.io.Serializable;
  
  import org.apache.stratum.jcs.engine.behavior.*;
  import org.apache.stratum.jcs.engine.*;
  import org.apache.stratum.jcs.engine.control.*;
  
  /**
   * A quickly creatable wrapper for CacheElements in purgatory.
   * Copyright (c) 2001
   * @author Aaron Smuts
   * @version 1.0
   */
  
  public class PurgatoryElement implements ICacheElement, Serializable {
  
    // Need speed here; the accessor calls are unnecessary overhead, so the fields are protected.
    protected boolean isSpoolable = false;
    protected ICacheElement ice;
  
    public PurgatoryElement( ICacheElement ice ) {
      this.ice = ice;
    }
  
    // lets the queue know that it is ready to be spooled
    public boolean getIsSpoolable() {
      return isSpoolable;
    }
    public void setIsSpoolable( boolean isSpoolable ) {
      this.isSpoolable = isSpoolable;
    }
  
    // ICacheElement Methods
    public String getCacheName() {
      return ice.getCacheName();
    }
    public Serializable getKey() {
      return ice.getKey();
    }
    public Serializable getVal() {
      return ice.getVal();
    }
    public Attributes getAttributes() {
      return ice.getAttributes();
    }
    public void setAttributes( IAttributes attr ) {
      ice.setAttributes( attr );
    }
    public long getCreateTime() {
      return ice.getCreateTime();
    }
  
  }
  
  
  
