asmuts      02/01/14 22:50:44

  Added:       src/java/org/apache/stratum/jcs/auxiliary/lateral
                        LateralCache.java LateralCacheAttributes.java
                        LateralCacheFactory.java LateralCacheInfo.java
                        LateralCacheManager.java LateralCacheMonitor.java
                        LateralCacheNoWait.java
                        LateralCacheNoWaitFacade.java
                        LateralCacheRestore.java
                        LateralCacheWatchRepairable.java
                        LateralElementDescriptor.java
                        ZombieLateralCacheService.java
                        ZombieLateralCacheWatch.java
  Log:
  no message
  
  Revision  Changes    Path
  1.1                  
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/lateral/LateralCache.java
  
  Index: LateralCache.java
  ===================================================================
  package org.apache.stratum.jcs.auxiliary.lateral;
  
  import java.io.*;
  import java.util.*;
  
  import org.apache.stratum.jcs.engine.*;
  import org.apache.stratum.jcs.access.*;
  import org.apache.stratum.jcs.engine.behavior.*;
  import org.apache.stratum.jcs.access.exception.*;
  import org.apache.stratum.jcs.auxiliary.lateral.behavior.*;
  import org.apache.stratum.jcs.auxiliary.lateral.http.broadcast.*;
  import org.apache.stratum.jcs.auxiliary.lateral.http.remove.*;
  import org.apache.stratum.jcs.utils.log.*;
  import org.apache.stratum.jcs.utils.reuse.*;
  
  
  /**
   * Lateral distributor.  Returns null on get.  Net search not implemented.
   *
   */
  public class LateralCache implements ICache {
  
    private static boolean debug = false; //true;
    private static Logger log;
    private static int numCreated = 0;
  
    Attributes attr = null;
  
    private HashMap keyHash;  // not synchronized to maximize concurrency.
  
    // generalize this, use another interface
    ILateralCacheAttributes cattr;
  
    final String cacheName;
  
    private String source_id = "org.apache.stratum.jcs.auxiliary.lateral.LateralCache";
  
    /**
     * either http, socket.udp, or socket.tcp
     * can set in config
     */
    private ILateralCacheService lateral;
  
    ///////////////////////////////////////////////////
    public Serializable getSourceId() {
      return this.source_id;
    }
  
    ////////////////////////////////////////////////////////
    protected LateralCache(String cacheName, ILateralCacheService lateral) {
      this.cacheName= cacheName;
      this.lateral = lateral;
      log = LoggerManager.getLogger( this );
  
      if (debug) {
        p("Construct> cacheName=" + cacheName + ", lateral =  " + lateral);
      }
    }
  
  
    ////////////////////////////////////////////////////////
    protected LateralCache( ILateralCacheAttributes cattr ) {
  
      this.cacheName= cattr.getCacheName();
      //this.servers = servers;
  
      this.cattr = cattr;
      log = LoggerManager.getLogger( this );
  
      if (debug) {
        p("Construct> cacheName=" + cattr.getCacheName() );
      }
    }
  
  
    ////////////////////////////////////////
    /**
     * Returns a short label for this cache.
     * <p>
     * The label is built from the cacheName field, which both constructors
     * assign.  The previous implementation read the name through cattr, which
     * the (cacheName, lateral) constructor leaves null, so it threw a
     * NullPointerException for caches built that way.
     *
     * @return "LateralCache: " followed by the cache name
     */
    public String toString() {
      return "LateralCache: " + cacheName;
    }
  
    /**
     * Synchronously puts to the lateral cache using a copy of this cache's
     * default attributes; on failure the lateral handle is replaced with a
     * zombie (see handleException).
     * <p>
     * The attr field defaults to null and is not assigned anywhere in this
     * class, so it is checked before copying; previously a null attr caused a
     * NullPointerException here.
     * NOTE(review): with no default attributes the element is sent with null
     * attributes - confirm that the lateral services tolerate that.
     *
     * @param key   key of the element
     * @param value value of the element
     * @throws IOException if the lateral update fails
     */
    public void put(Serializable key, Serializable value ) throws IOException {
      Attributes defaults = this.attr;
      Attributes copy = ( defaults == null ) ? null : (Attributes)defaults.copy();
      put( key, value, copy );
    }
    /**
     * Wraps the key/value pair in a CacheElement carrying the given
     * attributes and sends it to the lateral service via update.
     * <p>
     * The region name comes from the cacheName field: cattr is null for
     * caches built with the (cacheName, lateral) constructor, which previously
     * caused a NullPointerException both in the try body and again while
     * building the error message.
     *
     * @param key   key of the element
     * @param value value of the element
     * @param attr  attributes to attach to the element
     * @throws IOException if the lateral update fails
     */
    public void put(Serializable key, Serializable value, Attributes attr ) throws IOException {
      try {
        CacheElement ce = new CacheElement( cacheName, key, value);
        ce.setAttributes( attr );
        update( ce );
      } catch(IOException ioe) {
        // update() routes its own failures through handleException before
        // rethrowing them as IOException; handling them a second time would
        // log twice and notify the monitor twice.
        throw ioe;
      } catch(Exception ex) {
        handleException(ex, "Failed to put " + key + " to " + cacheName);
      }
    }
    /**
     * Sends the element to the lateral service, tagged with this VM's
     * listener id.
     * <p>
     * A NullPointerException is logged and swallowed; any other failure is
     * passed to handleException, which swaps in a zombie service and rethrows
     * as IOException.
     *
     * @param ce element to distribute
     * @throws IOException if the lateral service call fails
     */
    public void update( ICacheElement ce ) throws IOException {
      try {
        if ( debug ) {
          p( "lateral = " + lateral );
          p( "LateralCacheInfo.listenerId = " + LateralCacheInfo.listenerId );
        }
        lateral.update( ce, LateralCacheInfo.listenerId );
      } catch ( NullPointerException npe ) {
        // Deliberately best-effort: log and give up on this element.
        log.error( npe );
      } catch ( Exception ex ) {
        String failure = "Failed to put " + ce.getKey() + " to " + ce.getCacheName();
        handleException( ex, failure );
      }
    } // end update
  
  
  
    /**
     * Lateral lookups are not implemented: the performance cost was judged
     * too high, so this method never contacts the other caches.
     *
     * @param key ignored
     * @return always null
     * @throws IOException never thrown by this implementation
     */
    public Serializable get(Serializable key) throws IOException {
      return null;
    }
  
  
    /**
     * Lateral lookups are not implemented, so this overload returns null
     * just like get(Serializable).
     * <p>
     * The former try/catch wrapped only "return null"; its handler could
     * never run and dereferenced cattr, which may be null, so it was removed.
     *
     * @param key       ignored
     * @param container ignored
     * @return always null
     * @throws IOException never thrown by this implementation
     */
    public Serializable get(Serializable key, boolean container) throws IOException {
      return null;
    }
  
    /**
     * Synchronously remove from the remote cache;  if failed, replace the remote
     * handle with a zombie.
     */
    public boolean remove(Serializable key) throws IOException {
      if (debug) {
        p("remove> key="+key);
      }
      try {
        //DeleteLateralCacheMulticaster dlcm = new DeleteLateralCacheMulticaster( 
cattr.getCacheName(), (String)key, cattr.getLateralCacheAddrs(), 
cattr.getLateralDeleteServlet()  );
        //dlcm.multicast();
        lateral.remove( cacheName, key, LateralCacheInfo.listenerId );
      } catch(Exception ex) {
        handleException(ex, "Failed to remove " + key + " from " + 
cattr.getCacheName());
      }
      return false;
    }
    /**
     * Synchronously removeAll from the remote cache;  if failed, replace the remote
     * handle with a zombie.
     */
    public void removeAll() throws IOException {
      try {
        //DeleteLateralCacheMulticaster dlcm = new DeleteLateralCacheMulticaster( 
cattr.getCacheName(), "ALL", cattr.getLateralCacheAddrs(), 
cattr.getLateralDeleteServlet()  );
        //dlcm.multicast();
        lateral.removeAll( cacheName, LateralCacheInfo.listenerId );
      } catch(Exception ex) {
        handleException(ex, "Failed to remove all from " + cattr.getCacheName());
      }
    }
    /**
     * Disposes of this cache.  Currently this only prints a trace line;
     * nothing is released yet.
     * <p>
     * The former empty try block had a handler that could never run and that
     * dereferenced cattr (possibly null), so it was removed.
     *
     * @throws IOException never thrown by the current implementation
     */
    public void dispose() throws IOException {
      p( "disposing of lateral cache" );
      // TODO: should remove cache from multicast group
    }
  
    /////////////////////////////////////////////////////////
    /**
     * Returns a one-line description of this cache.  Uses the cacheName field
     * because cattr is null for caches created with the (cacheName, lateral)
     * constructor.
     *
     * @return "cacheName = " followed by the cache name
     */
    public String getStats(){
      return "cacheName = " + cacheName;
    }
    /**
     * Reports whether the lateral service handle is usable.
     *
     * @return STATUS_ERROR while the handle is a zombie placeholder (an
     *         IZombie), STATUS_ALIVE otherwise
     */
    public int getStatus() {
      if ( this.lateral instanceof IZombie ) {
        return STATUS_ERROR;
      }
      return STATUS_ALIVE;
    }
    /** Returns the current cache size. */
    public int getSize() {
      return 0;
    }
  
    public int getCacheType() {
      return ICacheType.LATERAL_CACHE;
    }
    public String getCacheName() {
      return cacheName;
    }
  
  
    /**
     * Common failure path: logs the error, replaces the lateral service with
     * a zombie, notifies the cache monitor so that recovery starts, and then
     * always throws.
     *
     * @param ex  the failure that occurred
     * @param msg description of the operation that failed
     * @throws IOException ex itself when it already is an IOException,
     *         otherwise a new IOException carrying only ex's message
     */
    private void handleException(Exception ex, String msg) throws IOException {
      log.error( "Disabling lateral cache due to error " + msg);
      log.error(ex);

      // Later calls hit the zombie service until the monitor repairs us.
      lateral = new ZombieLateralCacheService();
      LateralCacheMonitor.getInstance().notifyError();

      if ( !(ex instanceof IOException) ) {
        throw new IOException(ex.getMessage());
      }
      throw (IOException)ex;
    }
  
    /**
     * Installs the given lateral service handle in place of the current one.
     *
     * @param lateral the service handle to use from now on
     */
    public void fixCache(ILateralCacheService lateral) {
      this.lateral = lateral;
    }
  
  
    /////////////////////////////////////////////////
    private void p (String s) {
      //log.debug("LateralCache:" + s);
      System.out.println("LateralCache:" + s);
    }
  
  }
  
  
  
  1.1                  
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/lateral/LateralCacheAttributes.java
  
  Index: LateralCacheAttributes.java
  ===================================================================
  package org.apache.stratum.jcs.auxiliary.lateral;
  
  import java.io.*;
  import java.util.*;
  
  //import org.apache.stratum.jcs.auxiliary.*;
  import org.apache.stratum.jcs.auxiliary.behavior.*;
  import org.apache.stratum.jcs.engine.*;
  import org.apache.stratum.jcs.access.*;
  import org.apache.stratum.jcs.engine.behavior.*;
  import org.apache.stratum.jcs.access.exception.*;
  import org.apache.stratum.jcs.auxiliary.lateral.behavior.*;
  import org.apache.stratum.jcs.auxiliary.lateral.http.broadcast.*;
  import org.apache.stratum.jcs.auxiliary.lateral.http.remove.*;
  import org.apache.stratum.jcs.utils.log.*;
  import org.apache.stratum.jcs.utils.reuse.*;
  
  ///////////////////////////////////////////////////////////
  public class LateralCacheAttributes implements Serializable, ILateralCacheAttributes 
{
  
    String    transmissionTypeName = "UDP";
    int       transmissionType = UDP;
  
    ArrayList httpServers;
    // used to identify the service that this manager will be
    // operating on
    String    httpServer = "";
    String    httpReceiveServlet = "";
    String    httpDeleteServlet = "";
  
    String    udpMulticastAddr = "228.5.6.7";
    int       udpMulticastPort = 6789;
  
    //ArrayList tcpServers;
    String tcpServers;
    // used to identify the service that this manager will be
    // operating on
    String    tcpServer = "";
    int       tcpListenerPort = 1111;
  
    private String cacheName;
    private String name;
  
    /////////////////////////////////////////
    public void setHttpServer( String val ) {
      httpServer = val;
    }
    public String getHttpServer( ) {
      return httpServer;
    }
  
    /*
    /////////////////////////////////////////
    public void setTcpServers( ArrayList val ) {
      tcpServers = val;
    }
    public ArrayList getTcpServers( ) {
      return tcpServers;
    }
    */
  
    /////////////////////////////////////////
    public void setTcpServers( String val ) {
      tcpServers = val;
    }
    public String getTcpServers( ) {
      return tcpServers;
    }
  
  
    /////////////////////////////////////////
    public void setTcpServer( String val ) {
      tcpServer = val;
    }
    public String getTcpServer( ) {
      return tcpServer;
    }
  
      /////////////////////////////////////////
    public void setTcpListenerPort( int val ) {
      this.tcpListenerPort = val;
    }
    public int getTcpListenerPort( ) {
      return this.tcpListenerPort;
    }
  
    /////////////////////////////////////////
    public void setUdpMulticastAddr( String val ) {
      udpMulticastAddr = val;
    }
    public String getUdpMulticastAddr( ) {
      return udpMulticastAddr;
    }
  
    /////////////////////////////////////////
    public void setUdpMulticastPort( int val ) {
      udpMulticastPort = val;
    }
    public int getUdpMulticastPort( ) {
      return udpMulticastPort;
    }
  
    /////////////////////////////////////////
    /**
     * Sets the numeric transmission type and, for the known constants UDP,
     * HTTP and TCP, the matching type name.  For any other value the name is
     * left as it was.
     *
     * @param val one of UDP, HTTP or TCP
     */
    public void setTransmissionType( int val ) {
      this.transmissionType = val;
      if ( val == UDP ) {
        transmissionTypeName = "UDP";
        return;
      }
      if ( val == HTTP ) {
        transmissionTypeName = "HTTP";
        return;
      }
      if ( val == TCP ) {
        transmissionTypeName = "TCP";
      }
    }
    public int getTransmissionType() {
      return this.transmissionType;
    }
  
    /////////////////////////////////////////
    /**
     * Sets the transmission type name and, for the known names "UDP", "HTTP"
     * and "TCP" (exact, case-sensitive match), the matching numeric type.
     * For any other name the numeric type is left as it was.
     *
     * @param val type name; must not be null
     */
    public void setTransmissionTypeName( String val ) {
      this.transmissionTypeName = val;
      if ( val.equals("UDP") ) {
        transmissionType = UDP;
        return;
      }
      if ( val.equals("HTTP") ) {
        transmissionType = HTTP;
        return;
      }
      if ( val.equals("TCP") ) {
        transmissionType = TCP;
      }
    }
  
    /////////////////////////////////////////
    public String getTransmissionTypeName() {
      return this.transmissionTypeName;
    }
  
    ////////////////////////////////////////////////////
    public void setCacheName( String s ) {
      this.cacheName = s;
    }
    public String getCacheName( ) {
      return this.cacheName;
    }
  
    /////////////////////////////////////////////////////////////////////
    public String getName() {
      return this.name;
    }
    public void setName( String name ) {
      this.name = name;
    }
  
    /////////////////////////////////////////////////
    /**
     * Returns an independent copy of these attributes.
     * <p>
     * The previous implementation called clone(), but this class does not
     * implement Cloneable, so clone() always threw; the exception was
     * swallowed and the method returned this very instance.  Callers that
     * changed the "copy" therefore changed the original as well.  The fields
     * are now copied explicitly; the httpServers list gets its own list
     * instance.
     *
     * @return a new LateralCacheAttributes holding the same values
     */
    public IAuxiliaryCacheAttributes copy() {
      LateralCacheAttributes dup = new LateralCacheAttributes();
      dup.transmissionTypeName = this.transmissionTypeName;
      dup.transmissionType = this.transmissionType;
      dup.httpServers = ( this.httpServers == null ) ? null : new ArrayList( this.httpServers );
      dup.httpServer = this.httpServer;
      dup.httpReceiveServlet = this.httpReceiveServlet;
      dup.httpDeleteServlet = this.httpDeleteServlet;
      dup.udpMulticastAddr = this.udpMulticastAddr;
      dup.udpMulticastPort = this.udpMulticastPort;
      dup.tcpServers = this.tcpServers;
      dup.tcpServer = this.tcpServer;
      dup.tcpListenerPort = this.tcpListenerPort;
      dup.cacheName = this.cacheName;
      dup.name = this.name;
      return (IAuxiliaryCacheAttributes)dup;
    }
  
    ////////////////////////////////////////////////
    /**
     * Renders the configuration as text.  LateralCacheManager.getInstance
     * uses this string as the key of its instance map, so the exact format
     * matters: four "name=value" lines followed by httpServer,
     * udpMulticastAddr, udpMulticastPort and tcpServer run together without
     * separators.
     *
     * @return the textual form of these attributes
     */
    public String toString() {
      StringBuffer text = new StringBuffer();
      text.append( "cacheName=" ).append( cacheName ).append( "\n" );
      text.append( "transmissionTypeName=" ).append( transmissionTypeName ).append( "\n" );
      text.append( "transmissionType=" ).append( transmissionType ).append( "\n" );
      text.append( "tcpServer=" ).append( tcpServer ).append( "\n" );
      text.append( httpServer );
      text.append( udpMulticastAddr );
      text.append( udpMulticastPort );
      text.append( tcpServer );
      return text.toString();
    }
  
  
  }
  
  
  
  1.1                  
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/lateral/LateralCacheFactory.java
  
  Index: LateralCacheFactory.java
  ===================================================================
  package org.apache.stratum.jcs.auxiliary.lateral;
  
  /**
   * Title:
   * Description:
   * Copyright:    Copyright (c) 2001
   * Company:
   * @author
   * @version 1.0
   */
  
  import java.util.*;
  
  //import org.apache.stratum.jcs.auxiliary.*;
  import org.apache.stratum.jcs.auxiliary.behavior.*;
  import org.apache.stratum.jcs.engine.*;
  import org.apache.stratum.jcs.engine.behavior.*;
  import org.apache.stratum.jcs.auxiliary.lateral.*;
  import org.apache.stratum.jcs.auxiliary.lateral.behavior.*;
  import org.apache.stratum.jcs.utils.log.*;
  
  
  /**
   *  Constructs a LateralCacheNoWaitFacade for the given configuration.
   *  Each lateral service / local relationship is managed by one manager.
   *  This manager can have multiple caches.  The remote relationships are
   *  consolidated and restored via these managers.  The facade provides a
   *  front to the composite cache so the implementation is transparent.
   */
  public class LateralCacheFactory implements IAuxiliaryCacheFactory {
  
    private static Logger log = LoggerManager.getLogger( LateralCacheFactory.class );
  
    private static String name;
  
    ///////////////////////////////////////////////////////////////
    /**
     * Interface method.  Allows Class.forName construction, making caches
     * pluggable.
     * <p>
     * Builds one no-wait cache per configured lateral endpoint and wraps them
     * all in a LateralCacheNoWaitFacade:
     * UDP uses a single manager for the configured attributes;
     * TCP uses one manager per entry of the comma-separated tcpServers string;
     * HTTP uses one manager per entry of the httpServers list.
     * <p>
     * Fixes over the previous version: a null tcpServers or httpServers no
     * longer causes a NullPointerException (an error is logged and the facade
     * is created without laterals), and TCP entries are trimmed so that
     * "a, b" yields "b" rather than " b".
     * <p>
     * NOTE(review): the same attributes object is mutated and handed to every
     * manager, exactly as before, so all managers end up referring to the
     * last server name set.  Verify whether each manager should receive its
     * own copy.
     *
     * @param iaca attributes; must be a LateralCacheAttributes
     * @return a facade over the no-wait caches that were created
     */
    public ICache createCache( IAuxiliaryCacheAttributes iaca ) {
      LateralCacheAttributes lac = (LateralCacheAttributes)iaca;
      ArrayList noWaits = new ArrayList();
      int type = lac.getTransmissionType();

      if ( type == LateralCacheAttributes.UDP ) {
        addNoWait( lac, noWaits );
      } else if ( type == LateralCacheAttributes.TCP ) {
        if ( lac.tcpServers == null ) {
          log.error( "No tcpServers configured for lateral cache " + lac.getCacheName() );
        } else {
          StringTokenizer servers = new StringTokenizer( lac.tcpServers, "," );
          while ( servers.hasMoreTokens() ) {
            String server = servers.nextToken().trim();
            if ( server.length() == 0 ) {
              continue; // entry was only white space
            }
            lac.setTcpServer( server );
            addNoWait( lac, noWaits );
          }
        }
      } else if ( type == LateralCacheAttributes.HTTP ) {
        if ( lac.httpServers == null ) {
          log.error( "No httpServers configured for lateral cache " + lac.getCacheName() );
        } else {
          for ( Iterator it = lac.httpServers.iterator(); it.hasNext(); ) {
            lac.setHttpServer( (String)it.next() );
            addNoWait( lac, noWaits );
          }
        }
      }

      LateralCacheNoWait[] laterals =
        (LateralCacheNoWait[])noWaits.toArray( new LateralCacheNoWait[noWaits.size()] );
      return new LateralCacheNoWaitFacade( laterals, iaca.getCacheName() );
    } // end createCache

    /**
     * Looks up the manager for the current state of lac, asks it for the
     * cache region named in lac, and appends the result to noWaits if it is
     * not null.
     */
    private void addNoWait( LateralCacheAttributes lac, ArrayList noWaits ) {
      LateralCacheManager lcm = LateralCacheManager.getInstance( lac );
      ICache ic = lcm.getCache( lac.getCacheName() );
      if ( ic != null ) {
        noWaits.add( ic );
      }
    }
  
    /////////////////////////////////////////////////////////////////////
    public String getName() {
      return this.name;
    }
    public void setName( String name ) {
      this.name = name;
    }
  
      ///////////////////////////////////////////////
    private static void p( String s ) {
      System.out.println( "LateralCacheFactory: " + s );
    }
  
  
  } // end class
  
  
  
  1.1                  
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/lateral/LateralCacheInfo.java
  
  Index: LateralCacheInfo.java
  ===================================================================
  package  org.apache.stratum.jcs.auxiliary.lateral;
  
  import  java.rmi.dgc.VMID;
  
  /**
   * Holder for the static identity values shared by the lateral cache
   * classes.
   */
  public class LateralCacheInfo {

    /**
     * Identifier of this virtual machine, created once when the class is
     * initialized.
     */
    protected static VMID vmid = new VMID();

    /**
     * Listener id derived from the VMID hash code, narrowed to a single byte
     * (so at most 256 distinct values exist).  Per the original note it
     * identifies a client so that several clients can run on one host, and
     * it should not be used until after reconnecting.
     */
    public static byte listenerId = (byte)vmid.hashCode();

    /** Static holder only; never instantiated. */
    private LateralCacheInfo(){}

  }
  
  
  
  1.1                  
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/lateral/LateralCacheManager.java
  
  Index: LateralCacheManager.java
  ===================================================================
  package  org.apache.stratum.jcs.auxiliary.lateral;
  
  import  java.io.*;
  import  java.net.*;
  import  java.util.*;
  
  //import  org.apache.stratum.jcs.auxiliary.*;
  import  org.apache.stratum.jcs.auxiliary.behavior.*;
  import  org.apache.stratum.jcs.engine.*;
  import  org.apache.stratum.jcs.engine.behavior.*;
  import  org.apache.stratum.jcs.auxiliary.lateral.behavior.*;
  //import  org.apache.stratum.jcs.auxiliary.lateral.http.*;
  import  org.apache.stratum.jcs.auxiliary.lateral.socket.udp.*;
  import  org.apache.stratum.jcs.auxiliary.lateral.socket.tcp.*;
  import  org.apache.stratum.jcs.auxiliary.lateral.behavior.*;
  import  org.apache.stratum.jcs.utils.log.*;
  
  
  
  /**
   *  Creates lateral caches.  Lateral caches are primarily used for
   *  removing non laterally configured caches.  Non laterally configured
   *  cache regions should still be able to participate in removal.  But
   *  if there is a non laterally configured cache hub, then lateral removals may
   *  be necessary.  For flat webserver production environments, without
   *  a strong machine at the app server level, distribution and search may
   *  need to occur at the lateral cache level.  This is currently not implemented
   *  in the lateral cache.
   */
  public class LateralCacheManager implements IAuxiliaryCacheManager {
  
    private static boolean debug = true; //false;
  
    //static ArrayList defaultServers;
    ICacheAttributes defaultCattr;
  
    private static Logger log;
    private static LateralCacheMonitor monitor;
  
    static final Map instances = new HashMap();
    // each manager instance has caches
    final Map caches = new HashMap();
    protected ILateralCacheAttributes lca;
    private int clients;
  
    /**
     * Handle to the lateral cache service; or a zombie handle if failed to connect.
     */
    private ILateralCacheService lateralService;
  
     /**
     * Wrapper of the lateral cache watch service;
     * or wrapper of a zombie service if failed to connect.
     */
    private LateralCacheWatchRepairable lateralWatch;
  
    /////////////////////////////////////////////////
    public static LateralCacheManager getInstance ( ILateralCacheAttributes lca ) {
      if ( debug ) {
        p( "getting instance" );
      }
      LateralCacheManager ins = (LateralCacheManager)instances.get(lca.toString());
      if (ins == null) {
        synchronized (instances) {
          ins = (LateralCacheManager)instances.get(lca.toString());
          if (ins == null) {
            ins = new LateralCacheManager( lca );
            instances.put(lca.toString(), ins);
          }
        }
      } else {
        if ( debug ) {
          ins.p( "found manager already created" );
        }
      }
      if (debug) {
        ins.log.logIt("Manager stats : " + ins.getStats() + "<br> -- in 
getInstance()");
      }
      ins.clients++;
      // Fires up the monitoring daemon.
      if (monitor == null) {
        monitor = LateralCacheMonitor.getInstance();
        // If the returned monitor is null, it means it's already started elsewhere.
        if (monitor != null) {
          Thread t = new Thread(monitor);
          t.setDaemon(true);
          t.start();
        }
      }
      return  ins;
    } // end getInstance
  
    ////////////////////////////////////////////////////////
    private LateralCacheManager( ILateralCacheAttributes lca ) {
      this.lca = lca;
      log = LoggerManager.getLogger( this );
      if ( debug ) {
        p( "creating lateral cache service lca = " + lca );
      }
      // need to create the service based on the type
      // ex
      try {
        if ( lca.getTransmissionType() == lca.UDP ) {
          // need to allow for this to be a service.
          // should wrap sender and new kind of receiver?
          if ( debug ) {
            p( "udp service" );
          }
          this.lateralService = new LateralUDPService( lca );
        } else
        if ( lca.getTransmissionType() == lca.HTTP ) {
          if ( debug ) {
            p( "http service" );
          }
          //this.lateralService = new LateralHTTPService( lca );
        } else
        if ( lca.getTransmissionType() == lca.TCP ) {
          if ( debug ) {
            p( "tcp service" );
          }
          this.lateralService = new LateralTCPService( lca );
        } else {
          log.error( "type not recognized, must zombie" );
          throw new Exception( "no known transmission type for lateral cache." );
        }
  
        if ( this.lateralService == null ) {
          log.error( "no service created?, must zombie" );
          throw new Exception( "no service created for lateral cache." );
        }
  
        lateralWatch = new LateralCacheWatchRepairable();
        lateralWatch.setCacheWatch(new ZombieLateralCacheWatch());
  
      } catch (Exception ex) {
        // Failed to connect to the lateral server.
        // Configure this LateralCacheManager instance to use the "zombie" services.
        log.error(ex.toString() + "**********************ZOMBIZING LATERAL SERVICE");
        lateralService = new ZombieLateralCacheService();
        lateralWatch = new LateralCacheWatchRepairable();
        lateralWatch.setCacheWatch(new ZombieLateralCacheWatch());
        // Notify the cache monitor about the error, and kick off the recovery process.
        LateralCacheMonitor.getInstance().notifyError();
      }
    } // end constructor
  
  
    /**
     * Registers the listener for the named cache with the underlying
     * cache-watch service, holding the lock on the caches map while doing so.
     *
     * @param cacheName region the listener is interested in
     * @param listener  listener to register
     * @throws IOException if the watch service rejects the registration
     */
    public void addLateralCacheListener(String cacheName, ILateralCacheListener listener)
        throws IOException {
      synchronized (caches) {
        lateralWatch.addCacheListener(cacheName, listener);
      }
    }
  
    /**
     * Returns the no-wait lateral cache for the named region, creating and
     * registering it on first request, and then registers the listener that
     * matches the transmission type (UDP or TCP) for that region.
     * <p>
     * LateralCacheFactory.createCache calls this method for every region it
     * builds.  Listener registration is attempted on every call; failures are
     * logged and do not prevent the cache from being returned.
     *
     * @param cacheName name of the cache region
     * @return the LateralCacheNoWait for the region, never null
     */
    public ICache getCache ( String cacheName ) {
      LateralCacheNoWait c;
      synchronized (caches) {
        c = (LateralCacheNoWait)caches.get(cacheName);
        if (c == null) {
          c = new LateralCacheNoWait(new LateralCache(cacheName, lateralService));
          caches.put(cacheName, c);
        }
      }

      try {
        // The listener implementation depends on the transmission type.
        int type = lca.getTransmissionType();
        if ( type == ILateralCacheAttributes.UDP ) {
          addLateralCacheListener( cacheName, LateralGroupCacheUDPListener.getInstance(lca) );
        } else if ( type == ILateralCacheAttributes.TCP ) {
          addLateralCacheListener( cacheName, LateralGroupCacheTCPListener.getInstance(lca) );
        }
      } catch( IOException ioe ) {
        log.error( ioe );
      } catch( Exception e ) {
        log.error( e );
      }

      return  c;
    }
  
  
  
    //////////////////////////////////////////////
    public int getCacheType() {
      return LATERAL_CACHE;
    }
  
    //////////////////////////////////////
    public String getStats(){
      // add something here
      return "";
    }
  
  
  
   /** Fixes up all the caches managed by this cache manager. */
    public void fixCaches (ILateralCacheService lateralService, ILateralCacheObserver 
lateralWatch) {
      p( "*******FIXING LATERAL CACHE*********" );
      synchronized (caches) {
        this.lateralService = lateralService;
        // need to implment an observer for some types of laterals( http and tcp)
        //this.lateralWatch.setCacheWatch(lateralWatch);
        for (Iterator en = caches.values().iterator(); en.hasNext();) {
          LateralCacheNoWait cache = (LateralCacheNoWait)en.next();
          cache.fixCache(this.lateralService);
        }
      }
    }
  
    /////////////////////////////////////////////////
    private static void p (String s) {
      System.out.println("LateralCacheManager:" + s);
    }
  
    ////////////////////////////////////////////////
    // need freeCache, release, getStats
    // need to find an interface acceptible for all
    // cache managers or a manager within a type
  
  }
  
  
  
  1.1                  
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/lateral/LateralCacheMonitor.java
  
  Index: LateralCacheMonitor.java
  ===================================================================
  package  org.apache.stratum.jcs.auxiliary.lateral;
  
  import org.apache.stratum.jcs.utils.log.*;
  import  java.util.*;
  
  /**
   * Used to monitor and repair any failed connection for the lateral cache
   * service.  By default the monitor operates in a failure-driven mode: it
   * goes into a wait state until there is an error.  Upon notification of a
   * connection error, the monitor switches to a time-driven mode: it attempts
   * to recover the connections on a periodic basis.  When all failed
   * connections are restored, it changes back to the failure-driven mode.
   */
  public class LateralCacheMonitor implements Runnable {
  
    protected static final boolean debug = true;//false;
    private static LateralCacheMonitor instance;
    private static long idlePeriod = 20*1000;            // minimum 20 seconds.
    //private static long idlePeriod = 3*1000; // for debugging.
    private transient Logger log = LoggerManager.getInstance().getLogger(this);
  
    // Must make sure LateralCacheMonitor is started before any lateral error
    // can be detected!
    private boolean alright=true;
  
    /**
     * Raises the idle period between repair rounds.  The period can only
     * grow: a value that is not larger than the current one is ignored.
     *
     * @param idlePeriod requested idle period in milliseconds
     */
    public static void setIdlePeriod(long idlePeriod) {
      LateralCacheMonitor.idlePeriod = Math.max( LateralCacheMonitor.idlePeriod, idlePeriod );
    }
    private LateralCacheMonitor() {}
  
    /**
     * Returns the singleton instance, creating it on first use.
     * <p>
     * The check and the creation both happen under the class lock.  The
     * previous version read the non-volatile instance field without the lock
     * first, which lets another thread see a partially initialized monitor.
     *
     * @return the single LateralCacheMonitor, never null
     */
    static LateralCacheMonitor getInstance() {
      synchronized(LateralCacheMonitor.class) {
        if (instance == null) {
          instance = new LateralCacheMonitor();
        }
        return instance;
      }
    }
    /**
     * Notifies the cache monitor that an error occurred and kicks off the
     * error recovery process.
     * <p>
     * The "alright" flag is cleared and the monitor thread is woken inside
     * one critical section, so the flag write cannot be skipped because of a
     * stale unsynchronized read.
     */
    public void notifyError() {
      synchronized(this) {
        alright = false;
        notify();
      }
    }
    // Run forever.
    // Avoid the use of any synchronization in the process of monitoring for 
performance reason.
    // If exception is thrown owing to synchronization,
    // just skip the monitoring until the next round.
    public void run () {
      do {
        if (alright) {
          synchronized(this) {
            if (alright) {
              // Failure driven mode.
              try {
                wait(); // wake up only if there is an error.
              } catch(InterruptedException ignore) {
              }
            }
          }
        }
        // Time driven mode: sleep between each round of recovery attempt.
        try {
          if ( debug ) {
            p("cache monitor sleeping for " + idlePeriod);
          }
          Thread.currentThread().sleep(idlePeriod);
        } catch (InterruptedException ex) {
        // ignore;
        }
        // The "alright" flag must be false here.
        // Simply presume we can fix all the errors until proven otherwise.
        synchronized(this) {
          alright = true;
        }
        if ( debug ) {
          p("cache monitor running.");
        }
        // Monitor each LateralCacheManager instance one after the other.
        // Each LateralCacheManager corresponds to one lateral connection.
        for (Iterator itr = LateralCacheManager.instances.values().iterator(); 
itr.hasNext();) {
          LateralCacheManager mgr = (LateralCacheManager)itr.next();
          try {
            // If any cache is in error, it strongly suggests all caches managed by the
            // same LateralCacheManager instance are in error.  So we fix them once 
and for all.
            for (Iterator itr2 = mgr.caches.values().iterator(); itr2.hasNext();) {
              if (itr2.hasNext()) {
                LateralCacheNoWait c = (LateralCacheNoWait)itr2.next();
                if (c.getStatus() == c.STATUS_ERROR) {
                  if ( debug ) {
                    p( "found LateralCacheNoWait in error" );
                  }
                  LateralCacheRestore repairer = new LateralCacheRestore(mgr);
                  // If we can't fix them, just skip and re-try in the next round.
                  if (repairer.canFix()) {
                    repairer.fix();
                  }
                  else {
                    bad();
                  }
                  break;
                } else {
                  if ( debug ) {
                    p( "lcnw not in error" );
                  }
                }
              }
            }
          } catch (Exception ex) {
            bad();
            // Problem encountered in fixing the caches managed by a 
LateralCacheManager instance.
            // Soldier on to the next LateralCacheManager instance.
            log.error(ex);
          }
        }
      } while (true);
    }
    /**
     * Sets the "alright" flag to false in a critical section.
     * <p>
     * The flag is always written while holding this object's monitor, the
     * same monitor under which the monitoring loop sets it back to true.
     */
    private void bad() {
      // The flag used to be tested outside the synchronized block, i.e. an
      // unsynchronized read racing with the synchronized writes.  Writing
      // false unconditionally under the lock has the same outcome without
      // the unguarded read.
      synchronized (this) {
        alright = false;
      }
    }
    /** Prints the given text to standard output, prefixed with this class's tag. */
    private void p(String s) {
      final String line = "LateralCacheMonitor:" + s;
      System.out.println(line);
    }
  }
  
  
  
  
  
  
  1.1                  
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/lateral/LateralCacheNoWait.java
  
  Index: LateralCacheNoWait.java
  ===================================================================
  package org.apache.stratum.jcs.auxiliary.lateral;
  
  import org.apache.stratum.jcs.engine.control.Cache;
  
  import org.apache.stratum.jcs.engine.*;
  import org.apache.stratum.jcs.access.exception.*;
  import org.apache.stratum.jcs.engine.behavior.*;
  import org.apache.stratum.jcs.auxiliary.lateral.behavior.*;
  import org.apache.stratum.jcs.auxiliary.lateral.http.broadcast.*;
  import org.apache.stratum.jcs.auxiliary.lateral.socket.udp.*;
  //import org.apache.stratum.jcs.auxiliary.lateral.socket.tcp.*;
  
  import org.apache.stratum.jcs.utils.log.*;
  
  import java.io.*;
  import java.rmi.*;
  
  /**
   * Used to queue up update requests to the underlying cache.
   * These requests will be processed in their order of arrival
   * via the cache event queue processor.
   */
  public class LateralCacheNoWait implements ICache {
    private final LateralCache cache;
    private ICacheEventQueue q;
    private transient Logger log = LoggerManager.getInstance().getLogger(this);
  
    private String source_id = 
"org.apache.stratum.jcs.auxiliary.lateral.LateralCacheNoWait";
  
    ///////////////////////////////////////////////////
    public Serializable getSourceId() {
      return this.source_id;
    }
  
    /**
     * Constructs with the given lateral cache,
     * and fires up an event queue for aysnchronous processing.
     */
    public LateralCacheNoWait(LateralCache cache) {
        this.cache = cache;
        this.q = new CacheEventQueue(new CacheAdaptor(cache), 
LateralCacheInfo.listenerId, cache.getCacheName());
  
        // need each no wait to handle each of its real updates and removes, since 
there may
        // be more than one per cache?  alternativve is to have the cache
        // perform updates using a different method that spcifies the listener
        //this.q = new CacheEventQueue(new CacheAdaptor(this), 
LateralCacheInfo.listenerId, cache.getCacheName());
        if (cache.getStatus() == cache.STATUS_ERROR)
          q.destroy();
    }
  
    /** Adds a put request to the lateral cache. */
    public void put(Serializable key, Serializable value ) throws IOException {
      put( key, value, null );
    }
    public void put(Serializable key, Serializable value,  Attributes attr ) throws 
IOException {
      try {
        CacheElement ce = new CacheElement( cache.getCacheName(), key, value );
        ce.setAttributes( attr );
        update( ce );
      } catch(IOException ex) {
        log.error(ex);
        q.destroy();
      }
    }
    public void update( ICacheElement ce ) throws IOException {
      try {
        q.addPutEvent( ce );
      } catch(IOException ex) {
        log.error(ex);
        q.destroy();
      }
    }
  
    /** Synchronously reads from the lateral cache. */
    public Serializable get(Serializable key) {
      return get( key, true );
    }
    public Serializable get(Serializable key, boolean container ) {
      try {
        return cache.get(key);
      } catch(UnmarshalException ue) {
        p("Retrying the get owing to UnmarshalException...");
        try {
          return cache.get(key);
        } catch(IOException ex) {
          p("Failed in retrying the get for the second time.");
          q.destroy();
        }
      } catch(IOException ex) {
        q.destroy();
      }
      return null;
    }
    /** Adds a remove request to the lateral cache. */
    public boolean remove(Serializable key) {
      try {
        q.addRemoveEvent(key);
      } catch(IOException ex) {
        log.error(ex);
        q.destroy();
      }
      return false;
    }
    /** Adds a removeAll request to the lateral cache. */
    public void removeAll() {
      try {
        q.addRemoveAllEvent();
      } catch(IOException ex) {
        log.error(ex);
        q.destroy();
      }
    }
    /** Adds a dispose request to the lateral cache. */
    public void dispose() {
      try {
        q.addDisposeEvent();
      } catch(IOException ex) {
        log.error(ex);
        q.destroy();
      }
    }
    /** No lateral invokation. */
    public String getStats() {
      return cache.getStats();
    }
    /** No lateral invokation. */
    public int getSize() {
      return cache.getSize();
    }
    /** No lateral invokation. */
    public int getCacheType() {
      return cache.getCacheType();
    }
    /**
     * Returns the asyn cache status.
     * An error status indicates either the lateral connection is not available,
     * or the asyn queue has been unexpectedly destroyed.
     * No lateral invokation.
     */
    public int getStatus() {
      return q.isAlive() ? cache.getStatus() : cache.STATUS_ERROR;
    }
    public String getCacheName() {
      return cache.getCacheName();
    }
  
    /**
     * Replaces the lateral cache service handle with the given handle and
     * reset the event queue by starting up a new instance.
     */
    public void fixCache(ILateralCacheService lateral) {
      cache.fixCache(lateral);
      resetEventQ();
      return;
    }
    /** Resets the event q by first destroying the existing one and starting up new 
one. */
    public void resetEventQ() {
      if (q.isAlive()) {
        q.destroy();
      }
      this.q = new CacheEventQueue(new CacheAdaptor(cache), 
LateralCacheInfo.listenerId, cache.getCacheName());
    }
    public String toString() {
      return "LateralCacheNoWait: " + cache.toString();
    }
    private void p(String s) {
      System.out.println("LateralCacheNoWait:" + s);
    }
  }
  
  
  
  1.1                  
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/lateral/LateralCacheNoWaitFacade.java
  
  Index: LateralCacheNoWaitFacade.java
  ===================================================================
  package org.apache.stratum.jcs.auxiliary.lateral;
  
  import org.apache.stratum.jcs.engine.control.Cache;
  
  import org.apache.stratum.jcs.engine.*;
  import org.apache.stratum.jcs.access.exception.*;
  import org.apache.stratum.jcs.engine.behavior.*;
  import org.apache.stratum.jcs.auxiliary.lateral.http.broadcast.*;
  import org.apache.stratum.jcs.auxiliary.lateral.socket.udp.*;
  //import org.apache.stratum.jcs.auxiliary.lateral.socket.tcp.*;
  
  import org.apache.stratum.jcs.utils.log.*;
  
  import java.io.*;
  import java.rmi.*;
  
  /**
   *  Used to provide access to multiple services under nowait protection.
   *  Composite factory should construct LateralCacheNoWaitFacade to give
   *  to the composite cache out of caches it constructs from the various
   *  managers to lateral services.  Perhaps the lateral cache factory
   *  should be able to do this.
   *  <p>
   *  Modifying operations are fanned out to every LateralCacheNoWait in
   *  {@link #noWaits}; a read returns the first non-null answer.
   */
  public class LateralCacheNoWaitFacade implements ICache {

    private static final boolean debug = false;//true;

    public LateralCacheNoWait[] noWaits;
    private transient Logger log = LoggerManager.getInstance().getLogger(this);
    private String source_id = "org.apache.stratum.jcs.auxiliary.lateral.LateralCacheNoWaitFacade";

    private String cacheName;

    /** Returns the source id of this facade, which is its fully qualified class name. */
    public Serializable getSourceId() {
      return this.source_id;
    }

    /**
     * Constructs a facade over the given no-wait caches.
     *
     * @param noWaits   the caches every operation is forwarded to
     * @param cacheName the name used for elements created by put() and in toString()
     */
    public LateralCacheNoWaitFacade(LateralCacheNoWait[] noWaits, String cacheName) {
      this.noWaits = noWaits;
      this.cacheName = cacheName;
    }

    /** Adds a put request to the lateral cache. */
    public void put(Serializable key, Serializable value ) throws IOException {
      put( key, value, null );
    }

    /**
     * Wraps the key and value in a CacheElement carrying the given attributes
     * and forwards it to update().  Any exception is logged, not rethrown.
     *
     * @param key   the element key
     * @param value the element value
     * @param attr  the attributes to set on the element; put(key, value) passes null
     */
    public void put(Serializable key, Serializable value,  Attributes attr ) throws IOException {
      try {
        CacheElement ce = new CacheElement( cacheName, key, value );
        ce.setAttributes( attr );
        update( ce );
      } catch(Exception ex) {
        log.error(ex);
      }
    }

    /**
     * Forwards the element to every no-wait cache.
     * A failure on one cache is logged and does not stop the remaining
     * caches from being updated.
     *
     * @param ce the element to forward
     */
    public void update( ICacheElement ce ) throws IOException {
      if ( debug ) {
        p( "updating through lateral cache facade, noWaits.length = " + noWaits.length );
      }
      for( int i=0; i < noWaits.length; i++ ) {
        try {
          noWaits[i].update( ce );
        } catch(Exception ex) {
          log.error(ex);
        }
      }
    }

    /** Synchronously reads from the lateral cache. */
    public Serializable get(Serializable key) {
      return get( key, true );
    }

    /**
     * Asks each no-wait cache in array order and returns the first non-null
     * value.  A cache that throws is reported and skipped.
     *
     * @param key       the key to look up
     * @param container passed through to each no-wait cache
     * @return the first non-null value found, or null when no cache has one
     */
    public Serializable get(Serializable key, boolean container ) {
      for( int i=0; i < noWaits.length; i++ ) {
        try {
          Object obj = noWaits[i].get( key, container );
          if ( obj != null ) {
            return (Serializable)obj;
          }
        } catch(Exception ex) {
          p("Failed to get.");
          log.error(ex);
        }
        // No return here: an unconditional "return null" at this point used
        // to end the search after the first cache, so the remaining caches
        // were never consulted.
      }
      return null;
    }

    /**
     * Adds a remove request to every no-wait cache.
     * A failure on one cache is logged and does not stop the others.
     *
     * @return always false
     */
    public boolean remove(Serializable key) {
      for( int i=0; i < noWaits.length; i++ ) {
        try {
          noWaits[i].remove( key );
        } catch(Exception ex) {
          log.error(ex);
        }
      }
      return false;
    }

    /**
     * Adds a removeAll request to every no-wait cache.
     * A failure on one cache is logged and does not stop the others.
     */
    public void removeAll() {
      for( int i=0; i < noWaits.length; i++ ) {
        try {
          noWaits[i].removeAll();
        } catch(Exception ex) {
          log.error(ex);
        }
      }
    }

    /**
     * Adds a dispose request to every no-wait cache.
     * A failure on one cache is logged and does not stop the others.
     */
    public void dispose() {
      for( int i=0; i < noWaits.length; i++ ) {
        try {
          noWaits[i].dispose();
        } catch(Exception ex) {
          log.error(ex);
        }
      }
    }

    /** No lateral invocation.  Placeholder: always returns the empty string. */
    public String getStats() {
      return "";//cache.getStats();
    }

    /** No lateral invocation.  Placeholder: always returns 0. */
    public int getSize() {
      return 0; //cache.getSize();
    }

    /** Returns ICacheType.LATERAL_CACHE. */
    public int getCacheType() {
      return ICacheType.LATERAL_CACHE;
    }

    /**
     * Placeholder: always returns the empty string.
     * NOTE(review): the cacheName field is available and is what toString()
     * reports; confirm whether callers expect it here before changing this.
     */
    public String getCacheName() {
      return ""; //cache.getCacheName();
    }

    /**
     * Placeholder: always returns 0 (need to do something with this).
     * The status of the individual no-wait caches is not consulted.
     */
    public int getStatus() {
      return 0;//q.isAlive() ? cache.getStatus() : cache.STATUS_ERROR;
    }

    public String toString() {
      return "LateralCacheNoWaitFacade: " + cacheName;
    }

    /** Prints the given text to standard output, prefixed with this class's tag. */
    private void p(String s) {
      System.out.println("LateralCacheNoWaitFacade:" + s);
    }
  }
  
  
  
  1.1                  
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/lateral/LateralCacheRestore.java
  
  Index: LateralCacheRestore.java
  ===================================================================
  package org.apache.stratum.jcs.auxiliary.lateral;
  
  import org.apache.stratum.jcs.engine.*;
  import org.apache.stratum.jcs.engine.behavior.*;
  import org.apache.stratum.jcs.auxiliary.lateral.*;
  import org.apache.stratum.jcs.auxiliary.lateral.behavior.*;
  import org.apache.stratum.jcs.auxiliary.lateral.socket.udp.*;
  import org.apache.stratum.jcs.auxiliary.lateral.socket.tcp.*;
  
  import org.apache.stratum.jcs.utils.log.*;
  
  import java.rmi.*;
  import java.rmi.registry.*;
  
  /**
   * Used to repair the lateral caches managed by the associated instance of
   * LateralCacheManager.
   * <p>
   * Intended use, as in the cache monitor: call canFix() first and call
   * fix() only when it returned true.
   */
  public class LateralCacheRestore implements ICacheRestore {

    private Logger log = LoggerManager.getLogger(this);

    private boolean debug = true;
    private final LateralCacheManager lcm;
    private boolean canFix = true;

    // Service object built by canFix() and handed to the manager by fix().
    private Object lateralObj;

    /** Constructs with the given instance of LateralCacheManager. */
    public LateralCacheRestore(LateralCacheManager lcm) {
      this.lcm = lcm;
    }

    /**
     * Returns true iff the connection to the lateral host for the corresponding
     * cache manager can be successfully re-established.
     * <p>
     * A new service object is created according to the manager's transmission
     * type.  If creating it throws, the failure is logged and this instance
     * answers false from then on.
     * NOTE(review): for the HTTP type (and any unrecognised type) no service
     * object is created, yet true is returned, so fix() will hand null to
     * the manager -- confirm this is intended.
     */
    public boolean canFix() {
      if (!canFix) {
        return canFix;
      }

      try {

        // restore based on type.  Only the tcp socket type really needs restoring.
        if( lcm.lca.getTransmissionType() == lcm.lca.UDP ) {
          lateralObj = new LateralUDPService( lcm.lca );
        } else
        if( lcm.lca.getTransmissionType() == lcm.lca.TCP ) {
          lateralObj = new LateralTCPService( lcm.lca );
        } else
        if( lcm.lca.getTransmissionType() == lcm.lca.HTTP ) {
          // nothing is created for HTTP
        }
      } catch(Exception ex) {
        // Log the exception itself rather than only its message (which may
        // be null), matching how the sibling lateral classes report errors.
        log.error(ex);
        canFix = false;
      }

      return canFix;
    }

    /**
     * Fixes up all the caches managed by the associated cache manager by
     * passing it the service object created in canFix(), then reports that
     * the connection resumed.  Does nothing once canFix() has failed.
     */
    public void fix() {
      if (!canFix) {
        return;
      }
      lcm.fixCaches((ILateralCacheService)lateralObj, (ILateralCacheObserver)lateralObj);
      String msg = "Lateral connection resumed.";
      log.info(msg);
      p(msg);
    }

    /** Prints the given text to standard output, prefixed with this class's tag. */
    private void p(String s) {
      System.out.println("LateralCacheRestore:"+s);
    }
  }
  
  
  
  1.1                  
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/lateral/LateralCacheWatchRepairable.java
  
  Index: LateralCacheWatchRepairable.java
  ===================================================================
  package org.apache.stratum.jcs.auxiliary.lateral;
  
  import org.apache.stratum.jcs.engine.*;
  import org.apache.stratum.jcs.engine.behavior.*;
  import org.apache.stratum.jcs.auxiliary.lateral.behavior.*;
  
  /**
   * A CacheWatchRepairable that additionally implements the
   * ILateralCacheObserver interface.  The class declares no members of its
   * own; everything it offers is inherited.
   */
  public class LateralCacheWatchRepairable extends CacheWatchRepairable implements 
ILateralCacheObserver {
  }
  
  
  
  1.1                  
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/lateral/LateralElementDescriptor.java
  
  Index: LateralElementDescriptor.java
  ===================================================================
  package org.apache.stratum.jcs.auxiliary.lateral;
  
  import org.apache.stratum.jcs.auxiliary.lateral.*;
  import org.apache.stratum.jcs.engine.*;
  import org.apache.stratum.jcs.engine.behavior.*;
  
  import java.io.*;
  import java.util.*;
  import java.sql.*;
  
  /**
   * Serializable holder that groups a cache element with the id of the
   * requester and the command to apply.  All state is kept in public fields.
   */
  public class LateralElementDescriptor implements Serializable {

    /** Command type: update. */
    public static final int UPDATE    = 1;
    /** Command type: remove. */
    public static final int REMOVE    = 2;
    /** Command type: remove all. */
    public static final int REMOVEALL = 3;
    /** Command type: dispose. */
    public static final int DISPOSE   = 4;

    /** The cache element this descriptor refers to; null when none was given. */
    public ICacheElement ce;

    /** Id of the requester. */
    public byte requesterId;

    /** One of the command types above; UPDATE unless changed. */
    public int command = UPDATE;

    /** Creates a descriptor without a cache element, for the update command. */
    public LateralElementDescriptor( ) {
      this( null );
    }

    /**
     * Creates a descriptor for the given cache element.
     *
     * @param ce the cache element to hold
     */
    public LateralElementDescriptor( ICacheElement ce ) {
      this.ce = ce;
    }

  }
  
  
  
  1.1                  
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/lateral/ZombieLateralCacheService.java
  
  Index: ZombieLateralCacheService.java
  ===================================================================
  package org.apache.stratum.jcs.auxiliary.lateral;
  
  import java.io.*;
  
  import org.apache.stratum.jcs.engine.behavior.ICacheService;
  import org.apache.stratum.jcs.engine.behavior.ICacheElement;
  import org.apache.stratum.jcs.engine.behavior.ICache;
  import org.apache.stratum.jcs.utils.reuse.*;
  import org.apache.stratum.jcs.engine.ZombieCacheService;
  import org.apache.stratum.jcs.engine.behavior.*;
  import org.apache.stratum.jcs.auxiliary.behavior.*;
  import org.apache.stratum.jcs.auxiliary.lateral.behavior.*;
  
  /**
   * A ZombieCacheService that also implements ILateralCacheService.
   * The three methods declared here have empty bodies, so calling them has
   * no effect.
   */
  public class ZombieLateralCacheService
      extends ZombieCacheService
      implements ILateralCacheService {

    /** Does nothing. */
    public void update(ICacheElement item, byte listenerId) {
      // intentionally empty
    }

    /** Does nothing. */
    public void remove(String cacheName, Serializable key, byte listenerId) {
      // intentionally empty
    }

    /** Does nothing. */
    public void removeAll(String cacheName, byte listenerId) {
      // intentionally empty
    }

  }
  
  
  
  1.1                  
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/lateral/ZombieLateralCacheWatch.java
  
  Index: ZombieLateralCacheWatch.java
  ===================================================================
  package org.apache.stratum.jcs.auxiliary.lateral;
  
  import org.apache.stratum.jcs.engine.ZombieCacheWatch;
  import org.apache.stratum.jcs.auxiliary.lateral.behavior.*;
  
  /**
   * A ZombieCacheWatch that additionally implements the ILateralCacheObserver
   * interface.  The class declares no members of its own; everything it
   * offers is inherited.
   */
  public class ZombieLateralCacheWatch extends ZombieCacheWatch implements 
ILateralCacheObserver {
  }
  
  
  

--
To unsubscribe, e-mail:   <mailto:[EMAIL PROTECTED]>
For additional commands, e-mail: <mailto:[EMAIL PROTECTED]>

Reply via email to