asmuts      02/01/14 22:33:14

  Added:       src/java/org/apache/stratum/jcs/auxiliary/remote
                        RemoteCache.java RemoteCacheAttributes.java
                        RemoteCacheClientTest.java RemoteCacheFactory.java
                        RemoteCacheFailoverRunner.java RemoteCacheInfo.java
                        RemoteCacheListener.java RemoteCacheManager.java
                        RemoteCacheMonitor.java RemoteCacheNoWait.java
                        RemoteCacheNoWaitFacade.java
                        RemoteCacheRestore.java
                        RemoteCacheWatchRepairable.java RemoteUtils.java
                        ZombieRemoteCacheService.java
                        ZombieRemoteCacheWatch.java
  Log:
  the core rmi remote cache
  failover needs to be more efficient
  
  Revision  Changes    Path
  1.1                  
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/remote/RemoteCache.java
  
  Index: RemoteCache.java
  ===================================================================
  package org.apache.stratum.jcs.auxiliary.remote;
  
  import org.apache.stratum.jcs.engine.control.Cache;
  
  import java.io.*;
  import java.rmi.*;
  import java.util.*;
  import java.sql.*;
  
  import org.apache.stratum.jcs.engine.*;
  import org.apache.stratum.jcs.access.*;
  import org.apache.stratum.jcs.engine.behavior.*;
  import org.apache.stratum.jcs.access.exception.*;
  import org.apache.stratum.jcs.auxiliary.remote.*;
  import org.apache.stratum.jcs.auxiliary.remote.behavior.*;
  
  import org.apache.stratum.jcs.utils.log.*;
  import org.apache.stratum.jcs.utils.reuse.*;
  
  
  /**
   * Client proxy for an RMI remote cache.
   *
   */
  public class RemoteCache implements ICache {
  
    private static boolean dbug = false;//true;
    Logger log;
    private static int numCreated = 0;
    private static boolean debug =  false;
    private static boolean debugR =  false;
    private static boolean debugPut =  false;
  
    final String cacheName;
    private IRemoteCacheService remote;
    private IRemoteCacheAttributes irca;
  
    Attributes attr = null;
  
    private HashMap keyHash;  // not synchronized to maximize concurrency.
  
    private String source_id = "org.apache.stratum.jcs.auxiliary.remote.RemoteCache";
  
    ///////////////////////////////////////////////////
    public Serializable getSourceId() {
      return this.source_id;
    }
  
  
    ////////////////////////////////////////
    public String toString() {
      return "RemoteCache: " + cacheName;
    }
  
    ////////////////////////////////////////////////////////
    // was public but need to access from server
    public RemoteCache(IRemoteCacheAttributes cattr, IRemoteCacheService remote) {
      this.irca = cattr;
      this.cacheName= cattr.getCacheName();
      this.remote = remote;
      log = LoggerManager.getLogger( this );
  
      if (dbug) {
        p("Construct> cacheName=" + cattr.getCacheName());
        p( "irca = " + irca.toString() );
      }
  
      /*
      // TODO
      // should be done by the remote cache, not the job of the hub manager
      // Set up the idle period for the RemoteCacheMonitor.
      long monPeriod = 0;
      try {
        monPeriod = Long.parseLong(props.getProperty("remote.monitor.idle.period", 
"0"));
      } catch(NumberFormatException ex) {
        log.warn(ex.getMessage());
      }
      RemoteCacheMonitor.setIdlePeriod(monPeriod);
      */
  
    }
  
      //////////////////////////////////
    public void setAttributes ( Attributes attr ) {
      this.attr = attr;
    }
    //////////////////////////////////
    public Attributes getAttributes () {
      return  this.attr;
    }
  
  
    /**
     * Synchronously put to the remote cache;  if failed, replace the remote
     * handle with a zombie.
     */
    public void put(Serializable key, Serializable value ) throws IOException {
      put( key, value, this.attr.copy() );
    }
    public void put(Serializable key, Serializable value, Attributes attr ) throws 
IOException {
      try {
        CacheElement ce = new CacheElement(cacheName,sanitized(key), sanitized(value));
        ce.setAttributes( attr );
        update( ce );
      } catch(Exception ex) {
        handleException(ex, "Failed to put " + key + " to " + cacheName);
        //throw ex;
      }
    }
    public void update( ICacheElement ce ) throws IOException {
      if ( !this.irca.getGetOnly() ) {
        try {
          remote.update( ce, RemoteCacheInfo.listenerId );
        } catch ( NullPointerException npe ) {
          log.error( npe, "npe for ce = " + ce + "ce.attr = " + ce.getAttributes() );
          return;
        } catch(Exception ex) {
          handleException(ex, "Failed to put " + ce.getKey() + " to " + 
ce.getCacheName());
          //throw ex;
        }
      } else {
        //p( "get only mode, irca = " + irca.toString() );
      }
    }
  
  
    /**
     * Synchronously get from the remote cache;  if failed, replace the remote
     * handle with a zombie.
     */
    public Serializable get(Serializable key) throws IOException {
      try {
        return remote.get(cacheName, sanitized(key));
      } catch ( ObjectNotFoundException one ) {
        log.debug( "didn't find element " + key + " in remote" );
        return null;
      } catch(Exception ex) {
        handleException(ex, "Failed to get " + key + " from " + cacheName);
        //throw ex;
        return null; // never executes; just keep the compiler happy.
      }
    }
  
    /**
     * Wraps a non JDK object into a MarshalledObject, so that we can avoid 
unmarshalling
     * the real object on the remote side.  This technique offers the benefit of
     * surviving incompatible class versions without the need to restart the remote 
cache server.
     */
    private Serializable sanitized(Serializable s) throws IOException {
      // In the unlikely case when the passed in object is a MarshalledObjct, we again 
wrap
      // it into a new MarsahlledObject for "escape" purposes during the get operation.
      //return s.getClass().getName().startsWith("java.") && !(s instanceof 
MarshalledObject) ? s : new MarshalledObject(s);
  
      // avoid this step for now, [problem with group id wrapper]
      return s;
    }
  
    /**
     * Synchronously get from the remote cache;  if failed, replace the remote
     * handle with a zombie.
     */
    public Serializable get(Serializable key, boolean container) throws IOException {
      try {
        return remote.get(cacheName, sanitized(key), container);
      } catch ( ObjectNotFoundException one ) {
        log.debug( "didn't find element " + key + " in remote" );
        return null;
      } catch(Exception ex) {
        handleException(ex, "Failed to get " + key + " from " + cacheName);
        return null; // never executes; just keep the compiler happy.
        //throw ex;
      }
    }
  
    /**
     * Synchronously remove from the remote cache;  if failed, replace the remote
     * handle with a zombie.
     */
    public boolean remove(Serializable key) throws IOException {
      if ( !this.irca.getGetOnly() ) {
        if (dbug) {
          p("remove> key="+key);
        }
        try {
          remote.remove(cacheName, sanitized(key), RemoteCacheInfo.listenerId);
        } catch(Exception ex) {
          handleException(ex, "Failed to remove " + key + " from " + cacheName);
          //throw ex;
        }
      }
      return false;
    }
    /**
     * Synchronously removeAll from the remote cache;  if failed, replace the remote
     * handle with a zombie.
     */
    public void removeAll() throws IOException {
      if ( !this.irca.getGetOnly() ) {
        try {
          remote.removeAll(cacheName, RemoteCacheInfo.listenerId);
        } catch(Exception ex) {
          handleException(ex, "Failed to remove all from " + cacheName);
          //throw ex;
        }
      }
    }
    /**
     * Synchronously dispose the remote cache;  if failed, replace the remote
     * handle with a zombie.
     */
    public void dispose() throws IOException {
  //    remote.freeCache(cacheName);
      p( "disposing of remote cache" );
      try {
        remote.dispose(cacheName);
      } catch(Exception ex) {
        p( "couldn't dispose" );
        handleException(ex, "Failed to dispose " + cacheName);
        //remote = null;
      }
    }
    public String getStats(){
      return "cacheName = " + cacheName;
    }
    /**
     * Returns the cache status.
     * An error status indicates the remote connection is not available.
     */
    public int getStatus() {
      return remote instanceof IZombie ? STATUS_ERROR : STATUS_ALIVE;
    }
    /** Returns the current cache size. */
    public int getSize() {
      return 0;
    }
    public int getCacheType() {
      return REMOTE_CACHE;
    }
    public String getCacheName() {
      return cacheName;
    }
    /** Replaces the current remote cache service handle with the given handle. */
    public void fixCache(IRemoteCacheService remote) {
      this.remote = remote;
      return;
    }
    /**
     * Handles exception by disabling the remote cache service before re-throwing the
     * exception in the form of an IOException.
     */
    private void handleException(Exception ex, String msg) throws IOException {
        log.error( "Disabling remote cache due to error " + msg);
        //log.error(ex);
        log.error(ex.toString());
        remote = new ZombieRemoteCacheService();
        // may want to flush if region specifies
        // Notify the cache monitor about the error, and kick off the recovery process.
        RemoteCacheMonitor.getInstance().notifyError();
  
  
        // initiate failover if local
        RemoteCacheNoWaitFacade rcnwf = 
(RemoteCacheNoWaitFacade)RemoteCacheFactory.facades.get( irca.getCacheName() );
        p( "Initiating failover, rcnf = " + rcnwf );
        if ( rcnwf != null && rcnwf.rca.getRemoteType() == rcnwf.rca.LOCAL ) {
          p( "found facade calling failover" );
          // may need to remove the noWait index here. It will be 0 if it is local
          // since there is only 1 possible listener.
          rcnwf.failover( 0 );
        }
  
  
        if (ex instanceof IOException) {
          throw (IOException)ex;
        }
        throw new IOException(ex.getMessage());
    }
    private void p(String s) {
      System.out.println("RemoteCache:"+s+" >"+Thread.currentThread().getName());
      log.debug("RemoteCache:"+s+" >"+Thread.currentThread().getName());
    }
  }
  
  
  
  1.1                  
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/remote/RemoteCacheAttributes.java
  
  Index: RemoteCacheAttributes.java
  ===================================================================
  package org.apache.stratum.jcs.auxiliary.remote;
  
  /**
   * Title:
   * Description:
   * Copyright:    Copyright (c) 2001
   * Company:
   * @author Aaron Smuts
   * @version 1.0
   */
  import java.io.*;
  import java.util.*;
  
  import org.apache.stratum.jcs.auxiliary.behavior.*;
  import org.apache.stratum.jcs.engine.*;
  import org.apache.stratum.jcs.access.*;
  import org.apache.stratum.jcs.engine.behavior.*;
  import org.apache.stratum.jcs.access.exception.*;
  import org.apache.stratum.jcs.auxiliary.behavior.IAuxiliaryCacheAttributes;
  import org.apache.stratum.jcs.auxiliary.remote.behavior.*;
  import org.apache.stratum.jcs.auxiliary.remote.*;
  import org.apache.stratum.jcs.utils.log.*;
  import org.apache.stratum.jcs.utils.reuse.*;
  
  ////////////////////////////////////////////////////////////////
  public class RemoteCacheAttributes implements IRemoteCacheAttributes {
  
    private String cacheName;
    private String name;
  
    private String remoteServiceName = IRemoteCacheConstants.REMOTE_CACHE_SERVICE_VAL;
    private String remoteHost;
    private int remotePort;
  
    /*
     * failover servers will be used by local caches one at a time.
     * Listeners will be registered with all cluster servers.
     * If we add a get from cluster attribute we will have the ability
     * to chain clusters and have them get from each other.
     */
    private String failoverServers = "";
    private String clusterServers = "";
    private boolean getFromCluster = true;
  
    private int localPort = 0;
  
    private int remoteType = LOCAL;
    private int failoverIndex = 0;
    private String[] failovers;
  
    private boolean removeUponRemotePut = true;
    private boolean getOnly = false;
  
    ////////////////////////////////////////////////////
    public RemoteCacheAttributes() {
    }
  
    //////////////////////////////////////////////////////
    public String getRemoteTypeName() {
      if ( remoteType == LOCAL ) {
        return "LOCAL";
      } else if ( remoteType == CLUSTER ) {
        return "CLUSTER";
      }
      return "LOCAL";
    }
    public void setRemoteTypeName( String s ) {
      if ( s.equals("LOCAL") ) {
        remoteType = LOCAL;
      } else if ( s.equals("CLUSTER") ) {
        remoteType = CLUSTER;
      }
    }
  
    ///////////////////////////////////////////
    public int getFailoverIndex() {
      return failoverIndex;
    }
    public void setFailoverIndex( int p ) {
      this.failoverIndex = p;
    }
  
    ////////////////////////////////////////////
    public String[] getFailovers() {
      return this.failovers;
    }
    public void setFailovers( String[] f ) {
      this.failovers = f;
    }
  
  
    /////////////////////////////////////////////////////////
    public int getRemoteType() {
      return remoteType;
    }
    public void setRemoteType( int p ) {
      this.remoteType = p;
    }
  
    ////////////////////////////////////////////////////
    public void setCacheName( String s ) {
      this.cacheName = s;
    }
    public String getCacheName( ) {
      return this.cacheName;
    }
  
    /////////////////////////////////////////////////////////////////////
    public String getName() {
      return this.name;
    }
    public void setName( String name ) {
      this.name = name;
    }
  
    /////////////////////////////////////////////////
    public IAuxiliaryCacheAttributes copy() {
      try {
        return (IAuxiliaryCacheAttributes)this.clone();
      } catch( Exception e ){}
      return (IAuxiliaryCacheAttributes)this;
    }
  
    ////////////////////////////////////////////////////
    public String getRemoteServiceName() {
      return this.remoteServiceName;
    }
    public void setRemoteServiceName( String s) {
      this.remoteServiceName = s;
    }
  
    ////////////////////////////////////////////////////
    public String getRemoteHost() {
      return this.remoteHost;
    }
    public void setRemoteHost( String s ) {
      this.remoteHost = s;
    }
  
    ////////////////////////////////////////////////////
    public int getRemotePort() {
      return this.remotePort;
    }
    public void setRemotePort( int p ) {
      this.remotePort = p;
    }
  
    ////////////////////////////////////////////////////
    public String getClusterServers() {
      return this.clusterServers;
    }
    public void setClusterServers( String s ) {
      this.clusterServers = s;
    }
  
   ////////////////////////////////////////////////////
    public String getFailoverServers() {
      return this.failoverServers;
    }
    public void setFailoverServers( String s ) {
      this.failoverServers = s;
    }
  
    ////////////////////////////////////////////////////
    public int getLocalPort() {
      return this.localPort;
    }
    public void setLocalPort( int p ) {
      this.localPort = p;
    }
  
    ////////////////////////////////////////////////////////
    public boolean getRemoveUponRemotePut() {
      return this.removeUponRemotePut;
    }
    public void setRemoveUponRemotePut( boolean r ) {
      this.removeUponRemotePut = r;
    }
  
    ////////////////////////////////////////////////////////
    public boolean getGetOnly() {
      return this.getOnly;
    }
    public void setGetOnly( boolean r ) {
      this.getOnly = r;
    }
  
  
    ////////////////////////////////////////////////////
    public String toString() {
      StringBuffer buf = new StringBuffer();
      buf.append( "\nremotePort = " + this.remoteHost );
      buf.append( "\nremotePort = " + this.remotePort );
      buf.append( "\ncacheName = " + this.cacheName );
      buf.append( "\nremoveUponRemotePut = " + this.removeUponRemotePut );
      buf.append( "\ngetOnly = " + getOnly );
      return buf.toString();
    }
  
  }
  
  
  
  1.1                  
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/remote/RemoteCacheClientTest.java
  
  Index: RemoteCacheClientTest.java
  ===================================================================
  package org.apache.stratum.jcs.auxiliary.remote;
  
  import org.apache.stratum.jcs.access.exception.*;
  import org.apache.stratum.jcs.auxiliary.remote.behavior.IRemoteCacheConstants;
  import org.apache.stratum.jcs.auxiliary.remote.behavior.*;
  import org.apache.stratum.jcs.engine.*;
  import org.apache.stratum.jcs.engine.behavior.*;
  
  import org.apache.stratum.jcs.utils.reuse.*;
  
  import java.io.*;
  import java.net.*;
  import java.rmi.*;
  import java.rmi.registry.*;
  import java.rmi.server.*;
  import java.util.*;
  
  public class RemoteCacheClientTest implements IRemoteCacheListener, 
IRemoteCacheConstants {
  
    ICacheObserver watch;
    ICacheService cache;
    boolean debug=true;
    /** The registry host name. */
    final String host;
    /** The registry port number. */
    final int port;
    final int count;
  
    protected static byte listenerId = 0;
  
    ////////////////////////////////////////////////////
    public int getRemoteType( ) throws IOException {
      return 0;
    }
  
    public RemoteCacheClientTest(int count)
        throws MalformedURLException, NotBoundException, IOException
    {
      this(count, true, true, false);
    }
    public RemoteCacheClientTest(int count, boolean write, boolean read, boolean 
delete)
        throws MalformedURLException, NotBoundException, IOException
    {
      this("", Registry.REGISTRY_PORT, count, write, read, delete);
    }
    public RemoteCacheClientTest(String host, int port, int count,
        boolean write, boolean read, boolean delete)
        throws MalformedURLException, NotBoundException, IOException
    {
      this.count = count;
      this.host = host;
      this.port = port;
      // record export exception
      Exception ee = null;
  
      try {
          // Export this remote object to make it available to receive incoming calls,
          // using an anonymous port.
          UnicastRemoteObject.exportObject(this);
          ee = null;
      } catch (ExportException e) {
          // use already exported object; remember exception
          ee = e;
      }
      String service = System.getProperty(REMOTE_CACHE_SERVICE_NAME);
  
      if (service == null) {
        service = REMOTE_CACHE_SERVICE_VAL;
      }
      String registry = "//" + host  + ":" + port + "/" + service;
      if (debug) {
        p("looking up server " + registry);
      }
      Object obj = Naming.lookup(registry);
  
      if (debug) {
        p("server found");
      }
      cache = (ICacheService)obj;
      watch = (ICacheObserver)obj;
  
      if (debug) {
        p("subscribing to the server");
      }
      watch.addCacheListener("testCache", this);
      ICacheElement cb = new CacheElement("testCache", "testKey", "testVal");
  
      for (int i=0; i < count; i++) {
        cb = new CacheElement("testCache", ""+i, ""+i);
  
        if (delete) {
          if (debug) {
            p("deleting a cache item from the server " + i);
          }
          cache.remove(cb.getCacheName(), cb.getKey());
        }
        if (write) {
          if (debug) {
            p("putting a cache bean to the server " + i);
          }
          try {
            cache.update( cb );
          } catch ( ObjectExistsException oee ) {
            p( oee.toString() );
          }
        }
        if (read) {
          try {
            Object val = cache.get(cb.getCacheName(), cb.getKey());
            p("get " + cb.getKey() + " returns " + val);
          } catch ( ObjectNotFoundException onfe ) {}
        }
      }
    }
    public void handlePut(ICacheElement cb) throws IOException {
      p("handlePut> cb="+cb);
    }
    public void handleRemove(String cacheName, Serializable key) throws IOException {
      p("handleRemove> cacheName="+cacheName+", key="+key);
    }
    public void handleRemoveAll(String cacheName) throws IOException {
      p("handleRemove> cacheName="+cacheName);
    }
    public void handleDispose(String cacheName) throws IOException {
      p("handleDispose> cacheName="+cacheName);
    }
  /*
    public void handleRelease() throws IOException {
      p("handleRelease>");
    }
  */
    private static void p(String s) {
      System.out.println("RemoteCacheClientTest:"+s);
    }
    public static void main(String[] args) throws Exception {
      int count=0;
      boolean read=false;
      boolean write=false;
      boolean delete=false;
  
      for (int i=0; i < args.length; i++) {
        if (args[i].startsWith("-")) {
          if (!read) {
            read = args[i].indexOf("r") != -1;
          }
          if (!write) {
            write = args[i].indexOf("w") != -1;
          }
          if (!delete) {
            delete = args[i].indexOf("d") != -1;
          }
        }
        else {
          count = Integer.parseInt(args[i]);
        }
      }
      RemoteCacheClientTest client = new RemoteCacheClientTest(count, write, read, 
delete);
    }
  
    public void setListenerId( byte id ) throws IOException {
      this.listenerId = id;
      p( "listenerId = " +  id );
    }
    public byte getListenerId( ) throws IOException {
      return this.listenerId;
    }
  
  }
  
  
  
  1.1                  
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/remote/RemoteCacheFactory.java
  
  Index: RemoteCacheFactory.java
  ===================================================================
  package org.apache.stratum.jcs.auxiliary.remote;
  
  /**
   * Title:
   * Description:
   * Copyright:    Copyright (c) 2001
   * Company:
   * @author
   * @version 1.0
   */
  
   import java.util.*;
  
  //import org.apache.stratum.jcs.auxiliary.*;
  import org.apache.stratum.jcs.auxiliary.behavior.*;
  import org.apache.stratum.jcs.engine.*;
  import org.apache.stratum.jcs.engine.behavior.*;
  import org.apache.stratum.jcs.auxiliary.remote.*;
  import org.apache.stratum.jcs.auxiliary.remote.behavior.*;
  import org.apache.stratum.jcs.utils.log.*;
  
  
  public class RemoteCacheFactory implements IAuxiliaryCacheFactory {
  
    private static Logger log = LoggerManager.getLogger( RemoteCacheFactory.class );
  
    private static String name;
  
    // store reference of facades to initiate failover
    public static final HashMap facades = new HashMap();
  
  
    ///////////////////////////////////
    public RemoteCacheFactory() {
    }
  
  
     ///////////////////////////////////////////////////////////////
    /**
     * Interface method.  Allows classforname construction, making caches pluggable.
     * Should be able to make this work for clusters and local caches
     */
    public ICache createCache( IAuxiliaryCacheAttributes iaca ) {
  
      RemoteCacheAttributes rca = (RemoteCacheAttributes)iaca;
  
      ArrayList noWaits = new ArrayList();
  
      // if LOCAL
      if ( rca.getRemoteType() == rca.LOCAL ) {
  
        // a list toi be turned into an array of failover server information
        ArrayList failovers = new ArrayList();
  
        // not necessary if a failover list is defined
        // REGISTER PRIMARY LISTENER
        // if it is a primary
        if ( rca.getRemoteHost() != null ) {
  
          failovers.add( rca.getRemoteHost() + ":" + rca.getRemotePort() );
  
          RemoteCacheManager rcm = RemoteCacheManager.getInstance( rca );
          ICache ic = rcm.getCache( rca );
          if ( ic != null ) {
            noWaits.add( ic );
          } else {
            //p( "noWait is null" );
          }
        }
  
        // GET HANDLE BUT DONT REGISTER A LISTENER FOR FAILOVERS
        String failoverList = rca.getFailoverServers();
        if ( failoverList != null ) {
          StringTokenizer fit = new StringTokenizer( failoverList, "," );
          while( fit.hasMoreElements() ) {
  
            String server = (String)fit.nextElement();
            failovers.add( server );
  
            rca.setRemoteHost( server.substring( 0,server.indexOf(":") ) );
            rca.setRemotePort( Integer.parseInt(server.substring( server.indexOf(":") 
+1)) );
            RemoteCacheManager rcm = RemoteCacheManager.getInstance( rca );
            // add a listener if there are none, need to tell rca what number it is at
            if ( noWaits.size() <= 0 ) {
              ICache ic = rcm.getCache( rca.getCacheName() );
              if ( ic != null ) {
                noWaits.add( ic );
              } else {
                //p( "noWait is null" );
              }
            }
  
          } // end while
        } // end if failoverList != null
  
        rca.setFailovers( (String[])failovers.toArray(new String[0]) );
  
      // if CLUSTER
      }  else
      if ( rca.getRemoteType() == rca.CLUSTER ) {
  
        // REGISTER LISTENERS FOR EACH SYSTEM CLUSTERED CACHEs
        StringTokenizer it = new StringTokenizer( rca.getClusterServers(), "," );
        while( it.hasMoreElements() ) {
          //String server = (String)it.next();
          String server = (String)it.nextElement();
          //p( "tcp server = " +  server );
          rca.setRemoteHost( server.substring( 0,server.indexOf(":") ) );
          rca.setRemotePort( Integer.parseInt(server.substring( server.indexOf(":") 
+1)) );
          RemoteCacheManager rcm = RemoteCacheManager.getInstance( rca );
          rca.setRemoteType( rca.CLUSTER );
          ICache ic = rcm.getCache( rca );
          if ( ic != null ) {
            noWaits.add( ic );
          } else {
            //p( "noWait is null" );
          }
        }
  
      } // end if CLUSTER
  
      //RemoteCacheNoWaitFacade rcnwf = new RemoteCacheNoWaitFacade( 
(RemoteCacheNoWait[])noWaits.toArray(new RemoteCacheNoWait[0]), iaca.getCacheName() );
      RemoteCacheNoWaitFacade rcnwf = new RemoteCacheNoWaitFacade( 
(RemoteCacheNoWait[])noWaits.toArray(new RemoteCacheNoWait[0]), rca );
  
      facades.put( rca.getCacheName(), rcnwf );
  
      return rcnwf;
    } // end createCache
  
    /////////////////////////////////////////////////////////////////////
    public String getName() {
      return this.name;
    }
    public void setName( String name ) {
      this.name = name;
    }
  
  }
  
  
  
  1.1                  
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/remote/RemoteCacheFailoverRunner.java
  
  Index: RemoteCacheFailoverRunner.java
  ===================================================================
  package org.apache.stratum.jcs.auxiliary.remote;
  
  import  java.util.*;
  
  import org.apache.stratum.jcs.auxiliary.behavior.*;
  import org.apache.stratum.jcs.engine.*;
  import org.apache.stratum.jcs.engine.behavior.*;
  import org.apache.stratum.jcs.auxiliary.remote.*;
  import org.apache.stratum.jcs.auxiliary.remote.behavior.*;
  import org.apache.stratum.jcs.utils.log.*;
  
  //////////////////////////////////////////////////////////////
  public class RemoteCacheFailoverRunner implements Runnable {
  
    private RemoteCacheNoWaitFacade facade;
  
    private static long idlePeriod = 20*1000;
    private boolean alright=true;
  
    private transient Logger log = 
LoggerManager.getLogger(RemoteCacheFailoverRunner.class);
  
    //////////////////////////////////////////////
    public RemoteCacheFailoverRunner( RemoteCacheNoWaitFacade facade ) {
      this.facade = facade;
    }
  
    //////////////////////////////////////////////
    /**
     * Notifies the cache monitor that an error occurred,
     * and kicks off the error recovery process.
     */
    public void notifyError() {
      bad();
      synchronized(this) {
        notify();
      }
    }
  
    ////////////////////////////////////////////////
    public void run() {
      do {
  
        // will only be run if there is an error
        /*
        if (alright) {
          synchronized(this) {
            if (alright) {
              // Failure driven mode.
              try {
                wait(); // wake up only if there is an error.
              } catch(InterruptedException ignore) {
              }
            }
          }
        }
  
        // The "alright" flag must be false here.
        // Simply presume we can fix all the errors until proven otherwise.
        synchronized(this) {
          alright = true;
        }
        */
  
        p("cache failover running.");
  
        // there is no active listener
        if( !alright ) {
  
          // reset listener id for reidentification by new remote
          // RemoteCacheInfo.listenerId = 0;
          // this may not work, we may need to have unique listener ids per failover
  
          // Monitor each RemoteCacheManager instance one after the other.
          // Each RemoteCacheManager corresponds to one remote connection.
          String[] failovers = facade.rca.getFailovers();
          int fidx = facade.rca.getFailoverIndex();
          p( "fidx = " + fidx + " failovers.length = " + failovers.length );
          int i=fidx+1;
          p( "i = " + i );
          for ( ; i < failovers.length; i++) {
            p( "i = " + i );
            String server = failovers[i];
  
            RemoteCacheAttributes rca = null;
            try {
  
              rca = (RemoteCacheAttributes)facade.rca.copy();
              rca.setRemoteHost( server.substring( 0,server.indexOf(":") ) );
              rca.setRemotePort( Integer.parseInt(server.substring( 
server.indexOf(":") +1)) );
              RemoteCacheManager rcm = RemoteCacheManager.getInstance( rca );
              p( "RemoteCacheAttributes for failover = " + rca.toString() );
              // add a listener if there are none, need to tell rca what number it is 
at
              ICache ic = rcm.getCache( rca.getCacheName() );
              if ( ic != null ) {
                if ( ic.getStatus() == ic.STATUS_ALIVE ) {
                  // may need to do this more gracefully
                  p( "reseting no wait" );
                  facade.noWaits = new RemoteCacheNoWait[1];
                  facade.noWaits[0] = (RemoteCacheNoWait)ic;
                  facade.rca.setFailoverIndex(i);
  
                  synchronized(this) {
                    p( "setting ALRIGHT to true, moving to Primary Recovery Mode" );
                    alright = true;
                    p( "CONNECTED to " + rca.getRemoteHost() + ":" + 
rca.getRemotePort() + "\n\n" );
                  }
  
                }
              } else {
                //p( "noWait is null" );
              }
  
            } catch (Exception ex) {
              bad();
              p( "FAILED to connect to " + rca.getRemoteHost() + ":" + 
rca.getRemotePort() );
              // Problem encountered in fixing the caches managed by a 
RemoteCacheManager instance.
              // Soldier on to the next RemoteCacheManager instance.
              log.error(ex.toString());
            }
          }
  
        } // end if !alright
        else {
          p( "ALRIGHT is true --  failover runner is in primary recovery mode" );
          log.warn( "ALRIGHT is true --  failover runner is in primary recovery mode" 
);
        }
  
        /////////////////////////////////////////
        //try to move back to the primary
        String[] failovers = facade.rca.getFailovers();
        String server = failovers[0];
        try {
  
          RemoteCacheAttributes rca = (RemoteCacheAttributes)facade.rca.copy();
          rca.setRemoteHost( server.substring( 0,server.indexOf(":") ) );
          rca.setRemotePort( Integer.parseInt(server.substring( server.indexOf(":") 
+1)) );
          RemoteCacheManager rcm = RemoteCacheManager.getInstance( rca );
          // add a listener if there are none, need to tell rca what number it is at
          ICache ic = rcm.getCache( rca.getCacheName() );
          if ( ic != null ) {
            if ( ic.getStatus() == ic.STATUS_ALIVE ) {
              // may need to do this more gracefully
              p( "reseting no wait to PRIMARY" );
              facade.noWaits = new RemoteCacheNoWait[1];
              facade.noWaits[0] = (RemoteCacheNoWait)ic;
              facade.rca.setFailoverIndex(0);
              //return;
            }
          } else {
            //p( "noWait is null" );
          }
  
        } catch (Exception ex) {
          log.error(ex);
        }
  
  
        // Time driven mode: sleep between each round of recovery attempt.
        try {
          p("cache failover runner sleeping for " + idlePeriod);
          Thread.currentThread().sleep(idlePeriod);
        } catch (InterruptedException ex) {
        // ignore;
        }
  
      // try to bring the listener back to the primary
      } while ( facade.rca.getFailoverIndex() > 0 );
  
      p( "exiting failover runner" );
      return;
  
    }
  
  
    /** Sets the "alright" flag to false in a critial section. */
    private void bad() {
      if (alright) {
        synchronized(this) {
          alright = false;
        }
      }
    }
    private void p(String s) {
      if ( log.logLevel >= log.DEBUG ) {
        log.debug( s );
      }
      System.out.println("RemoteCacheFailoveRunner:" + s);
    }
  
  }
  
  
  
  1.1                  
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/remote/RemoteCacheInfo.java
  
  Index: RemoteCacheInfo.java
  ===================================================================
  package  org.apache.stratum.jcs.auxiliary.remote;
  
  /*
   *  This won't work when registered to mulitple remotes, will use a variable int he 
rcm
   */
  /** A shared static variable holder for the remote cache */
  public class RemoteCacheInfo {
  
    // shouldn't be instantiated
    private RemoteCacheInfo(){}
  
    /** Shouldn't be used till after reconneting, after setting = thread safe
     *  Used to identify a client, so we can run multiple clients off one host.
     *  Need since there is no way to identify a client other than by host in rmi.
     */
    protected static byte listenerId = 0;
  
  }
  
  
  
  1.1                  
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/remote/RemoteCacheListener.java
  
  Index: RemoteCacheListener.java
  ===================================================================
  package org.apache.stratum.jcs.auxiliary.remote;
  
  //////////////////////////////////
  import  java.io.*;
  import  java.rmi.*;
  import  java.rmi.registry.*;
  import  java.rmi.server.*;
  import  java.util.*;
  import  java.sql.*;
  
  /////////////////////////////////////
  import org.apache.stratum.jcs.engine.behavior.*;
  import org.apache.stratum.jcs.engine.control.*;
  import org.apache.stratum.jcs.engine.group.*;
  import org.apache.stratum.jcs.engine.*;
  import org.apache.stratum.jcs.access.*;
  import org.apache.stratum.jcs.auxiliary.disk.*;
  import org.apache.stratum.jcs.auxiliary.lateral.*;
  import org.apache.stratum.jcs.auxiliary.remote.*;
  import org.apache.stratum.jcs.auxiliary.remote.behavior.*;
  
  // remove
  import org.apache.stratum.jcs.utils.log.*;
  
  ///////////////////////////////////////////////////////////////////////
  public class RemoteCacheListener implements IRemoteCacheListener, 
IRemoteCacheConstants, Serializable {
  
    protected static final boolean debug = false;//true;
    protected static final boolean debugcmd = false;//true;
  
    protected static transient Logger log;
  
    protected static transient ICompositeCacheManager cacheMgr;
  
    protected static IRemoteCacheListener instance;
    protected IRemoteCacheAttributes irca;
  
    // TODO: change this log level
    protected static final boolean debugactivity = true;
    protected int puts = 0;
    protected int removes = 0;
  
    /////////////////////////////////////////////////////////////
    /**
     *  Only need one since it does work for all regions,
     *  just reference by multiple region names.
     */
    protected RemoteCacheListener ( IRemoteCacheAttributes irca ) {
  
      this.irca = irca;
  
      log = LoggerManager.getLogger( "remote_remotecachemanager" );
  
      // may need to add to ICacheManager interface to handle
      // the source arument extended update and remove methods
  
      // causes circular reference, unfortunate, becasue the
      // the overhead is higer
      // will need to pass a refernce thru
      //cacheMgr = CacheManagerFactory.getInstance();
  
      // Export this remote object to make it available to receive incoming calls,
      // using an anonymous port.
      try {
        if ( irca.getLocalPort() != 0 ) {
          UnicastRemoteObject.exportObject(this, irca.getLocalPort() );
        } else {
          UnicastRemoteObject.exportObject(this);
        }
      } catch (RemoteException ex) {
        log.error(ex);
        throw  new IllegalStateException(ex.getMessage());
      }
  
    }
  
    /** let the remote cache set a listener_id.
     *  Since there is only one listerenr for all the regions
     *  and every region gets registered? the id shouldn't be set
     *  if it isn't zero.  If it is we assume that it is a reconnect.
     */
    public void setListenerId( byte id ) throws IOException {
      RemoteCacheInfo.listenerId = id;
      if ( debugcmd ) {
        p( "set listenerId = " +  id );
      }
    }
    public byte getListenerId( ) throws IOException {
  
      // set the manager since we are in use
      //getCacheManager();
  
      //p( "get listenerId" );
      if ( debugcmd ) {
        p( "get listenerId = " +  RemoteCacheInfo.listenerId );
      }
      return RemoteCacheInfo.listenerId;
    }
  
    ////////////////////////////////////////////////////
    public int getRemoteType( ) throws IOException {
      if ( debugcmd ) {
        p( "getRemoteType = " +  irca.getRemoteType() );
      }
      return irca.getRemoteType();
    }
  
     ///////////////////////////////////////////////////////////////////////////
     public static IRemoteCacheListener getInstance( IRemoteCacheAttributes irca )  {
       //throws IOException, NotBoundException
        if (instance == null) {
          synchronized(RemoteCacheListener.class) {
            if (instance == null) {
             instance = new RemoteCacheListener( irca );
            }
          }
        }
        //instance.incrementClients();
        return instance;
     }
  
  
    /////////////////////////////////////////////////////////////////////////////
    //////////////////////////// implements the IRemoteCacheListener interface. 
//////////////
    /**
     * Just remove the element since it has been updated elsewhere
     * cd should be incomplete for faster transmission.
     * We don't want to pass data only invalidation.
     * The next time it is used the local cache will get
     * the new version from the remote store
     */
    public void handlePut (ICacheElement cb) throws IOException {
  
      if ( irca.getRemoveUponRemotePut() ) {
      // for now
      //if ( true ) {
        if ( debugcmd ) {
          p( "PUTTING ELEMENT FROM REMOTE, (  invalidating ) " );
        }
  
        handleRemove(cb.getCacheName(), cb.getKey());
  
      } else {
        if ( debugcmd ) {
          p( "PUTTING ELEMENT FROM REMOTE, ( updating ) " );
          p( "cb = " + cb );
        }
        if ( debugactivity ) {
          puts++;
          if ( puts % 100 == 0 ) {
            p( "puts = " + puts );
          }
        }
  
        getCacheManager();
        ICompositeCache cache = (ICompositeCache)cacheMgr.getCache(cb.getCacheName());
        cache.update( cb, false );
  
        /*
        ICompositeCache cache = 
(ICompositeCache)cacheMgr.getCache(irca.getCacheName());
        // copy the element to avoid the remote reference, may need to do a deep copy
        ICacheElement ce = new CacheElement( cb.getCacheName(), cb.getKey(), 
cb.getVal() );
        ce.setAttributes( cb.getAttributes().copy() );
        try {
          cache.update( ce, false );
        } catch( Exception e ) {
          // strange marshalling exception encountered
          log.error( e );
        }
        */
      }
      return;
    }
  
    ////////////////////////////////////////////////////
    public void handleRemove (String cacheName, Serializable key) throws IOException {
      if ( debugactivity ) {
        removes++;
        if ( removes % 100 == 0 ) {
          p( "removes = " + removes );
        }
      }
  
      if (debug) {
        log.debug("handleRemove> cacheName=" + cacheName + ", key=" + key);
      }
      if ( debugcmd ) {
        p("handleRemove> cacheName=" + cacheName + ", key=" + key);
      }
  
      getCacheManager();
      // interface limitation here
  
      Cache cache = (Cache)cacheMgr.getCache(cacheName);
      cache.remove(key, cache.REMOTE_INVOKATION);
    }
  
    //////////////////////////////////////////////////
    public void handleRemoveAll (String cacheName) throws IOException {
      if (debug) {
        log.debug("handleRemoveAll> cacheName=" + cacheName);
      }
      getCacheManager();
      ICache cache = cacheMgr.getCache(cacheName);
      cache.removeAll();
    }
  
    ///////////////////////////////////////////////////
    public void handleDispose (String cacheName) throws IOException {
      if (debug) {
        log.debug("handleDispose> cacheName=" + cacheName);
      }
      CompositeCacheManager cm = (CompositeCacheManager)cacheMgr;
      cm.freeCache(cacheName, Cache.REMOTE_INVOKATION);
    }
  
  
    ////////////////////////////////////////////////
    // override for new funcitonality
    protected void getCacheManager() {
      if ( cacheMgr == null ) {
        cacheMgr = (ICompositeCacheManager)CacheManagerFactory.getInstance();
        p( "had to get cacheMgr" );
        if ( debugcmd ) {
          p( "cacheMgr = " + cacheMgr );
        }
      } else {
        if ( debugcmd ) {
          p( "already got cacheMgr = " + cacheMgr );
        }
      }
    }
  
    //////////////////////////////////////////////////
    public static void p( String s ) {
      if ( log.logLevel >= log.DEBUG ) {
        log.debug(  "RemoteCacheListener: " + s );
      }
      System.out.println( "RemoteCacheListener: " + s );
    }
  
  } // end class
  
  
  
  1.1                  
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/remote/RemoteCacheManager.java
  
  Index: RemoteCacheManager.java
  ===================================================================
  package  org.apache.stratum.jcs.auxiliary.remote;
  
  import  java.io.*;
  import  java.net.*;
  import  java.rmi.*;
  import  java.rmi.registry.*;
  import  java.util.*;
  import  java.sql.*;
  
  import  org.apache.stratum.jcs.engine.*;
  import  org.apache.stratum.jcs.engine.behavior.*;
  import  org.apache.stratum.jcs.auxiliary.behavior.*;
  import  org.apache.stratum.jcs.auxiliary.remote.behavior.*;
  import  org.apache.stratum.jcs.auxiliary.remote.group.*;
  
  import org.apache.stratum.jcs.utils.log.*;
  
  /**
   * An instance of RemoteCacheManager corresponds to one remote connection of a
   * specific host and port.
   * All RemoteCacheManager instances are monitored by the singleton
   * RemoteCacheMonitor monitoring daemon for error detection and recovery.
   */
  public class RemoteCacheManager implements IAuxiliaryCacheManager {
  
    // Contains mappings of Location instance to RemoteCacheManager instance.
    static final Map instances = new HashMap();
    private static RemoteCacheMonitor monitor;
    private static boolean debug = false;//true;         //true;
  
    private int clients;
    private Logger log;
  
    // Contains instances of RemoteCacheNoWait managed by an RemoteCacheManager 
instance.
    final Map caches = new HashMap();
    final String host;
    final int port;
    final String service;
  
    private IRemoteCacheAttributes irca;
  
  
    /**
     * Handle to the remote cache service; or a zombie handle if failed to connect.
     */
    private IRemoteCacheService remoteService;
  
    /**
     * Wrapper of the remote cache watch service;
     * or wrapper of a zombie service if failed to connect.
     */
    private RemoteCacheWatchRepairable remoteWatch;
  
    /**
     * Constructs an instance to with the given remote connection parameters.
     * If the connection cannot be made, "zombie" services will be temporarily
     * used until a successful re-connection is made by the monitoring daemon.
     */
    private RemoteCacheManager (String host, int port, String service) {
      this.host = host;
      this.port = port;
      this.service = service;
      log = LoggerManager.getLogger(this);
      String registry = "//" + host + ":" + port + "/" + service;
      if (debug) {
        p("looking up server " + registry);
      }
      try {
        Object obj = Naming.lookup(registry);
        if (debug) {
          p("server found");
        }
        // Successful connection to the remote server.
        remoteService = (IRemoteCacheService)obj;
        remoteWatch = new RemoteCacheWatchRepairable();
        remoteWatch.setCacheWatch((IRemoteCacheObserver)obj);
      } catch (Exception ex) {
        // Failed to connect to the remote server.
        // Configure this RemoteCacheManager instance to use the "zombie" services.
        log.error(ex.getMessage());
        remoteService = new ZombieRemoteCacheService();
        remoteWatch = new RemoteCacheWatchRepairable();
        remoteWatch.setCacheWatch(new ZombieRemoteCacheWatch());
        // Notify the cache monitor about the error, and kick off the recovery process.
        RemoteCacheMonitor.getInstance().notifyError();
      }
    }
  
  
    ////////////////////////////////////////
    public IRemoteCacheAttributes getDefaultCattr() {
      return this.irca;
    }
  
    /** Adds the remote cache listener to the underlying cache-watch service. */
    public void addRemoteCacheListener( IRemoteCacheAttributes cattr, 
IRemoteCacheListener listener) throws IOException {
      synchronized (caches) {
        remoteWatch.addCacheListener(cattr.getCacheName(), listener);
      }
      return;
    }
  
  
    /**
     * Returns an instance of RemoteCacheManager for the given connection parameters.
     * Also starts up the monitoring daemon, if not already started.
     * If the connection cannot be established, zombie objects will be used
     * for future recovery purposes.
     * @param host host of the registry where the service lookup is to be made.
     * @parma port port of the registry.
     */
    public static RemoteCacheManager getInstance ( IRemoteCacheAttributes cattr) {
  
      String host = cattr.getRemoteHost();
      int port = cattr.getRemotePort();
      String service = cattr.getRemoteServiceName();
      if (host == null)
        host = "";
      if (port < 1024) {
        port = Registry.REGISTRY_PORT;
      }
      Location loc = new Location(host, port);
  
      RemoteCacheManager ins = (RemoteCacheManager)instances.get(loc);
      if (ins == null) {
        synchronized (instances) {
          ins = (RemoteCacheManager)instances.get(loc);
          if (ins == null) {
            // cahnge to use cattr and to set defaults
            ins = new RemoteCacheManager(host, port, service);
            ins.irca = cattr;
            instances.put(loc, ins);
          }
        }
      }
      if (debug) {
        ins.log.logIt("Manager stats : " + ins.getStats() + "<br> -- in 
getInstance()");
      }
      ins.clients++;
      // Fires up the monitoring daemon.
      if (monitor == null) {
        monitor = RemoteCacheMonitor.getInstance();
        // If the returned monitor is null, it means it's already started elsewhere.
        if (monitor != null) {
          Thread t = new Thread(monitor);
          t.setDaemon(true);
          t.start();
        }
      }
      return  ins;
    }
  
  
    /** Returns a remote cache for the given cache name. */
    /** Returns a remote cache for the given cache name. */
    public ICache getCache ( String cacheName ) {
      IRemoteCacheAttributes ca = (IRemoteCacheAttributes)irca.copy();
      ca.setCacheName( cacheName );
      return getCache( ca );
    }
    public ICache getCache ( IRemoteCacheAttributes cattr ) {
      RemoteCacheNoWait c = null;
      synchronized (caches) {
        c = (RemoteCacheNoWait)caches.get( cattr.getCacheName() );
        if (c == null) {
          c = new RemoteCacheNoWait(new RemoteCache(cattr, remoteService));
          caches.put(cattr.getCacheName(), c);
        }
      }
      if (debug) {
        log.logIt("Manager stats : " + getStats());
      }
      //if ( irca.getUseRemote() ) {
        try {
          // Remote cache manager can handle this by gettign the type formt he listener
          //if ( cattr.getRemoteType() == cattr.CLUSTER ) {
          //  addRemoteCacheListener( cattr, RemoteGroupCacheListener.getInstance( 
cattr ) );
          //} else
          //if ( cattr.getRemoteType() == cattr.LOCAL ) {
            addRemoteCacheListener( cattr, RemoteGroupCacheListener.getInstance( cattr 
) );
          //}
        } catch( IOException ioe ) {
          log.error( ioe.getMessage() );
        } catch( Exception e ) {
          log.error( e.getMessage() );
        }
      //}
      return  c;
    }
  
    /////////////////////////////////////////////////////////
    public void freeCache (String name) throws IOException {
      ICache c = null;
  
      synchronized(caches) {
        c = (ICache)caches.get(name);
      }
      if (c != null) {
        c.dispose();
      }
    }
  
    /////////////////////////////////////////////////////////////////////
    // Don't care if there is a concurrency failure ?
    public String getStats () {
      StringBuffer stats = new StringBuffer();
      Iterator allCaches = caches.values().iterator();
      while (allCaches.hasNext()) {
        ICache c = (ICache)allCaches.next();
        if (c != null) {
          stats.append("<br>&nbsp;&nbsp;&nbsp;" + c.getStats());
        }
      }
      return  stats.toString();
    }
  
    /////////////////////////////////////////////////////////////////
    public void release () {
      // Wait until called by the last client
      if (--clients != 0) {
        return;
      }
      synchronized (caches) {
        Iterator allCaches = caches.values().iterator();
        while (allCaches.hasNext()) {
          ICache c = (ICache)allCaches.next();
          if (c != null) {
            try {
              c.dispose();
            } catch (IOException ex) {
              ex.printStackTrace();
              log.logEx(ex);
            }
          }
        }
      }
    }             //end release()
  
    /** Fixes up all the caches managed by this cache manager. */
    public void fixCaches (IRemoteCacheService remoteService, IRemoteCacheObserver 
remoteWatch) {
      synchronized (caches) {
        this.remoteService = remoteService;
        this.remoteWatch.setCacheWatch(remoteWatch);
        for (Iterator en = caches.values().iterator(); en.hasNext();) {
          RemoteCacheNoWait cache = (RemoteCacheNoWait)en.next();
          cache.fixCache(this.remoteService);
        }
      }
    }
  
    /////////////////////////////////
    private void p (String s) {
      log.debug("RemoteCacheManager:" + s);
    }
  
    ///////////////////////////////////////
    public int getCacheType() {
      return REMOTE_CACHE;
    }
  
  
    //////////////////////////////////////////////////////////////////////
    /** Location of the RMI registry. */
    private static final class Location {
      public final String host;
      public final int port;
  
      public Location (String host, int port) {
        this.host = host;
        this.port = port;
      }
  
      public boolean equals (Object obj) {
        if (obj == this) {
          return  true;
        }
        if (obj == null || !(obj instanceof Location))
          return  false;
        Location l = (Location)obj;
        if (this.host == null && l.host != null)
          return  false;
        return  host.equals(l.host) && port == l.port;
      }
  
      //////////////////////////////////////////
      public int hashCode () {
        return  host == null ? port : host.hashCode() ^ port;
      }
  
    }
  
  }
  
  
  
  
  
  
  1.1                  
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/remote/RemoteCacheMonitor.java
  
  Index: RemoteCacheMonitor.java
  ===================================================================
  package  org.apache.stratum.jcs.auxiliary.remote;
  
  import org.apache.stratum.jcs.utils.log.*;
  import  java.util.*;
  
  /**
   * Used to monitor and repair any failed connection for the remote cache service.
   * By default the monitor operates in a failure driven mode.  That is, it goes into 
a wait
   * state until there is an error.
   *
   * TODO consider moving this into an active monitoring mode.
   *
   * Upon the notification of a connection error, the monitor changes to operate
   * in a time driven mode.  That is, it attempts to recover the connections on a 
periodic basis.
   * When all failed connections are restored, it changes back to the failure driven 
mode.
   */
  public class RemoteCacheMonitor implements Runnable {
  
    private static RemoteCacheMonitor instance;
    private static long idlePeriod = 30*1000;            // minimum 30 seconds.
    //private static long idlePeriod = 3*1000; // for debugging.
    private transient Logger log = LoggerManager.getInstance().getLogger(this);
  
    // Must make sure RemoteCacheMonitor is started before any remote error can be 
detected!
    private boolean alright=true;
  
    static final int TIME = 0;
    static final int ERROR = 1;
    static int mode = ERROR;
  
    /** Configures the idle period between repairs. */
    public static void setIdlePeriod(long idlePeriod) {
      if (idlePeriod > RemoteCacheMonitor.idlePeriod) {
        RemoteCacheMonitor.idlePeriod = idlePeriod;
      }
    }
    private RemoteCacheMonitor() {}
  
    /**
     * Returns the singleton instance;
     */
    static RemoteCacheMonitor getInstance() {
      if (instance == null) {
        synchronized(RemoteCacheMonitor.class) {
          if (instance == null) {
            return instance=new RemoteCacheMonitor();
          }
        }
      }
      return instance;
    }
    /**
     * Notifies the cache monitor that an error occurred,
     * and kicks off the error recovery process.
     */
    public void notifyError() {
      p( "Notified of an error." );
      bad();
      synchronized(this) {
        notify();
      }
    }
    // Run forever.
    // Avoid the use of any synchronization in the process of monitoring for 
performance reason.
    // If exception is thrown owing to synchronization,
    // just skip the monitoring until the next round.
    public void run () {
      p( "Monitoring daemon started" );
      do {
  
        if ( mode == ERROR ) {
          if (alright) {
            synchronized(this) {
              if (alright) {
                // make this configurable, comment out wait to enter time driven mode
                // Failure driven mode.
                try {
                  log.debug("FAILURE DRIVEN MODE: cache monitor waiting for error" );
                  wait(); // wake up only if there is an error.
                } catch(InterruptedException ignore) {
                }
              }
            }
          }
        } else {
          log.debug("TIME DRIVEN MODE: cache monitor sleeping for " + idlePeriod);
          // Time driven mode: sleep between each round of recovery attempt.
            // will need to test not just check status
        }
  
          try {
            Thread.currentThread().sleep(idlePeriod);
          } catch (InterruptedException ex) {
          // ignore;
          }
  
        // The "alright" flag must be false here.
        // Simply presume we can fix all the errors until proven otherwise.
        synchronized(this) {
          alright = true;
        }
        //p("cache monitor running.");
        // Monitor each RemoteCacheManager instance one after the other.
        // Each RemoteCacheManager corresponds to one remote connection.
        for (Iterator itr = RemoteCacheManager.instances.values().iterator(); 
itr.hasNext();) {
          RemoteCacheManager mgr = (RemoteCacheManager)itr.next();
          try {
            // If any cache is in error, it strongly suggests all caches managed by the
            // same RmicCacheManager instance are in error.  So we fix them once and 
for all.
            for (Iterator itr2 = mgr.caches.values().iterator(); itr2.hasNext();) {
              if (itr2.hasNext()) {
                RemoteCacheNoWait c = (RemoteCacheNoWait)itr2.next();
                if (c.getStatus() == c.STATUS_ERROR) {
                  RemoteCacheRestore repairer = new RemoteCacheRestore(mgr);
                  // If we can't fix them, just skip and re-try in the next round.
                  if (repairer.canFix()) {
                    repairer.fix();
                  }
                  else {
                    bad();
                  }
                  break;
                }
              }
            }
          } catch (Exception ex) {
            bad();
            // Problem encountered in fixing the caches managed by a 
RemoteCacheManager instance.
            // Soldier on to the next RemoteCacheManager instance.
            log.error(ex);
          }
        }
      } while (true);
    }
    /** Sets the "alright" flag to false in a critial section. */
    private void bad() {
      if (alright) {
        synchronized(this) {
          alright = false;
        }
      }
    }
    private void p(String s) {
      if ( log.logLevel >= log.DEBUG ) {
        log.debug( s );
      }
      System.out.println("RemoteCacheMonitor:" + s);
    }
  }
  
  
  
  
  
  
  1.1                  
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/remote/RemoteCacheNoWait.java
  
  Index: RemoteCacheNoWait.java
  ===================================================================
  package org.apache.stratum.jcs.auxiliary.remote;
  
  import org.apache.stratum.jcs.engine.control.Cache;
  
  import org.apache.stratum.jcs.engine.*;
  import org.apache.stratum.jcs.access.exception.*;
  import org.apache.stratum.jcs.engine.behavior.*;
  import org.apache.stratum.jcs.auxiliary.remote.behavior.*;
  
  import org.apache.stratum.jcs.utils.log.*;
  
  import java.io.*;
  import java.rmi.*;
  
  /**
   * Used to queue up update requests to the underlying cache.
   * These requests will be processed in their order of arrival
   * via the cache event queue processor.
   */
  public class RemoteCacheNoWait implements ICache {
  
    private final RemoteCache cache;
    private ICacheEventQueue q;
    private transient Logger log = LoggerManager.getInstance().getLogger(this);
  
    private String source_id = 
"org.apache.stratum.jcs.auxiliary.remote.RemoteCacheNoWait";
  
    ///////////////////////////////////////////////////
    public Serializable getSourceId() {
      return this.source_id;
    }
  
  
    /**
     * Constructs with the given remote cache,
     * and fires up an event queue for aysnchronous processing.
     */
    public RemoteCacheNoWait(RemoteCache cache) {
        this.cache = cache;
        this.q = new CacheEventQueue(new CacheAdaptor(cache), 
RemoteCacheInfo.listenerId, cache.getCacheName());
        if (cache.getStatus() == cache.STATUS_ERROR)
          q.destroy();
    }
  
    /** Adds a put request to the remote cache. */
    public void put(Serializable key, Serializable value ) throws IOException {
      put( key, value, null );
    }
    public void put(Serializable key, Serializable value,  Attributes attr ) throws 
IOException {
      try {
        CacheElement ce = new CacheElement( cache.getCacheName(), key, value );
        ce.setAttributes( attr );
        update( ce );
      } catch(IOException ex) {
        log.error(ex);
        q.destroy();
        throw ex;
      }
    }
    public void update( ICacheElement ce ) throws IOException {
      try {
        q.addPutEvent( ce );
      } catch(IOException ex) {
        log.error(ex);
        q.destroy();
        throw ex;
      }
    }
  
    /** Synchronously reads from the remote cache. */
    public Serializable get(Serializable key) throws IOException{
      return get( key, true );
    }
    public Serializable get(Serializable key, boolean container ) throws IOException{
      try {
        return cache.get(key);
      } catch(UnmarshalException ue) {
        p("Retrying the get owing to UnmarshalException...");
        try {
          return cache.get(key);
        } catch(IOException ex) {
          p("Failed in retrying the get for the second time.");
          q.destroy();
        }
      } catch(IOException ex) {
        q.destroy();
        throw ex;
      }
      return null;
    }
    /** Adds a remove request to the remote cache. */
    public boolean remove(Serializable key) throws IOException{
      try {
        q.addRemoveEvent(key);
      } catch(IOException ex) {
        log.error(ex);
        q.destroy();
        throw ex;
      }
      return false;
    }
    /** Adds a removeAll request to the remote cache. */
    public void removeAll() throws IOException{
      try {
        q.addRemoveAllEvent();
      } catch(IOException ex) {
        log.error(ex);
        q.destroy();
        throw ex;
      }
    }
    /** Adds a dispose request to the remote cache. */
    public void dispose() {
      try {
        q.addDisposeEvent();
      } catch(IOException ex) {
        log.error(ex);
        q.destroy();
      }
    }
    /** No remote invokation. */
    public String getStats() {
      return cache.getStats();
    }
    /** No remote invokation. */
    public int getSize() {
      return cache.getSize();
    }
    /** No remote invokation. */
    public int getCacheType() {
      return ICacheType.REMOTE_CACHE;
      //return cache.getCacheType();
    }
    /**
     * Returns the asyn cache status.
     * An error status indicates either the remote connection is not available,
     * or the asyn queue has been unexpectedly destroyed.
     * No remote invokation.
     */
    public int getStatus() {
      return q.isAlive() ? cache.getStatus() : cache.STATUS_ERROR;
    }
    public String getCacheName() {
      return cache.getCacheName();
    }
    /**
     * Replaces the remote cache service handle with the given handle and
     * reset the event queue by starting up a new instance.
     */
    public void fixCache(IRemoteCacheService remote) {
      cache.fixCache(remote);
      resetEventQ();
      return;
    }
    /** Resets the event q by first destroying the existing one and starting up new 
one. */
    public void resetEventQ() {
      if (q.isAlive()) {
        q.destroy();
      }
      this.q = new CacheEventQueue(new CacheAdaptor(cache), 
RemoteCacheInfo.listenerId, cache.getCacheName());
    }
    public String toString() {
      return "RemoteCacheNoWait: " + cache.toString();
    }
    private void p(String s) {
      System.out.println("RemoteCacheNoWait:" + s);
    }
  }
  
  
  
  1.1                  
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/remote/RemoteCacheNoWaitFacade.java
  
  Index: RemoteCacheNoWaitFacade.java
  ===================================================================
  package org.apache.stratum.jcs.auxiliary.remote;
  
  
  import org.apache.stratum.jcs.engine.*;
  import org.apache.stratum.jcs.access.exception.*;
  import org.apache.stratum.jcs.engine.behavior.*;
  import org.apache.stratum.jcs.engine.control.*;
  
  import org.apache.stratum.jcs.utils.log.*;
  
  import java.io.*;
  import java.rmi.*;
  
  /**
   *  Used to provide access to multiple services under nowait protection.
   *  factory should construct NoWaitFacade to give
   *  to the composite cache out of caches it constructs from the varies
   *  manager to lateral services.
   */
  public class RemoteCacheNoWaitFacade implements ICache {
  
    private static final boolean debug = false;// true;
    private static final boolean debugF = true;
  
    public RemoteCacheNoWait[] noWaits;
    private transient Logger log = LoggerManager.getInstance().getLogger(this);
    private String source_id = 
"org.apache.stratum.jcs.auxiliary.remote.RemoteCacheNoWaitFacade";
  
    private String cacheName;
  
    // holds failover and cluster information
    RemoteCacheAttributes rca;
  
    ///////////////////////////////////////////////////
    public RemoteCacheAttributes getRemoteCacheAttributes() {
      return rca;
    }
    public void setRemoteCacheAttributes( RemoteCacheAttributes rca ) {
      this.rca = rca;
    }
  
    ///////////////////////////////////////////////////
    public Serializable getSourceId() {
      return this.source_id;
    }
  
    /**
     * Constructs with the given remote cache,
     * and fires events to any listeners.
     */
    //public RemoteCacheNoWaitFacade(RemoteCacheNoWait[] noWaits, String cacheName) {
    public RemoteCacheNoWaitFacade(RemoteCacheNoWait[] noWaits, RemoteCacheAttributes 
rca) {
      if ( debug ) {
        p( "CONSTRUCTING NO WAIT FACADE" );
      }
      this.noWaits = noWaits;
      this.rca = rca;
      this.cacheName = rca.getCacheName();
    }
  
    /** Adds a put request to the lateral cache. */
    public void put(Serializable key, Serializable value ) throws IOException {
      put( key, value, null );
    }
    public void put(Serializable key, Serializable value,  Attributes attr ) throws 
IOException {
      try {
        CacheElement ce = new CacheElement( cacheName, key, value );
        ce.setAttributes( attr );
        update( ce );
      } catch(Exception ex) {
        log.error(ex);
      }
    }
    public void update( ICacheElement ce ) throws IOException {
      if ( debug ) {
        p( "updating through cache facade, noWaits.length = " + noWaits.length );
      }
      int i=0;
      try {
        for( ; i < noWaits.length; i++ ) {
          noWaits[i].update( ce );
          // an initial move into a zombie will lock this to primary
          // recovery.  will not discover other servers until primary reconnect
          // and subsequent error
        }
      } catch(Exception ex) {
        log.error(ex);
        // can handle failover here?  Is it safe to try the others?
        // check to see it the noWait is now a zombie
        // if it is a zombie, then move to the next in the failover list
        // will need to keep them in order or a count
        failover( i ); // should start a failover thread
        // should probably only failover if there is only one in the noWait list
        // should start a background thread to set the original as the primary
        // if we are in failover state
      }
    }
  
    /** Synchronously reads from the lateral cache. */
    public Serializable get(Serializable key) {
      return get( key, true );
    }
    public Serializable get(Serializable key, boolean container ) {
      for( int i=0; i < noWaits.length; i++ ) {
        try {
          Object obj = noWaits[i].get( key, container );
          if ( obj != null ) {
            return (Serializable)obj;
          }
        } catch(Exception ex) {
          p("Failed to get.");
        }
        return null;
      }
      return null;
    }
    /** Adds a remove request to the lateral cache. */
    public boolean remove(Serializable key) {
      try {
        for( int i=0; i < noWaits.length; i++ ) {
          noWaits[i].remove( key );
        }
      } catch(Exception ex) {
        log.error(ex);
      }
      return false;
    }
    /** Adds a removeAll request to the lateral cache. */
    public void removeAll() {
      try {
        for( int i=0; i < noWaits.length; i++ ) {
          noWaits[i].removeAll();
        }
      } catch(Exception ex) {
        log.error(ex);
      }
    }
    /** Adds a dispose request to the lateral cache. */
    public void dispose() {
      try {
        for( int i=0; i < noWaits.length; i++ ) {
          noWaits[i].dispose();
        }
      } catch(Exception ex) {
        log.error(ex);
      }
    }
    /** No lateral invokation. */
    public String getStats() {
      return "";//cache.getStats();
    }
    /** No lateral invokation. */
    public int getSize() {
      return 0; //cache.getSize();
    }
  
    /////////////////////////////////////////////
    public int getCacheType() {
      return ICacheType.REMOTE_CACHE;
    }
  
    public String getCacheName() {
      return ""; //cache.getCacheName();
    }
  
    // need to do something with this
    public int getStatus() {
      return 0;//q.isAlive() ? cache.getStatus() : cache.STATUS_ERROR;
    }
  
    ////////////////////////////////////////////////////////
    public String toString() {
      return "RemoteCacheNoWaitFacade: " + cacheName + ", rca = " + rca;
    }
    private void p(String s) {
      System.out.println("RemoteCacheNoWaitFacade:" + s);
    }
  
    /////////////////////////////////////////
    protected void failover( int i ) {
  
      if ( debugF ) {
        p( "in failover for " + i );
      }
      //if ( noWaits.length == 1 ) {
      if ( rca.getRemoteType() == rca.LOCAL ) {
        if ( noWaits[i].getStatus() == STATUS_ERROR ) {
          // start failover, primary recovery process
          RemoteCacheFailoverRunner runner = new RemoteCacheFailoverRunner( this );
          // If the returned monitor is null, it means it's already started elsewhere.
          if (runner != null) {
            runner.notifyError();
            Thread t = new Thread(runner);
            t.setDaemon(true);
            t.start();
          }
        } else {
          if ( debugF ) {
            p( "the noWait is not in error" );
          }
          log.info("the noWait is not in error");
        }
      }
   }
  
  }
  
  
  
  1.1                  
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/remote/RemoteCacheRestore.java
  
  Index: RemoteCacheRestore.java
  ===================================================================
  package org.apache.stratum.jcs.auxiliary.remote;
  
  import org.apache.stratum.jcs.engine.*;
  import org.apache.stratum.jcs.engine.behavior.*;
  import org.apache.stratum.jcs.auxiliary.behavior.*;
  import org.apache.stratum.jcs.auxiliary.remote.*;
  import org.apache.stratum.jcs.auxiliary.remote.behavior.*;
  
  import org.apache.stratum.jcs.utils.log.*;
  
  import java.rmi.*;
  import java.rmi.registry.*;
  
  /**
   * Used to repair the remote caches managed by the associated instance of 
RemoteCacheManager.
   *
   *  When there is an error the monitor kicks off.  The Failover runner starts
   *  looks for a manager with a connection to a remote cache that is not in error.
   *  If a manager's connection to a remote cache is found to be in error, the
   *  restorer kicks off and tries to reconnect.  When it is succesful, the status
   *  of the manager changes.  When the failover runner finds that the primary is
   *  in good shape, it will switch back.
   *
   **/
  public class RemoteCacheRestore implements ICacheRestore {
  
    private Logger log = LoggerManager.getLogger(this);
  
    private boolean debug = true;
    private final RemoteCacheManager rcm;
    //private final IAuxiliaryCacheManager rcm;
    private boolean canFix = true;
  
    private Object remoteObj;
  
    /** Constructs with the given instance of RemoteCacheManager. */
    public RemoteCacheRestore(RemoteCacheManager rcm) {
    //public RemoteCacheRestore(IAuxiliaryCacheManager rcm) {
      this.rcm = rcm;
    }
    /**
     * Returns true if the connection to the remote host for the corresponding cache 
manager
     * can be successfully re-established.
     */
    public boolean canFix() {
      if (!canFix)
        return canFix;
      String registry = "//" + rcm.host  + ":" + rcm.port + "/" + rcm.service;
      log.info("looking up server " + registry);
      try {
        remoteObj = Naming.lookup(registry);
        log.info("looking up server " + registry);
      } catch(Exception ex) {
        //log.error(ex, "host=" + rcm.host + "; port" + rcm.port + "; service=" + 
rcm.service );
        log.error("host=" + rcm.host + "; port" + rcm.port + "; service=" + 
rcm.service );
        canFix = false;
      }
      return canFix;
    }
    /** Fixes up all the caches managed by the associated cache manager. */
    public void fix() {
      if (!canFix) {
        return;
      }
      rcm.fixCaches((IRemoteCacheService)remoteObj, (IRemoteCacheObserver)remoteObj);
      String msg = "Remote connection to " + "//" + rcm.host  + ":" + rcm.port + "/" + 
rcm.service + " resumed.";
      log.info(msg);
      p(msg);
    }
    private void p(String s) {
      System.out.println("RemoteCacheRestore:"+s);
    }
  }
  
  
  
  1.1                  
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/remote/RemoteCacheWatchRepairable.java
  
  Index: RemoteCacheWatchRepairable.java
  ===================================================================
  package org.apache.stratum.jcs.auxiliary.remote;
  
  import org.apache.stratum.jcs.engine.*;
  import org.apache.stratum.jcs.engine.behavior.*;
  import org.apache.stratum.jcs.auxiliary.remote.behavior.*;
  
  /** Same as CacheWatcherWrapper but implements the IRemoteCacheWatch interface. */
  public class RemoteCacheWatchRepairable extends CacheWatchRepairable implements 
IRemoteCacheObserver {
  }
  
  
  
  1.1                  
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/remote/RemoteUtils.java
  
  Index: RemoteUtils.java
  ===================================================================
  
  package  org.apache.stratum.jcs.auxiliary.remote;
  
  import  java.rmi.*;
  import  java.rmi.registry.*;
  import  java.rmi.server.*;
  import  java.io.*;
  import  java.util.*;
  
  
  public class RemoteUtils {
    private static boolean debug = false;
  
    private RemoteUtils () {
    }
  
    /**
     * Creates and exports a registry on the specified port of the local host.
     */
    public static int createRegistry (int port) throws RemoteException {
      p("createRegistry> setting security manager");
      System.setSecurityManager(new RMISecurityManager());
      if (port < 1024) {
        port = Registry.REGISTRY_PORT;
      }
      p("createRegistry> creating registry");
      LocateRegistry.createRegistry(port);
      return  port;
    }
  
    public static Properties loadProps (String propFile) throws IOException {
      InputStream is = RemoteUtils.class.getResourceAsStream(propFile);
      Properties props = new Properties();
      try {
        props.load(is);
        if (debug) {
          p("props.size=" + props.size() + ", " + props);
        }
      } catch (Exception ex) {
      // ignore;
      } finally {
        if (is != null) {
          is.close();
        }
      }
      return  props;
    }
  
    private static void p (String s) {
      System.out.println("RmiUitls: " + s);
    }
  }
  
  
  
  
  
  
  1.1                  
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/remote/ZombieRemoteCacheService.java
  
  Index: ZombieRemoteCacheService.java
  ===================================================================
  package org.apache.stratum.jcs.auxiliary.remote;
  
  import java.io.*;
  
  import org.apache.stratum.jcs.engine.behavior.ICacheService;
  import org.apache.stratum.jcs.engine.behavior.ICacheElement;
  import org.apache.stratum.jcs.engine.behavior.ICache;
  import org.apache.stratum.jcs.utils.reuse.*;
  import org.apache.stratum.jcs.engine.ZombieCacheService;
  import org.apache.stratum.jcs.engine.behavior.*;
  import org.apache.stratum.jcs.auxiliary.remote.behavior.*;
  
  public class ZombieRemoteCacheService extends ZombieCacheService implements 
IRemoteCacheService {
  
    public void update(ICacheElement item, byte listenerId) {}
    public void remove(String cacheName, Serializable key, byte listenerId) {}
    public void removeAll(String cacheName, byte listenerId) {}
  
  }
  
  
  
  1.1                  
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/remote/ZombieRemoteCacheWatch.java
  
  Index: ZombieRemoteCacheWatch.java
  ===================================================================
  package org.apache.stratum.jcs.auxiliary.remote;
  
  import org.apache.stratum.jcs.engine.ZombieCacheWatch;
  import org.apache.stratum.jcs.auxiliary.remote.behavior.*;
  
  public class ZombieRemoteCacheWatch extends ZombieCacheWatch implements 
IRemoteCacheObserver {
  }
  
  
  

--
To unsubscribe, e-mail:   <mailto:[EMAIL PROTECTED]>
For additional commands, e-mail: <mailto:[EMAIL PROTECTED]>

Reply via email to