asmuts      02/01/14 22:18:12

  Added:       src/java/org/apache/stratum/jcs/auxiliary/disk/hsql
                        HSQLCache.java HSQLCacheAttributes.java
                        HSQLCacheFactory.java HSQLCacheManager.java
                        HSQLCacheNoWaitBuffer.java PurgatoryElement.java
  Log:
  test implementation of an hsql disk cache auxiliary
  too slow in current form, future project
  extend to other databases as a db auxiliary
  
  Revision  Changes    Path
  1.1                  
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/disk/hsql/HSQLCache.java
  
  Index: HSQLCache.java
  ===================================================================
  package org.apache.stratum.jcs.auxiliary.disk.hsql;
  
  import  java.io.*;
  import  java.util.*;
  import  java.sql.*;
  
  import org.apache.stratum.jcs.access.*;
  import org.apache.stratum.jcs.access.exception.*;
  import org.apache.stratum.jcs.auxiliary.disk.hsql.*;
  import org.apache.stratum.jcs.auxiliary.disk.hsql.behavior.*;
  import org.apache.stratum.jcs.engine.control.*;
  import org.apache.stratum.jcs.engine.control.Cache;
  import org.apache.stratum.jcs.engine.behavior.*;
  import org.apache.stratum.jcs.engine.*;
  
  import org.apache.stratum.jcs.utils.data.*;
  import org.apache.stratum.jcs.utils.log.*;
  
  import org.hsqldb.*;
  
  
  ////////////////////////////////////////////////////////////////////////
  /**
   *  @author Aaron Smuts
   *  @version 1.0
   *
   */
  public class HSQLCache implements ICache, Serializable {
  
    private static  Logger log;
    private static int numCreated = 0;
    private int numInstances = 0;
  
    // trivial debugging that should be removed by the compiler
    private static final boolean debug =  false;//true;
    private static final boolean debugR =  false;//true;
    private static final boolean debugPut = false;//true;
    private static final boolean debugGet = false;//true;
  
    private String cacheName;
  
    public boolean isAlive = false;
  
     // not used
    private String source_id = "org.apache.stratum.jcs.auxiliary.hsql.HSQLCache";
  
    IHSQLCacheAttributes cattr;
  
    // disk cache buffer, need to remove key from buffer on update, if its there
    HSQLCacheNoWaitBuffer buffer;
  
    // for now use one statement per cache and keep it open
    // can move up to manager level or implement pooling if there are too many
    // caches
    Connection cConn;
    Statement  sStatement;
    //PreparedStatement psInsert;
    //PreparedStatement psUpdate;
    //PreparedStatement psSelect;
  
  
    ///////////////////////////////////////////////////
    public Serializable getSourceId() {
      return this.source_id;
    }
  
    ////////////////////////////////////////////////////////////////////
    // should use this method
    public HSQLCache ( HSQLCacheNoWaitBuffer buffer, IHSQLCacheAttributes cattr ) {
      this( cattr.getCacheName(), cattr.getDiskPath() );
      this.cattr = cattr;
      this.buffer = buffer;
    }
    protected HSQLCache ( String cacheName ) {
      this( cacheName, null);
    }
    //////////////////////////////////////////////////////////////////////
    protected HSQLCache ( String cacheName, String rafroot ) {
      numInstances++;
  
      log = LoggerManager.getLogger( this );
  
      this.cacheName = cacheName;
  
      //String rafroot = cattr.getDiskPath();
      if ( rafroot == null ) {
        try {
          PropertyGroups pg = new PropertyGroups( "/cache.properties" );
          rafroot = pg.getProperty( "diskPath" );
        } catch( Exception e ) {
          log.error( e );
        }
      }
  
      try {
  
        Properties p = new Properties();
        String  driver = p.getProperty("driver", "org.hsqldb.jdbcDriver");
        String  url = p.getProperty("url", "jdbc:hsqldb:");
        String  database = p.getProperty("database", "cache_hsql_db");
        String  user = p.getProperty("user", "sa");
        String  password = p.getProperty("password", "");
        boolean test = p.getProperty("test", "true").equalsIgnoreCase("true");
        boolean log = p.getProperty("log", "true").equalsIgnoreCase("true");
  
        try {
          if (log) {
            trace("driver  =" + driver);
            trace("url     =" + url);
            trace("database=" + database);
            trace("user    =" + user);
            trace("password=" + password);
            trace("test    =" + test);
            trace("log     =" + log);
            //DriverManager.setLogStream(System.out);
          }
  
           // As described in the JDBC FAQ:
          // http://java.sun.com/products/jdbc/jdbc-frequent.html;
          // Why doesn't calling class.forName() load my JDBC driver?
          // There is a bug in the JDK 1.1.x that can cause Class.forName() to fail.
          new org.hsqldb.jdbcDriver();
          Class.forName(driver).newInstance();
  
          cConn = DriverManager.getConnection(url + database, user,
                password);
  
          try {
              sStatement = cConn.createStatement();
              isAlive = true;
          } catch (SQLException e) {
              System.out.println("Exception: " + e);
              isAlive = false;
          }
  
          setupTABLE();
  
        } catch (Exception e) {
            p("QueryTool.init: " + e.getMessage());
            e.printStackTrace();
        }
  
      } catch (Exception e) {
        log.logEx(e);
      }
  
    }             // end constructor
  
    /**
     * SETUP TABLE FOR CACHE
     *
     */
    void setupTABLE() {
  
      boolean newT = true;
  
      String setup = "create table " + cacheName + " (KEY varchar(255) primary key, 
ELEMENT binary)";
  
      try {
        sStatement.executeQuery(setup);
      } catch (SQLException e) {
        if ( e.toString().indexOf( "already exists" ) != -1 ) {
          newT = false;
        }
        System.out.println("Exception: " + e);
      }
  
      String setupData[] = {
        "create index iKEY on " + cacheName + " (KEY)"
      };
  
  
      if ( newT ) {
        for (int i = 1; i < setupData.length; i++) {
          try {
            sStatement.executeQuery(setupData[i]);
          } catch (SQLException e) {
            System.out.println("Exception: " + e);
          }
        }
      } // end ifnew
  
    }
  
  
  
  
    ////////////////////////////////////////////////////////
    public void add (Serializable key, Serializable value)   throws IOException {
      put(key, value);
    }
  
    // ignore the multicast field.
    public void put (Serializable key, Serializable value, boolean multicast)  throws 
IOException {
      put(key, value);
    }
  
    public void put (Serializable key, Serializable value)  throws IOException {
      put( key, value, null );
    }
    public void put (Serializable key, Serializable value, Attributes attr)  throws 
IOException {
      CacheElement ce = null;
      ce = new CacheElement( cacheName, key, value );
      ce.setAttributes( attr );
      update( ce );
    }
    ///////////////////////////////////////////////////////////
    public void update( ICacheElement ce ) throws IOException {
  
      if ( debug ) {
        p( "update" );
      }
  
      if (!isAlive) {
        return;
      }
  
      if ( ce instanceof PurgatoryElement ) {
        PurgatoryElement pe = (PurgatoryElement)ce;
        ce = pe.ice;
        if ( !pe.isSpoolable ) {
          if ( debug ) {
            p( "pe is not spoolable" );
          }
          // it has been plucked from purgatory
          return;
        }
      }
  //    /*
  //    if ( ce instanceof IDiskElement ) {
  //      IDiskElement ide = (IDiskElement)ce;
  //      if ( !ide.getIsSpoolable() ) {
  //        // it has been plucked from purgatory
  //        return;
  //      }
  //    }
  //    */
      // remove item from purgatory since we are putting it on disk
      // assume that the disk cache will never break
      // disk breakage is akin to an out of memory exception
      buffer.purgatory.remove( ce.getKey() );
      if ( debug ) {
        p( "putting " + ce.getKey() + " on disk, removing from purgatory" );
      }
  
      if ( debug ) {
        p( "putting " + ce );
      }
  
      // remove single item.
      byte[] element = serialize( ce );
      //String sql = "insert into " + cacheName + " (KEY, ELEMENT) values ('" + 
ce.getKey() + "', '" + element + "' )";
  
      boolean exists = false;
  
      try {
        //String sqlS = "select ELEMENT from " + cacheName + " where KEY = ?";
        //PreparedStatement psSelect = cConn.prepareStatement(sqlS);
        //psSelect.setString(1,(String)ce.getKey());
        //ResultSet rs =  psSelect.executeQuery();
  
        String sqlS = "select ELEMENT from " + cacheName + " where KEY = '" + 
(String)ce.getKey() + "'" ;
        ResultSet rs =  sStatement.executeQuery(sqlS);
  
        if ( rs.next() ) {
          exists = true;
        }
        rs.close();
        //psSelect.close();
      } catch (SQLException e) {
        log.error(e);
      }
  
      System.out.println( "exists = " + exists );
  
      if ( !exists ) {
  
        try {
          String sqlI = "insert into " + cacheName + " (KEY, ELEMENT) values (?, ? )";
          PreparedStatement psInsert = cConn.prepareStatement(sqlI);
          psInsert.setString(1,(String)ce.getKey());
          psInsert.setBytes(2,element);
          psInsert.execute();
          psInsert.close();
          //sStatement.executeUpdate(sql);
        } catch (SQLException e) {
          if ( e.toString().indexOf( "Violation of unique index" ) != -1 || 
e.getMessage().indexOf( "Violation of unique index" ) != -1 ) {
            exists = true;
          } else {
            log.error("Exception: " + e);
          }
        }
  
      } else {
  
          //sql = "update " + cacheName + " set ELEMENT = '" + element + "' where KEY 
= '" + ce.getKey() + "'";
          try {
            String sqlU = "update " + cacheName + " set ELEMENT  = ? ";
            PreparedStatement psUpdate = cConn.prepareStatement(sqlU);
            psUpdate.setBytes(1,element);
            psUpdate.setString(2,(String)ce.getKey());
            psUpdate.execute();
            psUpdate.close();
            //sStatement.executeUpdate(sql);
            System.out.println("ran update" );
          } catch (SQLException e2) {
            log.error("e2 Exception: " + e2);
          }
  
      }
  
  
  
  //
  //    DiskElementDescriptor ded = null;
  //    try {
  //
  //      ded = new DiskElementDescriptor();
  //      byte[] data = Disk.serialize( ce );
  //      ded.init( dataFile.length(), data );
  //
  //      // make sure this only locks for one particular cache region
  //      locker.writeLock();
  //
  //      try {
  //        if (!isAlive) {
  //          return;
  //        }
  //        // Presume it's an append.
  //        //DiskElement re = new DiskElement( cacheName, key, value );
  //        //re.init( dataFile.length(), data );
  //
  //        DiskElementDescriptor old = 
(DiskElementDescriptor)keyHash.put(ce.getKey(), ded );
  //
  //        // Item with the same key already exists in file.
  //        // Try to reuse the location if possible.
  //        if (old != null && ded.len <= old.len) {
  //          ded.pos = old.pos;
  //        }
  //        dataFile.write(data, ded.pos);
  //        /*
  //         // Only need to write if the item with the same key and value
  //         // does not already exist in the file.
  //         if (re.equals(old)) {
  //         re.pos = old.pos;
  //         }
  //         else {
  //         // Item with the same key but different value already exists in file.
  //         // Try to reuse the location if possible.
  //         if (old != null && re.len <= old.len) {
  //         re.pos = old.pos;
  //         }
  //         dataFile.write(data, re.pos);
  //         }
  //         */
  //
  //      } finally {
  //        locker.done();          // release write lock.
  //      }
  //      if ( log.logLevel >= log.DEBUG ) {
  //        log.debug(fileName + " -- put " + ce.getKey() + " on disk at pos " + 
ded.pos + " size = " + ded.len );
  //      }
  //    } catch( ConcurrentModificationException cme ) {
  //      // do nothing, this means it has gone back to memory mid serialization
  //    } catch (Exception e) {
  //      log.logEx(e, "cacheName = " + cacheName + ", ce.getKey() = " + ce.getKey());
  //      //reset();
  //    }
      return;
    }
  
    ///////////////////////////////////////////////////////////////
    public Serializable get (Serializable key) {
      return  get(key, true, true);
    }
  
    ///////////////////////////////////////////////////////////////
    public Serializable get (Serializable key, boolean container ) {
      return  get(key, true, true);
    }
  
    //////////////////////////////////////////////////////////////////////
    private Serializable get (Serializable key, boolean container, final boolean lock) 
{
  
      if ( debugGet ) {
        p( "getting " + key + " from disk" );
      }
  
      if (!isAlive) {
        return  null;
      }
  
      Serializable obj = null;
  
      byte[] data = null;
      try {
        String sqlS = "select ELEMENT from " + cacheName + " where KEY = ?";
        PreparedStatement psSelect = cConn.prepareStatement(sqlS);
        psSelect.setString(1,(String)key);
        ResultSet rs =  psSelect.executeQuery();
        if ( rs.next() ) {
          data = rs.getBytes(1);
        }
        if ( data != null ) {
          try {
            ByteArrayInputStream bais = new ByteArrayInputStream(data);
            BufferedInputStream bis = new BufferedInputStream( bais );
            ObjectInputStream ois = new ObjectInputStream(bis);
          try {
            obj = (Serializable)ois.readObject();
          } finally {
            ois.close();
          }
          // move to finally
          rs.close();
          psSelect.close();
        } catch (IOException ioe ) {
          log.error( ioe );
        } catch (Exception e ) {
          log.error( e );
        }
  
  
        } //else {
          //return null;
        //}
  
      } catch (SQLException sqle ) {
        log.error( sqle );
      }
  
  
  //    if (lock) {
  //      locker.readLock();
  //    }
  //    try {
  //      if (!isAlive) {
  //        return  null;
  //      }
  //      DiskElementDescriptor ded = (DiskElementDescriptor)keyHash.get(key);
  //      if (ded != null) {
  //        if ( debugGet ) {
  //          p( "found " + key + " on disk" );
  //        }
  //        obj = dataFile.readObject(ded.pos);
  //      }
  //      //System.out.println( "got element = " + (CacheElement)obj);
  //    } catch (NullPointerException e) {
  //      log.logEx(e, "cacheName = " + cacheName + ", key = " + key);
  //    } catch (Exception e) {
  //      log.logEx(e, "cacheName = " + cacheName + ", key = " + key);
  //    } finally {
  //      if (lock) {
  //        locker.done();
  //      }
  //    }
  
      return  obj;
    }
  
    /**
     * Returns true if the removal was succesful; or false if there is nothing to 
remove.
     * Current implementation always result in a disk orphan.
     */
    public boolean remove (Serializable key) {
  
      // remove single item.
      String sql = "delete from " + cacheName + " where KEY = '" + key + "'";
  
      try {
  
        if (key instanceof String && 
key.toString().endsWith(NAME_COMPONENT_DELIMITER)) {
          // remove all keys of the same name group.
          sql = "delete from " + cacheName + " where KEY = like '" + key + "%'";
        }
  
        try {
          sStatement.executeQuery(sql);
        } catch (SQLException e) {
          System.out.println("Exception: " + e);
        }
  
      } catch (Exception e) {
        log.logEx(e);
        reset();
      }
      return  false;
    }
  
    /////////////////////////////////////////////
    public void removeAll () {
      try {
        //reset();
      } catch (Exception e) {
        log.logEx(e);
        //reset();
      } finally {}
    }             // end removeAll
  
    ////////////////////////////////////////////////////////////////
    // handle error by last resort, force content update, or removeall
    public void reset () {
  //    log.logIt("Reseting cache");
  //    locker.writeLock();
  //    try {
  //      try {
  //        dataFile.close();
  //        File file = new File(rafDir, fileName + ".data");
  //        file.delete();
  //        keyFile.close();
  //        File file2 = new File(rafDir, fileName + ".key");
  //        file2.delete();
  //      } catch (Exception e) {
  //        log.logEx(e);
  //      }
  //      try {
  //        dataFile = new Disk(new File(rafDir, fileName + ".data"));
  //        keyFile = new Disk(new File(rafDir, fileName + ".key"));
  //        keyHash = new HashMap();
  //      } catch (IOException e) {
  //        log.logEx(e, " -- IN RESET");
  //      } catch (Exception e) {
  //        log.logEx(e, " -- IN RESET");
  //      }
  //    } finally {
  //      locker.done();            // release write lock.
  //    }
    }             // end reset
  
    ////////////////////////////////////////////////////////////////////////////
    public String getStats () {
      return   "numInstances = " + numInstances;
    }
  
    /////////////////////////////////////////////////////////////
    // shold be called by cachemanager, since it knows how
    // many are checked out
    public void dispose () {
  //    if (!isAlive) {
  //      log.logIt("is not alive and close() was called -- " + fileName);
  //      return;
  //    }
  //    locker.writeLock();
  //    try {
  //      if (!isAlive) {
  //        log.logIt("is not alive and close() was called -- " + fileName);
  //        return;
  //      }
  //      try {
  //        optimizeFile();
  //      } catch (Exception e) {
  //        log.logEx(e, "-- " + fileName);
  //      }
  //      try {
  //        numInstances--;
  //        if (numInstances == 0) {
  //          p( "dispose -- Closing files -- in close -- " + fileName );
  //          log.warn("dispose -- Closing files -- in close -- " + fileName);
  //          dataFile.close();
  //          dataFile = null;
  //          keyFile.close();
  //          keyFile = null;
  //        }
  //      } catch (Exception e) {
  //        log.logEx(e, "-- " + fileName);
  //      }
  //    } finally {
  //      isAlive = false;
  //      locker.done();            // release write lock;
  //    }
    }             // end dispose
  
  
  
    /** Returns the cache status. */
    public int getStatus () {
      return  isAlive ? STATUS_ALIVE : STATUS_DISPOSED;
    }
  
    /** Returns the current cache size. */
    public int getSize () {
      return  0; // need to get count
    }
  
    public int getCacheType () {
      return  DISK_CACHE;
    }
  
    /** For debugging. */
    public void dump () {
  //    log.debug("keyHash.size()=" + keyHash.size());
  //    for (Iterator itr = keyHash.entrySet().iterator(); itr.hasNext();) {
  //      Map.Entry e = (Map.Entry)itr.next();
  //      Serializable key = (Serializable)e.getKey();
  //      DiskElementDescriptor ded = (DiskElementDescriptor)e.getValue();
  //      Serializable val = get(key);
  //      log.debug("disk dump> key=" + key + ", val=" + val + ", pos=" + ded.pos);
  //    }
    }
  
  
    //////////////////////////////////////////
      void trace(String s) {
        p(s);
      }
    //////////////
    public void p( String s ) {
      log.debug(s);
      //System.out.println( "HSQLCache: " + s );
    }
  
  
    /** Returns cache name, ha */
    public String getCacheName () {
      return  cacheName;
    }
  
    /////////////////////////////////////////////////////////////////////////
    /** Returns the serialized form of the given object in a byte array. */
    static byte[] serialize (Serializable obj) throws IOException {
      ByteArrayOutputStream baos = new ByteArrayOutputStream();
      ObjectOutputStream oos = new ObjectOutputStream(baos);
      try {
        oos.writeObject(obj);
      } finally {
        oos.close();
      }
      return  baos.toByteArray();
    }
  
  }               // end class
  
  
  
  
  
  
  1.1                  
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/disk/hsql/HSQLCacheAttributes.java
  
  Index: HSQLCacheAttributes.java
  ===================================================================
  package org.apache.stratum.jcs.auxiliary.disk.hsql;
  
  import org.apache.stratum.jcs.auxiliary.disk.hsql.behavior.IHSQLCacheAttributes;
  import org.apache.stratum.jcs.auxiliary.behavior.IAuxiliaryCacheAttributes;
  
  
  //////////////////////////////////////////////////
  public class HSQLCacheAttributes implements IHSQLCacheAttributes {
  
    private String cacheName;
    private String name;
  
    private String diskPath;
  
    /////////////////////////////////////////
    public HSQLCacheAttributes() {
    }
  
    ////////////////////////////////////
    public void setDiskPath( String path ) {
      this.diskPath = path;
    }
  
    ////////////////////////////////////
    public String getDiskPath( ) {
      return this.diskPath;
    }
  
    ////////////////////////////////////////////////////
    public void setCacheName( String s ) {
      this.cacheName = s;
    }
    public String getCacheName( ) {
      return this.cacheName;
    }
  
    /////////////////////////////////////////////////////////////////////
    public String getName() {
      return this.name;
    }
    public void setName( String name ) {
      this.name = name;
    }
  
    /////////////////////////////////////////////////
    public IAuxiliaryCacheAttributes copy() {
      try {
        return (IAuxiliaryCacheAttributes)this.clone();
      } catch( Exception e ){}
      return (IAuxiliaryCacheAttributes)this;
    }
  
    ///////////////////////////////////////////////////////////////
    public String toString() {
      StringBuffer str = new StringBuffer();
      str.append( "diskPath = " + diskPath );
      return str.toString();
    }
  
  
  }
  
  
  
  1.1                  
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/disk/hsql/HSQLCacheFactory.java
  
  Index: HSQLCacheFactory.java
  ===================================================================
  package org.apache.stratum.jcs.auxiliary.disk.hsql;
  
  import org.apache.stratum.jcs.auxiliary.behavior.IAuxiliaryCacheFactory;
  import org.apache.stratum.jcs.engine.behavior.ICache;
  import org.apache.stratum.jcs.auxiliary.behavior.IAuxiliaryCacheAttributes;
  
  /**
   * @author Aaron Smuts
   * @version 1.0
   */
  
  import org.apache.stratum.jcs.auxiliary.behavior.*;
  import org.apache.stratum.jcs.auxiliary.disk.hsql.behavior.*;
  import org.apache.stratum.jcs.utils.log.*;
  
  ///////////////////////////////////////////////////////////////
  public class HSQLCacheFactory implements IAuxiliaryCacheFactory {
  
    private static Logger log = LoggerManager.getLogger( HSQLCacheFactory.class );
  
    private static String name;
  
    ///////////////////////////////////////////////
    public HSQLCacheFactory() {
    }
  
    ////////////////////////////////////////////////////////////////////
    public ICache createCache(IAuxiliaryCacheAttributes iaca) {
      IHSQLCacheAttributes idca = (IHSQLCacheAttributes)iaca;
      HSQLCacheManager dcm = HSQLCacheManager.getInstance( idca );
      return dcm.getCache( idca );
    }
  
    /////////////////////////////////////////////////////////////////////
    public String getName() {
      return this.name;
    }
    public void setName( String name ) {
      this.name = name;
    }
  }
  
  
  
  1.1                  
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/disk/hsql/HSQLCacheManager.java
  
  Index: HSQLCacheManager.java
  ===================================================================
  package org.apache.stratum.jcs.auxiliary.disk.hsql;
  
  import org.apache.stratum.jcs.auxiliary.disk.*;
  import java.io.*;
  import java.util.*;
  import java.sql.*;
  
  import org.apache.stratum.jcs.auxiliary.disk.hsql.*;
  import org.apache.stratum.jcs.auxiliary.disk.hsql.behavior.*;
  import org.apache.stratum.jcs.engine.behavior.*;
  import org.apache.stratum.jcs.utils.log.*;
  
  /////////////////////////////////////////////
  public class HSQLCacheManager implements ICacheManager {
  
    private static int clients;
  
    private static Hashtable caches = new Hashtable();
  
    private static Logger log;
    private static final boolean debug = false; //true;
    private static final boolean pOut = false; //true;
  
    private static HSQLCacheManager instance;
  
    private static IHSQLCacheAttributes defaultCattr;
  
    ///////////////////////////////////////////////////////////////////
     private HSQLCacheManager( IHSQLCacheAttributes cattr ) {
      log = LoggerManager.getLogger( this );
      this.defaultCattr = cattr;
    }
  
    ////////////////////////////////////////
    public IHSQLCacheAttributes getDefaultCattr() {
      return this.defaultCattr;
    }
  
    ///////////////////////////////////////////////////////////////////
     public static HSQLCacheManager getInstance( IHSQLCacheAttributes cattr ) {
        if (instance == null) {
          synchronized(HSQLCacheManager.class) {
            if (instance == null) {
              instance = new HSQLCacheManager( cattr );
            }
          }
        }
        if ( debug ) {
          log.logIt( "Manager stats : " + instance.getStats() + "<br> -- in 
getInstance()" );
        }
        clients++;
        return instance;
     }
  
     /////////////////////////////////////////////////////////
    public ICache getCache( String cacheName ) {
      IHSQLCacheAttributes cattr = (IHSQLCacheAttributes)defaultCattr.copy();
      cattr.setCacheName( cacheName );
      return getCache( cattr );
    }
  
    //////////////////////////////////////////////////////////
    public ICache getCache( IHSQLCacheAttributes cattr ) {
        ICache raf=null;
  
        p( "cacheName = " + cattr.getCacheName() );
  
        synchronized(caches) {
          raf = (ICache)caches.get( cattr.getCacheName() );
  
          if (raf == null) {
            // make use cattr
            //raf = new HSQLCache( cattr.getCacheName(), cattr.getDiskPath() );
            raf = new HSQLCacheNoWaitBuffer( cattr );
            caches.put( cattr.getCacheName(), raf );
          }
        }
        if ( debug ) {
          log.logIt( "Manager stats : " + instance.getStats() );
        }
        return raf;
     }
  
  
  
    ////////////////////////////////////////////////////////////////
     public void freeCache( String name ) {
        HSQLCache raf = (HSQLCache)caches.get(name);
        if (raf != null) {
           raf.dispose();
        }
     }
  
    // Don't care if there is a concurrency failure ?
    public String getStats(){
      StringBuffer stats = new StringBuffer();
      Enumeration allCaches = caches.elements();
  
      while (allCaches.hasMoreElements()) {
        HSQLCache raf = (HSQLCache)allCaches.nextElement();
        if (raf != null) {
         stats.append( "<br>&nbsp;&nbsp;&nbsp;" + raf.getStats() );
        }
      }
      return stats.toString();
    }
  
    ///////////////////////////////////////
    public int getCacheType() {
      return DISK_CACHE;
    }
  
  
    /////////////////////////////////////////////////////////////////
    public void release() {
      // Wait until called by the last client
      if (--clients != 0) {
        return;
      }
      synchronized(caches) {
        Enumeration allCaches = caches.elements();
  
        while (allCaches.hasMoreElements()) {
          HSQLCache raf = (HSQLCache)allCaches.nextElement();
          if (raf != null) {
            raf.dispose();
          }
        }
      }
    } //end release()
  
  
    /////////////////////////////////////////////
    public void p( String s ) {
      if ( log.logLevel >= log.DEBUG ) {
        log.debug( s );
      }
      if ( pOut ) {
        System.out.println( "HSQLCacheManager: " + s );
      }
    }
  
  
  } // end class
  
  
  
  1.1                  
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/disk/hsql/HSQLCacheNoWaitBuffer.java
  
  Index: HSQLCacheNoWaitBuffer.java
  ===================================================================
  package org.apache.stratum.jcs.auxiliary.disk.hsql;
  
  import java.io.*;
  import java.rmi.*;
  import java.util.*;
  
  import org.apache.stratum.jcs.engine.control.*;
  import org.apache.stratum.jcs.engine.*;
  import org.apache.stratum.jcs.access.exception.*;
  import org.apache.stratum.jcs.engine.behavior.*;
  import org.apache.stratum.jcs.auxiliary.disk.hsql.behavior.*;
  import org.apache.stratum.jcs.utils.log.*;
  
  
  /**
   * Store recent arivals in a temporary queue.
   * Used to queue up update requests to the underlying cache.
   * These requests will be processed in their order of arrival
   * via the cache event queue processor.
   *
   *
   *  Note:  There is a a problem lurking in all queued distributed
   *  systems, where repopulzation of memory can occur dispite a removal
   *  if the tow arrive in different order than issued.  The possible solution
   *  to never delete, but only invlidate.  The cache can act ina cvs like mode,
   *  enforcing versions of element, checking to see if a newer version exists
   *  adding.  So a remote put could amke sure a new version hasn't arrived.
   *  Invalidaions would have version.  We could avoid spooling these since it will
   *  have been a while.  This will be called garuanteed mode and can be added
   *  to any listener.
   *
   *
   */
  public class HSQLCacheNoWaitBuffer implements ICache {
  
    private static final boolean debug =  false;//true;
  
    // accessed by disk cache
    protected Hashtable purgatory = new Hashtable();
    private static final boolean debugPH =  true;
    private int purgHits = 0;
  
    private IHSQLCacheAttributes cattr;
    private HSQLCache cache;
    private ICacheEventQueue q;
    private transient Logger log = LoggerManager.getInstance().getLogger(this);
  
    private String source_id = 
"org.apache.stratum.jcs.auxiliary.disk.HSQLCacheNoWaitBuffer";
  
  
  
    ///////////////////////////////////////////////////
    public Serializable getSourceId() {
      return this.source_id;
    }
  
    /**
     * Constructs with the given disk cache,
     * and fires up an event queue for aysnchronous processing.
     */
    public HSQLCacheNoWaitBuffer(IHSQLCacheAttributes cattr) {
        cache = new HSQLCache( this, cattr );
        this.cattr = cattr;
        this.q = new CacheEventQueue(new CacheAdaptor(cache), CacheInfo.listenerId, 
cache.getCacheName());
  
        // need each no wait to handle each of its real updates and removes, since 
there may
        // be more than one per cache?  alternativve is to have the cache
        // perform updates using a different method that spcifies the listener
        //this.q = new CacheEventQueue(new CacheAdaptor(this), 
HSQLCacheInfo.listenerId, cache.getCacheName());
        if (cache.getStatus() == cache.STATUS_ERROR) {
          log.error( "destroying queue" );
          q.destroy();
        }
    }
  
    /** Adds a put request to the disk cache. */
    public void put(Serializable key, Serializable value ) throws IOException {
      put( key, value, null );
    }
    public void put(Serializable key, Serializable value,  Attributes attr ) throws 
IOException {
      try {
        CacheElement ce = new CacheElement( cache.getCacheName(), key, value );
        ce.setAttributes( attr );
        update( ce );
      } catch(IOException ex) {
        log.error(ex);
        q.destroy();
      }
    }
    public void update( ICacheElement ce ) throws IOException {
      try {
        if ( debug ) {
          p( "putting in purgatory" );
        }
  
  
        PurgatoryElement pe = new PurgatoryElement( ce );
        pe.isSpoolable = true;
        q.addPutEvent( (ICacheElement)pe );
  
        //q.addPutEvent( ce );
  
        /*
        // may be too slow
        IDiskElement ide = new DiskElement( ce );
        ide.setAttributes( ce.getAttributes() );
        purgatory.put( ide.getKey(), ide );
        //CacheElement ice = new CacheElement(ce.getCacheName(), ce.getKey(), 
ce.getVal() );
        //ice.setAttributes( ce.getAttributes() );
        ide.setIsSpoolable( true );
        q.addPutEvent( ide );
        */
  
      } catch(IOException ex) {
        log.error(ex);
        // should we destroy purgatory.  it will swell
        q.destroy();
      }
    }
  
    /** Synchronously reads from the disk cache. */
    public Serializable get(Serializable key) {
      return get( key, true );
    }
    public Serializable get(Serializable key, boolean container ) {
      //IDiskElement ide = (IDiskElement)purgatory.get( key );
      PurgatoryElement pe = (PurgatoryElement)purgatory.get(key);
      //if ( ide != null ) {
      if ( pe != null ) {
        purgHits++;
        if( debugPH ) {
          if ( purgHits % 100 == 0 ) {
            p( "purgatory hits = " + purgHits );
          }
        }
  
        //ide.setIsSpoolable( false );
        pe.isSpoolable = false;
        if ( debug ) {
          p( "found in purgatory" );
        }
        if ( container ) {
          purgatory.remove( key );
          //return (ICacheElement)ide;
          return pe.ice;
        } else {
          purgatory.remove( key );
          //return (Serializable)ide.getVal();
          return (Serializable)pe.ice.getVal();
        }
      }
      try {
        return cache.get(key);
      } catch(Exception ex) {
        q.destroy();
        // not sure we should do this.  What about purgatory?
        // must assume that we will not loose disk access
        // can make a repairer, but it complicates purgatory.
      }
      return null;
    }
  
  
    /** Adds a remove request to the disk cache. */
    public boolean remove(Serializable key) {
      purgatory.remove( key );
      try {
        q.addRemoveEvent(key);
      } catch(IOException ex) {
        log.error(ex);
        q.destroy();
      }
      return false;
    }
    /** Adds a removeAll request to the disk cache. */
    public void removeAll() {
  
      Hashtable temp = purgatory;
      purgatory = new Hashtable();
      temp = null;
  
      try {
        q.addRemoveAllEvent();
      } catch(IOException ex) {
        log.error(ex);
        q.destroy();
      }
    }
    /** Adds a dispose request to the disk cache. */
    public void dispose() {
      cache.dispose();
      // may loose the end of the queue, need to be more graceful
      q.destroy();
      /*
      try {
        q.addDisposeEvent();
      } catch(IOException ex) {
        log.error(ex);
        q.destroy();
      }
      */
  
    }
    /** No disk invokation. */
    public String getStats() {
      return cache.getStats();
    }
    /** No disk invokation. */
    public int getSize() {
      return cache.getSize();
    }
    /** No disk invokation. */
    public int getCacheType() {
      return cache.getCacheType();
    }
    /**
     * Returns the asyn cache status.
     * An error status indicates either the disk connection is not available,
     * or the asyn queue has been unexpectedly destroyed.
     * No disk invokation.
     */
    public int getStatus() {
      return q.isAlive() ? cache.getStatus() : cache.STATUS_ERROR;
    }
    public String getCacheName() {
      return cache.getCacheName();
    }
  
    /** NOT USED NOW
     * Replaces the disk cache service handle with the given handle and
     * reset the event queue by starting up a new instance.
     */
    public void fixCache(IHSQLCacheService disk) {
      //cache.fixCache(disk);
      resetEventQ();
      return;
    }
    /** Resets the event q by first destroying the existing one and starting up new 
one. */
    public void resetEventQ() {
      if (q.isAlive()) {
        q.destroy();
      }
      this.q = new CacheEventQueue(new CacheAdaptor(cache), CacheInfo.listenerId, 
cache.getCacheName());
    }
    public String toString() {
      return "HSQLCacheNoWaitBuffer: " + cache.toString();
    }
    private void p(String s) {
      System.out.println("HSQLCacheNoWaitBuffer:" + s);
    }
  }
  
  
  
  1.1                  
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/disk/hsql/PurgatoryElement.java
  
  Index: PurgatoryElement.java
  ===================================================================
  package org.apache.stratum.jcs.auxiliary.disk.hsql;
  
  import java.io.Serializable;
  
  import org.apache.stratum.jcs.engine.behavior.*;
  import org.apache.stratum.jcs.engine.*;
  import org.apache.stratum.jcs.engine.control.*;
  
  ///////////////////////////////////////////////////////////////
  public class PurgatoryElement implements ICacheElement, Serializable {
  
    // need speed here.  the method calls are unnecessary.   make protected
    protected boolean isSpoolable = false;
    protected ICacheElement ice;
  
    public PurgatoryElement( ICacheElement ice ) {
      this.ice = ice;
    }
  
   // lets the queue know that is ready to be spooled
    public boolean getIsSpoolable() {
      return isSpoolable;
    }
    public void setIsSpoolable( boolean isSpoolable ) {
      this.isSpoolable = isSpoolable;
    }
  
    // ICacheElement Methods
    public String getCacheName() {
      return ice.getCacheName();
    }
    public Serializable getKey() {
      return ice.getKey();
    }
    public Serializable getVal() {
      return ice.getVal();
    }
    public Attributes getAttributes() {
      return ice.getAttributes();
    }
    public void setAttributes( IAttributes attr ) {
      ice.setAttributes( attr );
    }
    public long getCreateTime() {
      return ice.getCreateTime();
    }
  
  }
  
  
  

--
To unsubscribe, e-mail:   <mailto:[EMAIL PROTECTED]>
For additional commands, e-mail: <mailto:[EMAIL PROTECTED]>

Reply via email to