asmuts      02/01/14 22:31:46

  Added:       src/java/org/apache/stratum/jcs/auxiliary/remote/server
                        RemoteCacheClusterFactory.java
                        RemoteCacheClusterManager.java
                        RemoteCacheClusterMonitor.java
                        RemoteCacheClusterRestore.java
                        RemoteCacheServer.java
                        RemoteCacheServerFactory.java
                        RemoteCacheServerInfo.java
                        RemoteCacheServerListener.java
  Log:
  remote cache server
  the cluster stuff is not used, but will be further developed
  
  Revision  Changes    Path
  1.1                  
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/remote/server/RemoteCacheClusterFactory.java
  
  Index: RemoteCacheClusterFactory.java
  ===================================================================
  package org.apache.stratum.jcs.auxiliary.remote.server;
  
  /**
   * Title:
   * Description: Creates cluster nodes that connect remote caches.
   * Copyright:    Copyright (c) 2001
   * Company:
   * @author
   * @version 1.0
   */
  
   import java.util.*;
  
  //import org.apache.stratum.jcs.auxiliary.*;
  import org.apache.stratum.jcs.auxiliary.behavior.*;
  import org.apache.stratum.jcs.engine.*;
  import org.apache.stratum.jcs.engine.behavior.*;
  import org.apache.stratum.jcs.auxiliary.remote.*;
  import org.apache.stratum.jcs.auxiliary.remote.behavior.*;
  import org.apache.stratum.jcs.utils.log.*;
  
  
  public class RemoteCacheClusterFactory implements IAuxiliaryCacheFactory {
  
    private static Logger log = LoggerManager.getLogger( RemoteCacheFactory.class );
  
    private static String name;
  
  
    ///////////////////////////////////
    public RemoteCacheClusterFactory() {
    }
  
  
     ///////////////////////////////////////////////////////////////
    /**
     * Interface method.  Allows classforname construction, making caches pluggable.
     */
    public ICache createCache( IAuxiliaryCacheAttributes iaca ) {
      ArrayList noWaits = new ArrayList();
  
      RemoteCacheAttributes rca = (RemoteCacheAttributes)iaca;
  
      // create "SYSTEM_CLUSTER" caches for potential use
      if ( rca.getCacheName() == null ) {
        rca.setCacheName( "SYSTEM_CLUSTER" );
      }
      StringTokenizer it = new StringTokenizer( rca.getClusterServers(), "," );
      while( it.hasMoreElements() ) {
        //String server = (String)it.next();
        String server = (String)it.nextElement();
        //p( "tcp server = " +  server );
        rca.setRemoteHost( server.substring( 0,server.indexOf(":") ) );
        rca.setRemotePort( Integer.parseInt(server.substring( server.indexOf(":") +1)) 
);
        RemoteCacheClusterManager rcm = RemoteCacheClusterManager.getInstance( rca );
        ICache ic = rcm.getCache( rca.getCacheName() );
        if ( ic != null ) {
          noWaits.add( ic );
        } else {
          //p( "noWait is null" );
        }
      }
  
      RemoteCacheNoWaitFacade rcnwf = new RemoteCacheNoWaitFacade( 
(RemoteCacheNoWait[])noWaits.toArray(new RemoteCacheNoWait[0]), rca );
  
      return rcnwf;
    } // end createCache
  
    /////////////////////////////////////////////////////////////////////
    public String getName() {
      return this.name;
    }
    public void setName( String name ) {
      this.name = name;
    }
  
  }
  
  
  
  1.1                  
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/remote/server/RemoteCacheClusterManager.java
  
  Index: RemoteCacheClusterManager.java
  ===================================================================
  package  org.apache.stratum.jcs.auxiliary.remote.server;
  
  import  java.io.*;
  import  java.net.*;
  import  java.rmi.*;
  import  java.rmi.registry.*;
  import  java.util.*;
  import  java.sql.*;
  
  import  org.apache.stratum.jcs.engine.*;
  import  org.apache.stratum.jcs.engine.behavior.*;
  import  org.apache.stratum.jcs.auxiliary.remote.*;
  import  org.apache.stratum.jcs.auxiliary.remote.behavior.*;
  import  org.apache.stratum.jcs.auxiliary.remote.group.*;
  
  import org.apache.stratum.jcs.utils.log.*;
  
  /**
   */
  public class RemoteCacheClusterManager implements ICacheManager {
  
    // Contains mappings of Location instance to RemoteCacheManager instance.
    static final Map instances = new HashMap();
    private static RemoteCacheClusterMonitor monitor;
    private static boolean debug = true;         //true;
  
    private int clients;
    private static Logger log = 
LoggerManager.getLogger(RemoteCacheClusterManager.class);
  
    // Contains instances of RemoteCacheNoWait managed by an RemoteCacheManager 
instance.
    final Map caches = new HashMap();
  
  
    final String host;
    final int port;
    final String service;
  
    private IRemoteCacheAttributes irca;
  
  
    /**
     * Handle to the remote cache service; or a zombie handle if failed to connect.
     */
    private IRemoteCacheService remoteService;
  
    /**
     * Wrapper of the remote cache watch service;
     * or wrapper of a zombie service if failed to connect.
     */
    private RemoteCacheWatchRepairable remoteWatch;
  
    /**
     * Constructs an instance to with the given remote connection parameters.
     * If the connection cannot be made, "zombie" services will be temporarily
     * used until a successful re-connection is made by the monitoring daemon.
     */
    private RemoteCacheClusterManager (String host, int port, String service) {
  
      this.host = host;
      this.port = port;
      this.service = service;
      //log = LoggerManager.getLogger(this);
      String registry = "//" + host + ":" + port + "/" + service;
      if (debug) {
        p("looking up server " + registry);
      }
      try {
        Object obj = Naming.lookup(registry);
        if (debug) {
          p("server found");
        }
        // Successful connection to the remote server.
        remoteService = (IRemoteCacheService)obj;
        remoteWatch = new RemoteCacheWatchRepairable();
        remoteWatch.setCacheWatch((IRemoteCacheObserver)obj);
      } catch (Exception ex) {
        // Failed to connect to the remote server.
        // Configure this RemoteCacheManager instance to use the "zombie" services.
        log.error(ex.getMessage());
        remoteService = new ZombieRemoteCacheService();
        remoteWatch = new RemoteCacheWatchRepairable();
        remoteWatch.setCacheWatch(new ZombieRemoteCacheWatch());
        // Notify the cache monitor about the error, and kick off the recovery process.
        RemoteCacheClusterMonitor.getInstance().notifyError();
      }
  
    }
  
  
    ////////////////////////////////////////
    public IRemoteCacheAttributes getDefaultCattr() {
      return this.irca;
    }
  
    /** Adds the remote cache listener to the underlying cache-watch service. */
    public void addRemoteCacheListener( IRemoteCacheAttributes cattr, 
IRemoteCacheListener listener) throws IOException {
      synchronized (caches) {
        remoteWatch.addCacheListener(cattr.getCacheName(), listener);
      }
      return;
    }
  
  
    /**
     * Returns an instance of RemoteCacheManager for the given connection parameters.
     * Also starts up the monitoring daemon, if not already started.
     * If the connection cannot be established, zombie objects will be used
     * for future recovery purposes.
     * @param host host of the registry where the service lookup is to be made.
     * @parma port port of the registry.
     */
    public static RemoteCacheClusterManager getInstance ( IRemoteCacheAttributes 
cattr) {
  
      String host = cattr.getRemoteHost();
      int port = cattr.getRemotePort();
      String service = cattr.getRemoteServiceName();
      if (host == null)
        host = "";
      if (port < 1024) {
        port = Registry.REGISTRY_PORT;
      }
      Location loc = new Location(host, port);
  
      RemoteCacheClusterManager ins = (RemoteCacheClusterManager)instances.get(loc);
      if (ins == null) {
        synchronized (instances) {
          ins = (RemoteCacheClusterManager)instances.get(loc);
          if (ins == null) {
            // cahnge to use cattr and to set defaults
            ins = new RemoteCacheClusterManager(host, port, service);
            ins.irca = cattr;
            instances.put(loc, ins);
          }
        }
  
        // create a listener int he absence of specific caches to listen, so the
        // cluster will be contacted from a general source, not on a per cache basis.
        try {
          cattr.setCacheName( "server" );
          ins.addRemoteCacheListener( cattr, RemoteCacheServerListener.getInstance( 
cattr ) );
        } catch( IOException ioe ) {
          log.error( ioe );
        } catch( Exception e ) {
          log.error( e );
        }
  
      }
      if (debug) {
        ins.log.logIt("Manager stats : " + ins.getStats() + "<br> -- in 
getInstance()");
      }
      ins.clients++;
      // Fires up the monitoring daemon.
      if (monitor == null) {
        monitor = RemoteCacheClusterMonitor.getInstance();
        // If the returned monitor is null, it means it's already started elsewhere.
        if (monitor != null) {
          Thread t = new Thread(monitor);
          t.setDaemon(true);
          t.start();
        }
      }
  
      /* moved to factory
      try {
        // create one cache to make imported framework function
        cattr.setCacheName( "SYSTEM_CLUSTER");
        ins.addRemoteCacheListener( cattr, RemoteCacheServerListener.getInstance( 
cattr ) );
      } catch( IOException ioe ) {
        ins.log.error( ioe );
      } catch( Exception e ) {
        ins.log.error( e );
      }
      */
      return  ins;
    }
  
  
    /** Returns a remote cache for the given cache name. */
    /** Returns a remote cache for the given cache name. */
    public ICache getCache ( String cacheName ) {
      IRemoteCacheAttributes ca = (IRemoteCacheAttributes)irca.copy();
      ca.setCacheName( cacheName );
      return getCache( ca );
    }
    public ICache getCache ( IRemoteCacheAttributes cattr ) {
      RemoteCacheNoWait c = null;
      synchronized (caches) {
        c = (RemoteCacheNoWait)caches.get( cattr.getCacheName() );
        if (c == null) {
          c = new RemoteCacheNoWait(new RemoteCache(cattr, remoteService));
          caches.put(cattr.getCacheName(), c);
        }
      }
      if (debug) {
        log.logIt("Manager stats : " + getStats());
      }
      //if ( irca.getUseRemote() ) {
        try {
          addRemoteCacheListener( cattr, RemoteCacheServerListener.getInstance( cattr 
) );
        } catch( IOException ioe ) {
          log.error( ioe );
        } catch( Exception e ) {
          log.error( e );
        }
      //}
      return  c;
    }
  
    /////////////////////////////////////////////////////////
    public void freeCache (String name) throws IOException {
      ICache c = null;
  
      synchronized(caches) {
        c = (ICache)caches.get(name);
      }
      if (c != null) {
        c.dispose();
      }
    }
  
    /////////////////////////////////////////////////////////////////////
    // Don't care if there is a concurrency failure ?
    public String getStats () {
      StringBuffer stats = new StringBuffer();
      Iterator allCaches = caches.values().iterator();
      while (allCaches.hasNext()) {
        ICache c = (ICache)allCaches.next();
        if (c != null) {
          stats.append("<br>&nbsp;&nbsp;&nbsp;" + c.getStats());
        }
      }
      return  stats.toString();
    }
  
    /////////////////////////////////////////////////////////////////
    public void release () {
      // Wait until called by the last client
      if (--clients != 0) {
        return;
      }
      synchronized (caches) {
        Iterator allCaches = caches.values().iterator();
        while (allCaches.hasNext()) {
          ICache c = (ICache)allCaches.next();
          if (c != null) {
            try {
              c.dispose();
            } catch (IOException ex) {
              ex.printStackTrace();
              log.logEx(ex);
            }
          }
        }
      }
    }             //end release()
  
    /** Fixes up all the caches managed by this cache manager. */
    public void fixCaches (IRemoteCacheService remoteService, IRemoteCacheObserver 
remoteWatch) {
      synchronized (caches) {
        this.remoteService = remoteService;
        this.remoteWatch.setCacheWatch(remoteWatch);
        for (Iterator en = caches.values().iterator(); en.hasNext();) {
          RemoteCacheNoWait cache = (RemoteCacheNoWait)en.next();
          cache.fixCache(this.remoteService);
        }
      }
    }
  
    /////////////////////////////////
    private void p (String s) {
      log.debug("RemoteCacheManager:" + s);
    }
  
    ///////////////////////////////////////
    public int getCacheType() {
      return REMOTE_CACHE;
    }
  
  
    //////////////////////////////////////////////////////////////////////
    /** Location of the RMI registry. */
    private static final class Location {
      public final String host;
      public final int port;
  
      public Location (String host, int port) {
        this.host = host;
        this.port = port;
      }
  
      public boolean equals (Object obj) {
        if (obj == this) {
          return  true;
        }
        if (obj == null || !(obj instanceof Location))
          return  false;
        Location l = (Location)obj;
        if (this.host == null && l.host != null)
          return  false;
        return  host.equals(l.host) && port == l.port;
      }
  
      //////////////////////////////////////////
      public int hashCode () {
        return  host == null ? port : host.hashCode() ^ port;
      }
  
    }
  
  }
  
  
  
  
  
  
  1.1                  
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/remote/server/RemoteCacheClusterMonitor.java
  
  Index: RemoteCacheClusterMonitor.java
  ===================================================================
  package  org.apache.stratum.jcs.auxiliary.remote.server;
  
  import  java.util.*;
  
  import org.apache.stratum.jcs.utils.log.*;
  import  org.apache.stratum.jcs.auxiliary.remote.*;
  
  /**
   * Used to monitor and repair any failed connection for the remote cache service.
   * By default the monitor operates in a failure driven mode.  That is, it goes into 
a wait
   * state until there is an error.
   * Upon the notification of a connection error, the monitor changes to operate
   * in a time driven mode.  That is, it attempts to recover the connections on a 
periodic basis.
   * When all failed connections are restored, it changes back to the failure driven 
mode.
   */
  public class RemoteCacheClusterMonitor implements Runnable {
  
    private static RemoteCacheClusterMonitor instance;
    private static long idlePeriod = 30*1000;            // minimum 30 seconds.
    //private static long idlePeriod = 3*1000; // for debugging.
    private transient Logger log = LoggerManager.getInstance().getLogger(this);
  
    // Must make sure RemoteCacheMonitor is started before any remote error can be 
detected!
    private boolean alright=true;
  
    /** Configures the idle period between repairs. */
    public static void setIdlePeriod(long idlePeriod) {
      if (idlePeriod > RemoteCacheClusterMonitor.idlePeriod) {
        RemoteCacheClusterMonitor.idlePeriod = idlePeriod;
      }
    }
    private RemoteCacheClusterMonitor() {}
  
    /**
     * Returns the singleton instance;
     */
    static RemoteCacheClusterMonitor getInstance() {
      if (instance == null) {
        synchronized(RemoteCacheClusterMonitor.class) {
          if (instance == null) {
            return instance=new RemoteCacheClusterMonitor();
          }
        }
      }
      return instance;
    }
    /**
     * Notifies the cache monitor that an error occurred,
     * and kicks off the error recovery process.
     */
    public void notifyError() {
      bad();
      synchronized(this) {
        notify();
      }
    }
    // Run forever.
    // Avoid the use of any synchronization in the process of monitoring for 
performance reason.
    // If exception is thrown owing to synchronization,
    // just skip the monitoring until the next round.
    public void run () {
      do {
        if (alright) {
          synchronized(this) {
            if (alright) {
              // Failure driven mode.
              try {
                wait(); // wake up only if there is an error.
              } catch(InterruptedException ignore) {
              }
            }
          }
        }
        // Time driven mode: sleep between each round of recovery attempt.
        try {
  //      p("cache monitor sleeping for " + idlePeriod);
          Thread.currentThread().sleep(idlePeriod);
        } catch (InterruptedException ex) {
        // ignore;
        }
        // The "alright" flag must be false here.
        // Simply presume we can fix all the errors until proven otherwise.
        synchronized(this) {
          alright = true;
        }
  //      p("cache monitor running.");
        // Monitor each RemoteCacheManager instance one after the other.
        // Each RemoteCacheManager corresponds to one remote connection.
        for (Iterator itr = RemoteCacheClusterManager.instances.values().iterator(); 
itr.hasNext();) {
          RemoteCacheClusterManager mgr = (RemoteCacheClusterManager)itr.next();
          try {
            // If any cache is in error, it strongly suggests all caches managed by the
            // same RmicCacheManager instance are in error.  So we fix them once and 
for all.
            for (Iterator itr2 = mgr.caches.values().iterator(); itr2.hasNext();) {
              if (itr2.hasNext()) {
                RemoteCacheNoWait c = (RemoteCacheNoWait)itr2.next();
                //RemoteCacheNoWait c = (RemoteCacheNoWait)mgr.cache;
                if (c.getStatus() == c.STATUS_ERROR) {
                  RemoteCacheClusterRestore repairer = new 
RemoteCacheClusterRestore(mgr);
                  // If we can't fix them, just skip and re-try in the next round.
                  if (repairer.canFix()) {
                    repairer.fix();
                  }
                  else {
                    bad();
                  }
                  break;
                }
              }
            }
          } catch (Exception ex) {
            bad();
            // Problem encountered in fixing the caches managed by a 
RemoteCacheManager instance.
            // Soldier on to the next RemoteCacheClusterManager instance.
            log.error(ex);
          }
        }
      } while (true);
    }
    /** Sets the "alright" flag to false in a critial section. */
    private void bad() {
      if (alright) {
        synchronized(this) {
          alright = false;
        }
      }
    }
    private void p(String s) {
      System.out.println("RemoteCacheClusterMonitor:" + s);
    }
  }
  
  
  
  
  
  
  1.1                  
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/remote/server/RemoteCacheClusterRestore.java
  
  Index: RemoteCacheClusterRestore.java
  ===================================================================
  package org.apache.stratum.jcs.auxiliary.remote.server;
  
  import org.apache.stratum.jcs.engine.*;
  import org.apache.stratum.jcs.engine.behavior.*;
  import org.apache.stratum.jcs.auxiliary.behavior.*;
  import org.apache.stratum.jcs.auxiliary.remote.*;
  import org.apache.stratum.jcs.auxiliary.remote.behavior.*;
  
  import org.apache.stratum.jcs.utils.log.*;
  
  import java.rmi.*;
  import java.rmi.registry.*;
  
  // may not be necessary.  cloning the remote cache framework for the cluster
  // connections.  may be able to use old, but it is probably better to move away
  // from the cache or regionally defined framework to a cluster connection.
  // this restore might be useless since the remote cacehs never put to each other
  // as a service; rather they communicate to listeners.  This does give us the
  // option to do that later though.  It seems like an ugly copy and paste job though.
  
  /**
   * Used to repair the remote caches managed by the associated instance of 
RemoteCacheManager.
   */
  public class RemoteCacheClusterRestore implements ICacheRestore {
  
    private Logger log = LoggerManager.getLogger(this);
  
    private boolean debug = true;
    private final RemoteCacheClusterManager rcm;
    //private final IAuxiliaryCacheManager rcm;
    private boolean canFix = true;
  
    private Object remoteObj;
  
    /** Constructs with the given instance of RemoteCacheManager. */
    public RemoteCacheClusterRestore(RemoteCacheClusterManager rcm) {
    //public RemoteCacheRestore(IAuxiliaryCacheManager rcm) {
      this.rcm = rcm;
    }
    /**
     * Returns true if the connection to the remote host for the corresponding cache 
manager
     * can be successfully re-established.
     */
    public boolean canFix() {
      if (!canFix)
        return canFix;
      String registry = "//" + rcm.host  + ":" + rcm.port + "/" + rcm.service;
      log.info("looking up server " + registry);
      try {
        remoteObj = Naming.lookup(registry);
        log.info("looking up server " + registry);
      } catch(Exception ex) {
        log.error(ex, "host=" + rcm.host + "; port" + rcm.port + "; service=" + 
rcm.service );
        canFix = false;
      }
      return canFix;
    }
    /** Fixes up all the caches managed by the associated cache manager. */
    public void fix() {
      if (!canFix) {
        return;
      }
      rcm.fixCaches((IRemoteCacheService)remoteObj, (IRemoteCacheObserver)remoteObj);
      String msg = "Remote connection to " + "//" + rcm.host  + ":" + rcm.port + "/" + 
rcm.service + " resumed.";
      log.info(msg);
      p(msg);
    }
    private void p(String s) {
      System.out.println("RemoteCacheRestore:"+s);
    }
  }
  
  
  
  1.1                  
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/remote/server/RemoteCacheServer.java
  
  Index: RemoteCacheServer.java
  ===================================================================
  package org.apache.stratum.jcs.auxiliary.remote.server;
  
  
  import java.io.*;
  import java.net.*;
  import java.rmi.*;
  import java.rmi.registry.*;
  import java.rmi.server.*;
  import java.util.*;
  
  import org.apache.stratum.jcs.access.*;
  import org.apache.stratum.jcs.access.exception.*;
  import org.apache.stratum.jcs.engine.*;
  import org.apache.stratum.jcs.engine.control.CacheManagerFactory;
  import org.apache.stratum.jcs.engine.control.*;
  import org.apache.stratum.jcs.engine.control.Cache;
  import org.apache.stratum.jcs.engine.behavior.*;
  import org.apache.stratum.jcs.auxiliary.remote.behavior.*;
  import org.apache.stratum.jcs.auxiliary.remote.*;
  import org.apache.stratum.jcs.utils.log.*;
  import org.apache.stratum.jcs.utils.reuse.*;
  
  
  /**
   * Provides remote cache services.
   */
  public class RemoteCacheServer extends UnicastRemoteObject
      implements IRemoteCacheService, IRemoteCacheObserver, IRemoteCacheServiceAdmin, 
Unreferenced
  {
    ///////////////// debug must be set to false in production! ////////////////
    protected static final boolean debug=false;//true;
    public String className;
  
    private int puts = 0;
    protected static final boolean debugactivity = true;
  
    private transient Logger log;
    // Maps cache name to CacheListeners object.
    // association of listeners (regions).
    private final Hashtable cacheListenersMap = new Hashtable();
    private final Hashtable clusterListenersMap = new Hashtable();
    private CompositeCacheManager cacheManager;
  
    ////////////////////////////////////////
    // relates listener id with a type
    private final Hashtable idTypeMap = new Hashtable();
  
    //private transient int listenerId = 0;
    private int[] listenerId = new int[1];
  
    ///////////////////////////////////////////////////////////
    protected RemoteCacheServer (String prop, int port) throws IOException, 
NotBoundException {
      super(port);
      init(prop);
    }
    protected RemoteCacheServer (String prop) throws IOException, NotBoundException {
      init(prop);
    }
    /**
     * RMI Cache Server.
     */
    protected void init(String prop) throws IOException, NotBoundException {
  
      String s = this.getClass().getName();
      int idx = s.lastIndexOf(".");
      this.className = s.substring(idx+1);
  
      log = LoggerManager.getLogger( this );
      cacheManager = createCacheManager(prop);
  
      // cacheManager would have created a number of ICache objects.
      // Use these objects to set up the cacheListenersMap.
      String[] list = cacheManager.getCacheNames();
      for (int i=0; i < list.length; i++) {
        String name = list[i];
        cacheListenersMap.put(name, new CacheListeners(cacheManager.getCache(name)));
        //cacheListenersMap.put(name, new CacheListeners(cacheManager.getCache(name)));
      }
  
      /*
      // TOO UGLY
      // should register listeners with other clusters here.
      // CLUSTERLISTENERS
      // populate attr
      RemoteCacheAttributes rca = new RemoteCacheAttributes();
      Properties props = RemoteUtils.loadProps(prop);
      rca.setClusterServers( props.getProperty("remote.clusterServers") );
      RemoteCacheClusterFactory rccf = new RemoteCacheClusterFactory();
      rca.setCacheName( "SYSTEM_CLUSTER" );
      // this creates managers for each on cluster list, so failur can be managed
      // individually
      ICache ic = rccf.createCache( rca );
  
      // listenersMap is a local cache or cache service that has a group of assoicated
      // listeners.  When an element is put in the remote cache it looks at the
      // cache names, get the listenersMap entry for it and then sends a put to all the
      // listeners in that queue
      clusterListenersMap.put("SYSTEM_CLUSTER", new CacheListeners(ic) );
      */
      /*
      int node = 0;
      for (Iterator itr = RemoteCacheClusterManager.instances.values().iterator(); 
itr.hasNext();) {
        node++;
        RemoteCacheClusterManager mgr = (RemoteCacheClusterManager)itr.next();
        // will repeat, ok
        clusterListenersMap.put("SYSTEM_CLUSTER", new 
CacheListeners(mgr.getCache("SYSTEM_CLUSTER")) );
      }
      */
    }
  
  
    /** Subclass can overrdie this method to create the specific cache manager. */
    protected CompositeCacheManager createCacheManager(String prop) {
      return CacheManagerFactory.getInstance(prop == null ? "/remote.cache.properties" 
: prop);
    }
  
  
    /**
     * Returns the cache lsitener for the specified cache.
     * Creates the cache and the cache descriptor if they do not already exist.
     */
    private CacheListeners getCacheListeners (String cacheName) throws IOException,
        NotBoundException {
      CacheListeners cacheListeners = (CacheListeners)cacheListenersMap.get(cacheName);
      if (cacheListeners == null) {
        synchronized (cacheListenersMap) {
          cacheListeners = (CacheListeners)cacheListenersMap.get(cacheName);
          if (cacheListeners == null) {
            // NEED TO CONVERT TO USE THE FACTORY ND GET A FACADE?  No it is the hub
            cacheListeners = new CacheListeners(cacheManager.getCache(cacheName));
            cacheListenersMap.put(cacheName, cacheListeners);
          }
        }
      }
      return  cacheListeners;
    }
  
    // may be able to remove this
    private CacheListeners getClusterListeners (String cacheName) throws IOException,
        NotBoundException {
      CacheListeners cacheListeners = 
(CacheListeners)clusterListenersMap.get(cacheName);
      if (cacheListeners == null) {
        synchronized (clusterListenersMap) {
          cacheListeners = (CacheListeners)clusterListenersMap.get(cacheName);
          if (cacheListeners == null) {
            cacheListeners = new CacheListeners(cacheManager.getCache(cacheName));
            clusterListenersMap.put(cacheName, cacheListeners);
          }
        }
      }
      return  cacheListeners;
    }
  
  
    /////////////////////// Implements the ICacheService interface. //////////////////
    /**
     * Puts a cache bean to the remote cache and notifies all listeners which<br>
     * <ol>
     * <li>have a different host than the originating host;
     * <li>are currently subscribed to the related cache.
     * </ol>
     */
    public void put(ICacheElement item) throws IOException {
      update(item);
    }
    public void update(ICacheElement item) throws IOException {
      update( item, (byte)0 );
    }
    public void update(ICacheElement item, byte requesterId) throws IOException {
  
      if ( debugactivity ) {
        puts++;
        if ( puts % 100 == 0 ) {
          p1( "puts = " + puts );
        }
      }
  
      if (debug) {
        //p( "." );
        if ( debug ) {
          p("in update, put " + item.getKey() + " in " + item.getCacheName() );
        }
      }
      if ( log.logLevel >= log.INFO ) {
        log.info("in update, put " + item.getKey() );
        if ( log.logLevel >= log.DEBUG ) {
          log.debug("item = " + item);
        }
      }
      try {
        CacheListeners cacheDesc = getCacheListeners(item.getCacheName());
        Object val = item.getVal();
        // ordered cache item update and notification.
        synchronized(cacheDesc) {
          try {
            //cacheDesc.cache.put(item.getKey(), item.getVal(), item.getAttributes(), 
item.getGroupName()  );
            //cacheDesc.cache.put(item.getKey(), item.getVal(), item.getAttributes()  
);
            //Attributes attr = item.getAttributes();
  
            // need to break interface to signle not to update the remotes
            //Can go ICompositeCache
            //Cache c = (Cache)cacheDesc.cache;
            ICompositeCache c = (ICompositeCache)cacheDesc.cache;
            c.update(item, false);
            // bypassing this may leave no cluster recovery
          } catch ( Exception oee ) {}
  
          ICacheEventQueue[] qlist = getEventQList(cacheDesc, requesterId);
  
          for (int i=0; i < qlist.length; i++) {
            qlist[i].addPutEvent( item );
          }
  
          // UPDATE THE CLUSTER
          // Since the cluster only updates via the listeners, we
          // don't even have to check it here
          // may be able to use the same listener code
  
          // make sure the requester isn't a CLUSTER
          Integer remoteTypeL = (Integer)idTypeMap.get( new Byte(requesterId) );
          // right now 0 is the what will be sent since the call back does not
          // identify the id of the observer.  Clusters either need to  be put via the 
remote
          // or they should use 0 to signify that they are clusters
          if ( (remoteTypeL != null) && (requesterId != 0) && (remoteTypeL.intValue() 
!= IRemoteCacheAttributes.CLUSTER) ) {
          //if ( false ) {
            if ( debug ) {
              p( "updating clusters **************************************" );
            }
            try {
              CacheListeners cacheDescC = getClusterListeners(item.getCacheName());
              ICacheEventQueue[] qlistC = getEventQList(cacheDescC, requesterId);
              if ( debug ) {
                p( "cluster event queue length = " + qlistC.length );
              }
              for (int i=0; i < qlistC.length; i++) {
                qlistC[i].addPutEvent( item );
              }
            } catch( Exception e ) {
              log.error( e );
            }
          // end if not a CLUSTER requester
          } else {
            if ( debug ) {
              p( "NOT updating clusters xxxxxxxxxxxxx" );
            }
          }
  
        }
      } catch(NotBoundException ex) {
        ex.printStackTrace( System.out );
        throw new IllegalStateException(ex.getMessage());
      } catch( Exception e ) {
        log.error( e );
      }
      return;
    }
  
    ////////////////////////////////////////////////////////////////////////////
    private ICacheEventQueue[] getEventQList (CacheListeners cacheListeners, byte 
requesterId) {
      ICacheEventQueue[] list = null;
      synchronized (cacheListeners.eventQMap) {
        list = (ICacheEventQueue[])cacheListeners.eventQMap.values().toArray(new 
ICacheEventQueue[0]);
      }
      int count = 0;
      // Set those not qualified to null;  Count those qualified.
      for (int i = 0; i < list.length; i++) {
        ICacheEventQueue q = list[i];
        if (q.isAlive() && q.getListenerId() != requesterId) {
          count++;
        }
        else {
          list[i] = null;
        }
      }
      if (count == list.length) {
        // All qualified.
        return  list;
      }
      // Returns only the qualified.
      ICacheEventQueue[] qq = new ICacheEventQueue[count];
      count = 0;
      for (int i = 0; i < list.length; i++) {
        if (list[i] != null) {
          qq[count++] = list[i];
        }
      }
      return  qq;
    }
  
  
    /**
     * Returns a cache value from the specified remote cache;
     * or null if the cache or key does not exist.
     */
    public Serializable get(String cacheName, Serializable key) throws IOException {
      return get( cacheName, key, true );
    }
    public Serializable get(String cacheName, Serializable key, boolean container) 
throws IOException {
      if (debug) {
        p("get " + key + " from cache " + cacheName);
      }
      //CacheListeners cacheDesc = (CacheListeners)cacheListenersMap.get(cacheName);
      CacheListeners cacheDesc = null;
      try {
        cacheDesc = getCacheListeners(cacheName);
      } catch( Exception  e ) {
        log.error( e );
      }
      //ICompositeCache c = (ICompositeCache)cacheDesc.cache;
      if ( cacheDesc == null ) {
        return null;
      } else {
        ICompositeCache c = (ICompositeCache)cacheDesc.cache;
        return c.get(key, container, false);
      }
        //return cacheDesc == null ? null : cacheDesc.cache.get(key, container);
        // could add some clustered get
    }
  
  
    /**
     * Removes the given key from the specified remote cache.
     */
    public void remove(String cacheName, Serializable key) throws IOException {
      remove( cacheName, key, (byte)0 );
    }
    public void remove(String cacheName, Serializable key, byte requesterId) throws 
IOException {
      if (debug) {
        p("remove " + key + " from cache " + cacheName);
      }
      CacheListeners cacheDesc = (CacheListeners)cacheListenersMap.get(cacheName);
  
      if (cacheDesc != null) {
        // best attempt to achieve ordered cache item removal and notification.
        synchronized(cacheDesc) {
          // No need to notify if it was not cached.
          if (cacheDesc.cache.remove(key)) {
            //p( "propogating remove, was cached" );
  
            ICacheEventQueue[] qlist = getEventQList(cacheDesc, requesterId);
            //p( "propogating remove, was cached" );
            //p( "qlist.length = " + qlist.length );
  
            for (int i=0; i < qlist.length; i++) {
              qlist[i].addRemoveEvent(key);
            }
          } else {
            //p( "not propogating remove, not cached" );
          }
        }
      }
      return;
    }
    /**
     * Remove all keys from the sepcified remote cache.
     */
    public void removeAll(String cacheName) throws IOException {
      removeAll( cacheName, (byte)0 );
    }
    public void removeAll(String cacheName, byte requesterId) throws IOException {
      CacheListeners cacheDesc = (CacheListeners)cacheListenersMap.get(cacheName);
  
      if (cacheDesc != null) {
        // best attempt to achieve ordered cache item removal and notification.
        synchronized(cacheDesc) {
          ICacheEventQueue[] qlist = getEventQList(cacheDesc, requesterId);
  
          for (int i=0; i < qlist.length; i++) {
            qlist[i].addRemoveAllEvent();
          }
          cacheDesc.cache.removeAll();
        }
      }
      return;
    }
    /**
     * Frees the specified remote cache.
     */
    public void dispose(String cacheName) throws IOException {
      dispose( cacheName, (byte)0 );
    }
    public void dispose(String cacheName, byte requesterId) throws IOException {
      CacheListeners cacheDesc = (CacheListeners)cacheListenersMap.get(cacheName);
  
      if (cacheDesc != null) {
        // best attempt to achieve ordered free-cache-op and notification.
        synchronized(cacheDesc) {
          ICacheEventQueue[] qlist = getEventQList(cacheDesc, requesterId );
  
          for (int i=0; i < qlist.length; i++) {
            qlist[i].addDisposeEvent();
          }
          cacheManager.freeCache(cacheName);
        }
      }
      return;
    }
    /**
     * Frees all remote caches.
     */
    public void release() throws IOException {
      synchronized(cacheListenersMap) {
        for (Enumeration en=cacheListenersMap.elements(); en.hasMoreElements();) {
          CacheListeners cacheDesc = (CacheListeners)en.nextElement();
          ICacheEventQueue[] qlist = getEventQList(cacheDesc, (byte)0 );
  
          for (int i=0; i < qlist.length; i++) {
            qlist[i].addDisposeEvent();
          }
        }
        cacheManager.release();
      }
      return;
    }
  
    /////////////////////////////////////////////
    // modify to use unique name
    private String getRequester() {
      try {
        return getClientHost();
      } catch(ServerNotActiveException ex) {
        // impossible case.
        ex.printStackTrace();
        throw new IllegalStateException(ex.getMessage());
      }
    }
  
    /////////////////////// Implements the ICacheObserver interface. //////////////////
    private static void cleanupEventQMap(Map eventQMap) {
      synchronized(eventQMap) {
        for (Iterator itr=eventQMap.entrySet().iterator(); itr.hasNext();) {
          Map.Entry e = (Map.Entry)itr.next();
          ICacheEventQueue q = (ICacheEventQueue)e.getValue();
  
          if (!q.isAlive()) {
            itr.remove();
            p1("Cache event queue " + q + " dead and removed from cache server.");
          }
        }
      }
    }
    /**
     * Subscribes to the specified remote cache.
     * @param cacheName the specified remote cache.
     * @param listener object to notify for cache changes.
     * must be synchronized since there are remote calls involved.
     */
    public void addCacheListener(String cacheName, ICacheListener listener)
      throws IOException
    {
      if (cacheName == null || listener == null)
        throw new IllegalArgumentException("cacheName and listener must not be null");
      try {
        CacheListeners cacheDesc;
        //if ( cacheName.equals("SYSTEM_CLUSTER") || listener instanceof 
org.apache.stratum.jcs.auxiliary.remote.server.RemoteCacheServerListener ) {
        IRemoteCacheListener ircl = (IRemoteCacheListener)listener;
        int remoteType = ircl.getRemoteType();
        if ( remoteType == IRemoteCacheAttributes.CLUSTER ) {
          p( "adding cluster listener" );
          cacheDesc = getClusterListeners(cacheName);
        } else {
          p( "adding normal listener" );
          cacheDesc = getCacheListeners(cacheName);
        }
        Map eventQMap = cacheDesc.eventQMap;
        cleanupEventQMap(eventQMap);
  
        synchronized ( ICacheListener.class ){
          byte id = 0;
          try {
            id = listener.getListenerId();
            if ( id == 0 ) {
              // must start at one so the next gets recognized
              byte listenerIdB = nextListenerId();
              if (log.DEBUG <= log.logLevel) {
                log.debug("listener id=" + (listenerIdB & 0xff) + " addded for cache " 
+ cacheName);
              }
              listener.setListenerId( listenerIdB );
              id = listenerIdB;
              // in case it needs synchronization
              p1( "added new vm listener " + listenerIdB );
  
              // relate the type to an id
              this.idTypeMap.put( new Byte(listenerIdB), new Integer(remoteType) );
  
            } else {
              p1( "added existing vm listener " + id  );
            }
          } catch( IOException ioe ) {}
            //eventQMap.put(listener, new CacheEventQueue(listener, getRequester(), 
cacheName));
            eventQMap.put(listener, new CacheEventQueue(listener, id, cacheName));
  
  
              if (debug) {
                p("****** Cache " + cacheName +"'s listener 
size="+cacheDesc.eventQMap.size());
              }
        } // end sync
          } catch(NotBoundException ex) {
            ex.printStackTrace();
            throw new IllegalStateException(ex.getMessage());
          }
      return;
    }
  
  
    /**
     * Subscribes to all remote caches.
     * @param obj object to notify for all cache changes.
     */
    public void addCacheListener(ICacheListener listener) throws IOException {
      for (Enumeration en=cacheListenersMap.keys(); en.hasMoreElements();) {
        String cacheName = (String)en.nextElement();
        addCacheListener(cacheName, listener);
  
        if (debug) {
          p("adding listener for cache " + cacheName);
        }
      }
      return;
    }
    /**
     * Unsubscribes from the specified remote cache.
     * @param obj existing subscriber.
     */
    public void removeCacheListener(String cacheName, ICacheListener listener) throws 
IOException
    {
      try {
        CacheListeners cacheDesc = getCacheListeners(cacheName);
        Map eventQMap = cacheDesc.eventQMap;
        cleanupEventQMap(eventQMap);
            ICacheEventQueue q = (ICacheEventQueue)eventQMap.remove(listener);
  
            if (q != null) {
              q.destroy();
            }
            if (debug) {
              p("****** Cache " + cacheName +"'s listener 
size="+cacheDesc.eventQMap.size());
            }
          } catch(NotBoundException ex) {
            ex.printStackTrace();
            throw new IllegalStateException(ex.getMessage());
          }
    }
    /**
     * Unsubscribes from all remote caches.
     * @param obj existing subscriber.
     */
    public void removeCacheListener(ICacheListener listener) throws IOException {
      for (Enumeration en=cacheListenersMap.keys(); en.hasMoreElements();) {
        String cacheName = (String)en.nextElement();
        removeCacheListener(cacheName, listener);
  
        if (debug) {
          p("removing listener for cache " + cacheName);
        }
      }
      return;
    }
    /////////////////////// Implements the ICacheServiceAdmin interface. 
//////////////////
    public void shutdown() throws IOException {
      RemoteCacheServerFactory.shutdownImpl("", Registry.REGISTRY_PORT);
    }
    public void shutdown(String host, int port) throws IOException {
      p( "received shutdown request" );
      RemoteCacheServerFactory.shutdownImpl(host, port);
    }
    /////////////////////// Implements the Unreferenced interface. //////////////////
    /**
     * Called by the RMI runtime sometime after the runtime determines that
     * the reference list, the list of clients referencing the remote object,
     * becomes empty.
     */
    // TODO: test out the DGC.
    public void unreferenced() {
      p("*** Warning: Server now unreferenced and subject to GC. ***");
    }
  
    /////////////////////////////////////////////////
    public String getStats() throws IOException {
      return cacheManager.getStats();
    }
  
  
      /**
     * Returns the next generated listener id [0,255].
     */
    private byte nextListenerId() {
      int id=0;
      if (listenerId[0] == 255) {
        synchronized(listenerId) {
          id = listenerId[0];
          listenerId[0] = 0;
          // TODO: record & check if the generated id is currently being
          // used by a valid listener.  Currently if the id wraps after 255,
          // we just assume it won't collide with an existing listener who is live.
        }
      }
      else {
        synchronized(listenerId) {
          id = ++listenerId[0];
        }
      }
      return (byte)(id & 0xff);
    }
  
    ///////////////////////////////////////////////
    private static void p1(String s) {
      System.out.println("RemoteCacheServer:"+s+" >"+Thread.currentThread().getName());
    }
  
    //////////////////////////////////////////////
    protected void p(String s) {
      if (debug) {
        System.out.println("RemoteCacheServer:"+s+" 
>"+Thread.currentThread().getName());
      }
      else {
        log.debug("RemoteCacheServer:"+s+" >"+Thread.currentThread().getName());
      }
    }
  }
  
  
  
  1.1                  
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/remote/server/RemoteCacheServerFactory.java
  
  Index: RemoteCacheServerFactory.java
  ===================================================================
  package  org.apache.stratum.jcs.auxiliary.remote.server;
  
  import org.apache.stratum.jcs.engine.control.Cache;
  
  import  java.io.*;
  import  java.net.*;
  import  java.rmi.*;
  import  java.rmi.registry.*;
  import  java.rmi.server.*;
  import  java.util.*;
  
  import  org.apache.stratum.jcs.auxiliary.remote.behavior.*;
  import  org.apache.stratum.jcs.auxiliary.remote.*;
  import  org.apache.stratum.jcs.engine.behavior.*;
  import  org.apache.stratum.jcs.access.*;
  import  org.apache.stratum.jcs.engine.*;
  import  org.apache.stratum.jcs.utils.reuse.*;
  
  
  /**
   * Provides remote cache services.
   */
  public class RemoteCacheServerFactory
      implements IRemoteCacheConstants {
  
    /** The single instance of the RemoteCacheServer object. */
    private static RemoteCacheServer instance;
    private static String serviceName;
    static boolean debug = false;
  
    private RemoteCacheServerFactory () {
    }
  
    /////////////////////// Statup/shutdown methods. //////////////////
    /**
     * Starts up the remote cache server on this JVM, and binds it to the registry on 
the
     * given host and port.
     */
    public static void startup (String host, int port, String propFile) throws 
IOException,
        NotBoundException {
      if (instance != null) {
        throw  new IllegalArgumentException("Server already started.");
      }
      synchronized (RemoteCacheServer.class) {
        if (instance != null) {
          return;
        }
        Properties prop = RemoteUtils.loadProps(propFile);
  
        String servicePortStr = prop.getProperty(REMOTE_CACHE_SERVICE_PORT);
        int servicePort = -1;
        try {
          servicePort = Integer.parseInt(servicePortStr);
          p("Remote cache service uses port number " + servicePort + ".");
        } catch(NumberFormatException ignore) {
          p("Remote cache service port property " + REMOTE_CACHE_SERVICE_PORT + " not 
specified.  An anonymous port will be used.");
        }
        instance = servicePort == -1 ? new RemoteCacheServer(propFile) : new 
RemoteCacheServer(propFile, servicePort);
  
  
        if (host == null) {
          host = "";
        }
        // Register the RemoteCacheServer remote object in the registry.
        serviceName = prop.getProperty(REMOTE_CACHE_SERVICE_NAME, 
REMOTE_CACHE_SERVICE_VAL).trim();
        p("main> binding server to " + host + ":" + port + " with the name " +
            serviceName);
        try {
          Naming.rebind("//" + host + ":" + port + "/" + serviceName, instance);
        } catch (MalformedURLException ex) {
          // impossible case.
          throw  new IllegalArgumentException(ex.getMessage() + "; host=" + host
              + ", port=" + port);
        }
      }
    }
  
    /**
     * put your documentation comment here
     * @param host
     * @param port
     * @exception IOException
     */
    static void shutdownImpl (String host, int port) throws IOException {
      if (instance == null) {
        return;
      }
      synchronized (RemoteCacheServer.class) {
        if (instance == null) {
          return;
        }
        p("Unbinding host=" + host + ", port=" + port + ", serviceName=" + 
serviceName);
        try {
          Naming.unbind("//" + host + ":" + port + "/" + serviceName);
        } catch (MalformedURLException ex) {
          // impossible case.
          throw  new IllegalArgumentException(ex.getMessage() + "; host=" + host
              + ", port=" + port + ", serviceName=" + serviceName);
        } catch (NotBoundException ex) {
        //ignore.
        }
        instance.release();
        instance = null;
        // TODO: safer exit ?
        try {
          Thread.currentThread().sleep(2000);
        } catch (InterruptedException ex) {}
        System.exit(0);
      }
    }
  
    /**
     * Creates an local RMI registry on the default port,
     * starts up the remote cache server, and binds it to the registry.
     */
    public static void main (String[] args) throws Exception {
      Properties prop = args.length > 0 ? RemoteUtils.loadProps(args[0]) : new 
Properties();
  
      // shutdown
      if (args.length > 0 && args[0].toLowerCase().indexOf("-shutdown") != -1) {
        String serviceName = prop.getProperty(REMOTE_CACHE_SERVICE_NAME, 
REMOTE_CACHE_SERVICE_VAL).trim();
        String registry = "//:" + Registry.REGISTRY_PORT + "/" + serviceName;
  
        if (debug) {
          p("looking up server " + registry);
        }
        Object obj = Naming.lookup(registry);
        if (debug) {
          p("server found");
        }
        ICacheServiceAdmin admin = (ICacheServiceAdmin)obj;
        try {
          admin.shutdown();
        } catch (Exception ex) {
          p( ex.toString() );
          ex.printStackTrace( System.out );
        }
        p("done.");
        System.exit(0);
      }
  
  
      // startup.
      int port;
      try {
        port = Integer.parseInt(prop.getProperty("registry.port"));
      } catch (NumberFormatException ex) {
        port = Registry.REGISTRY_PORT;
      }
      String host = prop.getProperty("registry.host");
  
      if (host == null || host.trim().equals("") || host.trim().equals("localhost")) {
        p("main> creating registry on the localhost");
        port = RemoteUtils.createRegistry(port);
      }
      p("main> starting up RemoteCacheServer");
      RemoteCacheServerFactory.startup(host, port, args.length > 0 ? args[0] : null);
      p("main> done");
  
    } // end main
  
    ////////////////////////////////////////////////
    private static void p (String s) {
      System.out.println("RemoteCacheServerFactory:" + s + " >" + 
Thread.currentThread().getName());
    }
  }
  
  
  
  
  
  
  1.1                  
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/remote/server/RemoteCacheServerInfo.java
  
  Index: RemoteCacheServerInfo.java
  ===================================================================
  package  org.apache.stratum.jcs.auxiliary.remote.server;
  
  import  java.rmi.dgc.VMID;
  
  /** A shared static variable holder for the server */
  public class RemoteCacheServerInfo {
  
    // shouldn't be instantiated
    private RemoteCacheServerInfo(){}
  
    /** Shouldn't be used till after reconneting, after setting = thread safe
     *  Used to identify a client, so we can run multiple clients off one host.
     *  Need since there is no way to identify a client other than by host in rmi.
     */
    protected static VMID vmid = new VMID();
    public static byte listenerId = (byte)vmid.hashCode();
  
  }
  
  
  
  1.1                  
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/remote/server/RemoteCacheServerListener.java
  
  Index: RemoteCacheServerListener.java
  ===================================================================
  package org.apache.stratum.jcs.auxiliary.remote.server;
  
  //////////////////////////////////
  import  java.io.*;
  import  java.rmi.*;
  import  java.rmi.registry.*;
  import  java.rmi.server.*;
  import  java.util.*;
  import  java.sql.*;
  
  /////////////////////////////////////
  import org.apache.stratum.jcs.engine.behavior.*;
  import org.apache.stratum.jcs.engine.control.*;
  import org.apache.stratum.jcs.engine.group.*;
  import org.apache.stratum.jcs.engine.*;
  import org.apache.stratum.jcs.access.*;
  import org.apache.stratum.jcs.auxiliary.disk.*;
  import org.apache.stratum.jcs.auxiliary.lateral.*;
  import org.apache.stratum.jcs.auxiliary.remote.*;
  import org.apache.stratum.jcs.auxiliary.remote.behavior.*;
  
  // remove
  import org.apache.stratum.jcs.utils.log.*;
  
  ///////////////////////////////////////////////////////////////////////
  /**
   * This listener class is for inter cache commumication.
   */
  public class RemoteCacheServerListener implements IRemoteCacheListener, 
IRemoteCacheConstants, Serializable {
  
    protected static final boolean debug = true;
    protected static final boolean debugcmd = true;
  
    protected static transient Logger log;
  
    protected static transient ICompositeCacheManager cacheMgr;
  
    protected static IRemoteCacheListener instance;
    protected IRemoteCacheAttributes irca;
  
    protected static final boolean debugactivity = true;
    private int puts = 0;
    private int removes = 0;
  
    /////////////////////////////////////////////////////////////
    /**
     *  Only need one since it does work for all regions,
     *  just reference by multiple region names.
     */
    protected RemoteCacheServerListener ( IRemoteCacheAttributes irca ) {
  
      this.irca = irca;
  
      log = LoggerManager.getLogger( "remote_remotecachemanager" );
  
      // may need to add to ICacheManager interface to handle
      // the source arument extended update and remove methods
  
      // causes circular reference, unfortunate, becasue the
      // the overhead is higer
      // will need to pass a refernce thru
      //cacheMgr = CacheManagerFactory.getInstance();
  
      // Export this remote object to make it available to receive incoming calls,
      // using an anonymous port.
      try {
        if ( irca.getLocalPort() != 0 ) {
          UnicastRemoteObject.exportObject(this, irca.getLocalPort() );
        } else {
          UnicastRemoteObject.exportObject(this);
        }
      } catch (RemoteException ex) {
        log.error(ex);
        throw  new IllegalStateException(ex.getMessage());
      }
  
    }
  
    /** let the remote cache set a listener_id.
     *  Since there is only one listerenr for all the regions
     *  and every region gets registered? the id shouldn't be set
     *  if it isn't zero.  If it is we assume that it is a reconnect.
     */
    public void setListenerId( byte id ) throws IOException {
      RemoteCacheServerInfo.listenerId = id;
      if ( debugcmd ) {
        p( "set listenerId = " +  id );
      }
    }
    public byte getListenerId( ) throws IOException {
  
      // set the manager since we are in use
      //getCacheManager();
  
      //p( "get listenerId" );
      if ( debugcmd ) {
        p( "get listenerId = " +  RemoteCacheServerInfo.listenerId );
      }
      return RemoteCacheServerInfo.listenerId;
    }
  
      ////////////////////////////////////////////////////
    public int getRemoteType( ) throws IOException {
      if ( debugcmd ) {
        p( "getRemoteType = " +  irca.getRemoteType() );
      }
      return irca.getRemoteType();
    }
  
     ///////////////////////////////////////////////////////////////////////////
     public static IRemoteCacheListener getInstance( IRemoteCacheAttributes irca )  {
       //throws IOException, NotBoundException
        if (instance == null) {
          synchronized(RemoteCacheServerListener.class) {
            if (instance == null) {
             instance = new RemoteCacheServerListener( irca );
            }
          }
        }
        //instance.incrementClients();
        return instance;
     }
  
  
    /////////////////////////////////////////////////////////////////////////////
    //////////////////////////// implements the IRemoteCacheListener interface. 
//////////////
    /**
     * Just remove the element since it has been updated elsewhere
     * cd should be incomplete for faster transmission.
     * We don't want to pass data only invalidation.
     * The next time it is used the local cache will get
     * the new version from the remote store
     */
    public void handlePut (ICacheElement cb) throws IOException {
      if ( debugactivity ) {
        puts++;
        if ( puts % 100 == 0 ) {
          p( "puts = " + puts );
        }
      }
      ICompositeCache cache = (ICompositeCache)cacheMgr.getCache(irca.getCacheName());
      cache.update( cb, false );
    }
  
    ////////////////////////////////////////////////////
    public void handleRemove (String cacheName, Serializable key) throws IOException {
      if ( debugactivity ) {
        removes++;
        if ( removes % 100 == 0 ) {
          p( "removes = " + removes );
        }
      }
  
      if (debug) {
        log.debug("handleRemove> cacheName=" + cacheName + ", key=" + key);
      }
      if ( debugcmd ) {
        p("handleRemove> cacheName=" + cacheName + ", key=" + key);
      }
  
      getCacheManager();
      // interface limitation here
  
      Cache cache = (Cache)cacheMgr.getCache(cacheName);
      cache.remove(key, cache.REMOTE_INVOKATION);
    }
  
    //////////////////////////////////////////////////
    public void handleRemoveAll (String cacheName) throws IOException {
      if (debug) {
        log.debug("handleRemoveAll> cacheName=" + cacheName);
      }
      getCacheManager();
      ICache cache = cacheMgr.getCache(cacheName);
      cache.removeAll();
    }
  
    ///////////////////////////////////////////////////
    public void handleDispose (String cacheName) throws IOException {
      if (debug) {
        log.debug("handleDispose> cacheName=" + cacheName);
      }
      CompositeCacheManager cm = (CompositeCacheManager)cacheMgr;
      cm.freeCache(cacheName, Cache.REMOTE_INVOKATION);
    }
  
  
    ////////////////////////////////////////////////
    // override for new funcitonality
    protected void getCacheManager() {
      if ( cacheMgr == null ) {
        cacheMgr = (ICompositeCacheManager)CacheManagerFactory.getInstance();
        p( "had to get cacheMgr" );
        if ( debugcmd ) {
          p( "cacheMgr = " + cacheMgr );
        }
      } else {
        if ( debugcmd ) {
          p( "already got cacheMgr = " + cacheMgr );
        }
      }
    }
  
    //////////////////////////////////////////////////
    public static void p( String s ) {
      System.out.println( "RemoteCacheServerListener: " + s );
    }
  
  } // end class
  
  
  

--
To unsubscribe, e-mail:   <mailto:[EMAIL PROTECTED]>
For additional commands, e-mail: <mailto:[EMAIL PROTECTED]>

Reply via email to