asmuts 02/01/14 22:33:14
Added: src/java/org/apache/stratum/jcs/auxiliary/remote
RemoteCache.java RemoteCacheAttributes.java
RemoteCacheClientTest.java RemoteCacheFactory.java
RemoteCacheFailoverRunner.java RemoteCacheInfo.java
RemoteCacheListener.java RemoteCacheManager.java
RemoteCacheMonitor.java RemoteCacheNoWait.java
RemoteCacheNoWaitFacade.java
RemoteCacheRestore.java
RemoteCacheWatchRepairable.java RemoteUtils.java
ZombieRemoteCacheService.java
ZombieRemoteCacheWatch.java
Log:
the core rmi remote cache
failover needs to be more efficient
Revision Changes Path
1.1
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/remote/RemoteCache.java
Index: RemoteCache.java
===================================================================
package org.apache.stratum.jcs.auxiliary.remote;
import org.apache.stratum.jcs.engine.control.Cache;
import java.io.*;
import java.rmi.*;
import java.util.*;
import java.sql.*;
import org.apache.stratum.jcs.engine.*;
import org.apache.stratum.jcs.access.*;
import org.apache.stratum.jcs.engine.behavior.*;
import org.apache.stratum.jcs.access.exception.*;
import org.apache.stratum.jcs.auxiliary.remote.*;
import org.apache.stratum.jcs.auxiliary.remote.behavior.*;
import org.apache.stratum.jcs.utils.log.*;
import org.apache.stratum.jcs.utils.reuse.*;
/**
* Client proxy for an RMI remote cache.
*
*/
public class RemoteCache implements ICache {
private static boolean dbug = false;//true;
Logger log;
private static int numCreated = 0;
private static boolean debug = false;
private static boolean debugR = false;
private static boolean debugPut = false;
final String cacheName;
private IRemoteCacheService remote;
private IRemoteCacheAttributes irca;
Attributes attr = null;
private HashMap keyHash; // not synchronized to maximize concurrency.
private String source_id = "org.apache.stratum.jcs.auxiliary.remote.RemoteCache";
///////////////////////////////////////////////////
public Serializable getSourceId() {
return this.source_id;
}
////////////////////////////////////////
public String toString() {
return "RemoteCache: " + cacheName;
}
////////////////////////////////////////////////////////
// was public but need to access from server
public RemoteCache(IRemoteCacheAttributes cattr, IRemoteCacheService remote) {
this.irca = cattr;
this.cacheName= cattr.getCacheName();
this.remote = remote;
log = LoggerManager.getLogger( this );
if (dbug) {
p("Construct> cacheName=" + cattr.getCacheName());
p( "irca = " + irca.toString() );
}
/*
// TODO
// should be done by the remote cache, not the job of the hub manager
// Set up the idle period for the RemoteCacheMonitor.
long monPeriod = 0;
try {
monPeriod = Long.parseLong(props.getProperty("remote.monitor.idle.period",
"0"));
} catch(NumberFormatException ex) {
log.warn(ex.getMessage());
}
RemoteCacheMonitor.setIdlePeriod(monPeriod);
*/
}
//////////////////////////////////
public void setAttributes ( Attributes attr ) {
this.attr = attr;
}
//////////////////////////////////
public Attributes getAttributes () {
return this.attr;
}
/**
* Synchronously put to the remote cache; if failed, replace the remote
* handle with a zombie.
*/
public void put(Serializable key, Serializable value ) throws IOException {
put( key, value, this.attr.copy() );
}
public void put(Serializable key, Serializable value, Attributes attr ) throws
IOException {
try {
CacheElement ce = new CacheElement(cacheName,sanitized(key), sanitized(value));
ce.setAttributes( attr );
update( ce );
} catch(Exception ex) {
handleException(ex, "Failed to put " + key + " to " + cacheName);
//throw ex;
}
}
public void update( ICacheElement ce ) throws IOException {
if ( !this.irca.getGetOnly() ) {
try {
remote.update( ce, RemoteCacheInfo.listenerId );
} catch ( NullPointerException npe ) {
log.error( npe, "npe for ce = " + ce + "ce.attr = " + ce.getAttributes() );
return;
} catch(Exception ex) {
handleException(ex, "Failed to put " + ce.getKey() + " to " +
ce.getCacheName());
//throw ex;
}
} else {
//p( "get only mode, irca = " + irca.toString() );
}
}
/**
* Synchronously get from the remote cache; if failed, replace the remote
* handle with a zombie.
*/
public Serializable get(Serializable key) throws IOException {
try {
return remote.get(cacheName, sanitized(key));
} catch ( ObjectNotFoundException one ) {
log.debug( "didn't find element " + key + " in remote" );
return null;
} catch(Exception ex) {
handleException(ex, "Failed to get " + key + " from " + cacheName);
//throw ex;
return null; // never executes; just keep the compiler happy.
}
}
/**
* Wraps a non JDK object into a MarshalledObject, so that we can avoid
unmarshalling
* the real object on the remote side. This technique offers the benefit of
* surviving incompatible class versions without the need to restart the remote
cache server.
*/
private Serializable sanitized(Serializable s) throws IOException {
// In the unlikely case when the passed in object is a MarshalledObjct, we again
wrap
// it into a new MarsahlledObject for "escape" purposes during the get operation.
//return s.getClass().getName().startsWith("java.") && !(s instanceof
MarshalledObject) ? s : new MarshalledObject(s);
// avoid this step for now, [problem with group id wrapper]
return s;
}
/**
* Synchronously get from the remote cache; if failed, replace the remote
* handle with a zombie.
*/
public Serializable get(Serializable key, boolean container) throws IOException {
try {
return remote.get(cacheName, sanitized(key), container);
} catch ( ObjectNotFoundException one ) {
log.debug( "didn't find element " + key + " in remote" );
return null;
} catch(Exception ex) {
handleException(ex, "Failed to get " + key + " from " + cacheName);
return null; // never executes; just keep the compiler happy.
//throw ex;
}
}
/**
* Synchronously remove from the remote cache; if failed, replace the remote
* handle with a zombie.
*/
public boolean remove(Serializable key) throws IOException {
if ( !this.irca.getGetOnly() ) {
if (dbug) {
p("remove> key="+key);
}
try {
remote.remove(cacheName, sanitized(key), RemoteCacheInfo.listenerId);
} catch(Exception ex) {
handleException(ex, "Failed to remove " + key + " from " + cacheName);
//throw ex;
}
}
return false;
}
/**
* Synchronously removeAll from the remote cache; if failed, replace the remote
* handle with a zombie.
*/
public void removeAll() throws IOException {
if ( !this.irca.getGetOnly() ) {
try {
remote.removeAll(cacheName, RemoteCacheInfo.listenerId);
} catch(Exception ex) {
handleException(ex, "Failed to remove all from " + cacheName);
//throw ex;
}
}
}
/**
* Synchronously dispose the remote cache; if failed, replace the remote
* handle with a zombie.
*/
public void dispose() throws IOException {
// remote.freeCache(cacheName);
p( "disposing of remote cache" );
try {
remote.dispose(cacheName);
} catch(Exception ex) {
p( "couldn't dispose" );
handleException(ex, "Failed to dispose " + cacheName);
//remote = null;
}
}
public String getStats(){
return "cacheName = " + cacheName;
}
/**
* Returns the cache status.
* An error status indicates the remote connection is not available.
*/
public int getStatus() {
return remote instanceof IZombie ? STATUS_ERROR : STATUS_ALIVE;
}
/** Returns the current cache size. */
public int getSize() {
return 0;
}
public int getCacheType() {
return REMOTE_CACHE;
}
public String getCacheName() {
return cacheName;
}
/** Replaces the current remote cache service handle with the given handle. */
public void fixCache(IRemoteCacheService remote) {
this.remote = remote;
return;
}
/**
* Handles exception by disabling the remote cache service before re-throwing the
* exception in the form of an IOException.
*/
private void handleException(Exception ex, String msg) throws IOException {
log.error( "Disabling remote cache due to error " + msg);
//log.error(ex);
log.error(ex.toString());
remote = new ZombieRemoteCacheService();
// may want to flush if region specifies
// Notify the cache monitor about the error, and kick off the recovery process.
RemoteCacheMonitor.getInstance().notifyError();
// initiate failover if local
RemoteCacheNoWaitFacade rcnwf =
(RemoteCacheNoWaitFacade)RemoteCacheFactory.facades.get( irca.getCacheName() );
p( "Initiating failover, rcnf = " + rcnwf );
if ( rcnwf != null && rcnwf.rca.getRemoteType() == rcnwf.rca.LOCAL ) {
p( "found facade calling failover" );
// may need to remove the noWait index here. It will be 0 if it is local
// since there is only 1 possible listener.
rcnwf.failover( 0 );
}
if (ex instanceof IOException) {
throw (IOException)ex;
}
throw new IOException(ex.getMessage());
}
private void p(String s) {
System.out.println("RemoteCache:"+s+" >"+Thread.currentThread().getName());
log.debug("RemoteCache:"+s+" >"+Thread.currentThread().getName());
}
}
1.1
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/remote/RemoteCacheAttributes.java
Index: RemoteCacheAttributes.java
===================================================================
package org.apache.stratum.jcs.auxiliary.remote;
/**
* Title:
* Description:
* Copyright: Copyright (c) 2001
* Company:
* @author Aaron Smuts
* @version 1.0
*/
import java.io.*;
import java.util.*;
import org.apache.stratum.jcs.auxiliary.behavior.*;
import org.apache.stratum.jcs.engine.*;
import org.apache.stratum.jcs.access.*;
import org.apache.stratum.jcs.engine.behavior.*;
import org.apache.stratum.jcs.access.exception.*;
import org.apache.stratum.jcs.auxiliary.behavior.IAuxiliaryCacheAttributes;
import org.apache.stratum.jcs.auxiliary.remote.behavior.*;
import org.apache.stratum.jcs.auxiliary.remote.*;
import org.apache.stratum.jcs.utils.log.*;
import org.apache.stratum.jcs.utils.reuse.*;
////////////////////////////////////////////////////////////////
/**
 * Configuration bean for a remote (RMI) auxiliary cache: which remote host
 * and port to use, the failover and cluster server lists, and the flags
 * controlling how remote puts are handled locally.
 */
public class RemoteCacheAttributes implements IRemoteCacheAttributes {

    private String cacheName;
    private String name;
    private String remoteServiceName = IRemoteCacheConstants.REMOTE_CACHE_SERVICE_VAL;
    private String remoteHost;
    private int remotePort;

    /*
     * failover servers will be used by local caches one at a time.
     * Listeners will be registered with all cluster servers.
     * If we add a get from cluster attribute we will have the ability
     * to chain clusters and have them get from each other.
     */
    private String failoverServers = "";
    private String clusterServers = "";
    private boolean getFromCluster = true;

    private int localPort = 0;
    private int remoteType = LOCAL;
    private int failoverIndex = 0;
    private String[] failovers;

    private boolean removeUponRemotePut = true;
    private boolean getOnly = false;

    /** Creates attributes with the default values. */
    public RemoteCacheAttributes() {
    }

    /**
     * Returns the remote type as a name.
     *
     * @return "CLUSTER" for the CLUSTER type, "LOCAL" for anything else
     */
    public String getRemoteTypeName() {
        return remoteType == CLUSTER ? "CLUSTER" : "LOCAL";
    }

    /**
     * Sets the remote type from its name.
     *
     * @param s "LOCAL" or "CLUSTER"; any other value, including null,
     *          leaves the current type unchanged
     */
    public void setRemoteTypeName( String s ) {
        // Constant-first comparison so that a null name is ignored instead
        // of raising a NullPointerException.
        if ( "LOCAL".equals(s) ) {
            remoteType = LOCAL;
        } else if ( "CLUSTER".equals(s) ) {
            remoteType = CLUSTER;
        }
    }

    /** Returns the index into the failover array that is currently in use. */
    public int getFailoverIndex() {
        return failoverIndex;
    }

    public void setFailoverIndex( int p ) {
        this.failoverIndex = p;
    }

    /** Returns the "host:port" entries of the failover servers; may be null if never set. */
    public String[] getFailovers() {
        return this.failovers;
    }

    public void setFailovers( String[] f ) {
        this.failovers = f;
    }

    public int getRemoteType() {
        return remoteType;
    }

    public void setRemoteType( int p ) {
        this.remoteType = p;
    }

    public void setCacheName( String s ) {
        this.cacheName = s;
    }

    public String getCacheName( ) {
        return this.cacheName;
    }

    public String getName() {
        return this.name;
    }

    public void setName( String name ) {
        this.name = name;
    }

    /**
     * Returns an independent copy of these attributes.
     * <p>
     * The copy is made with clone() when the class is cloneable and
     * field-by-field otherwise, so the result is never this instance itself.
     * The failover array is duplicated so that the copy does not share it
     * with the original.
     */
    public IAuxiliaryCacheAttributes copy() {
        RemoteCacheAttributes dup;
        try {
            dup = (RemoteCacheAttributes)this.clone();
        } catch( CloneNotSupportedException e ) {
            // Not cloneable: build the copy by hand rather than handing out
            // a reference to this instance, which callers go on to modify.
            dup = new RemoteCacheAttributes();
            dup.cacheName = this.cacheName;
            dup.name = this.name;
            dup.remoteServiceName = this.remoteServiceName;
            dup.remoteHost = this.remoteHost;
            dup.remotePort = this.remotePort;
            dup.failoverServers = this.failoverServers;
            dup.clusterServers = this.clusterServers;
            dup.getFromCluster = this.getFromCluster;
            dup.localPort = this.localPort;
            dup.remoteType = this.remoteType;
            dup.failoverIndex = this.failoverIndex;
            dup.removeUponRemotePut = this.removeUponRemotePut;
            dup.getOnly = this.getOnly;
        }
        dup.failovers = this.failovers == null
            ? null
            : (String[])this.failovers.clone();
        return dup;
    }

    public String getRemoteServiceName() {
        return this.remoteServiceName;
    }

    public void setRemoteServiceName( String s) {
        this.remoteServiceName = s;
    }

    public String getRemoteHost() {
        return this.remoteHost;
    }

    public void setRemoteHost( String s ) {
        this.remoteHost = s;
    }

    public int getRemotePort() {
        return this.remotePort;
    }

    public void setRemotePort( int p ) {
        this.remotePort = p;
    }

    /** Returns the comma separated "host:port" list of cluster servers. */
    public String getClusterServers() {
        return this.clusterServers;
    }

    public void setClusterServers( String s ) {
        this.clusterServers = s;
    }

    /** Returns the comma separated "host:port" list of failover servers. */
    public String getFailoverServers() {
        return this.failoverServers;
    }

    public void setFailoverServers( String s ) {
        this.failoverServers = s;
    }

    /** Returns the local port; 0 is the default. */
    public int getLocalPort() {
        return this.localPort;
    }

    public void setLocalPort( int p ) {
        this.localPort = p;
    }

    /** Returns whether a put received from the remote is treated as an invalidation. */
    public boolean getRemoveUponRemotePut() {
        return this.removeUponRemotePut;
    }

    public void setRemoveUponRemotePut( boolean r ) {
        this.removeUponRemotePut = r;
    }

    /** Returns whether the cache is in get-only mode. */
    public boolean getGetOnly() {
        return this.getOnly;
    }

    public void setGetOnly( boolean r ) {
        this.getOnly = r;
    }

    /** Returns a multi-line dump of the main settings, one "name = value" pair per line. */
    public String toString() {
        StringBuffer buf = new StringBuffer();
        buf.append( "\nremoteHost = " + this.remoteHost );
        buf.append( "\nremotePort = " + this.remotePort );
        buf.append( "\ncacheName = " + this.cacheName );
        buf.append( "\nremoveUponRemotePut = " + this.removeUponRemotePut );
        buf.append( "\ngetOnly = " + getOnly );
        return buf.toString();
    }
}
1.1
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/remote/RemoteCacheClientTest.java
Index: RemoteCacheClientTest.java
===================================================================
package org.apache.stratum.jcs.auxiliary.remote;
import org.apache.stratum.jcs.access.exception.*;
import org.apache.stratum.jcs.auxiliary.remote.behavior.IRemoteCacheConstants;
import org.apache.stratum.jcs.auxiliary.remote.behavior.*;
import org.apache.stratum.jcs.engine.*;
import org.apache.stratum.jcs.engine.behavior.*;
import org.apache.stratum.jcs.utils.reuse.*;
import java.io.*;
import java.net.*;
import java.rmi.*;
import java.rmi.registry.*;
import java.rmi.server.*;
import java.util.*;
/**
 * Manual command line test client for the remote cache server.
 * <p>
 * The constructor looks the server up in the RMI registry, subscribes this
 * object as a listener for the region "testCache" and then performs the
 * requested delete / write / read operations for the keys "0" .. "count-1".
 * Callbacks received from the server are printed to stdout.
 */
public class RemoteCacheClientTest implements IRemoteCacheListener,
    IRemoteCacheConstants {

    ICacheObserver watch;
    ICacheService cache;
    boolean debug=true;

    /** The registry host name. */
    final String host;
    /** The registry port number. */
    final int port;
    /** Number of test elements to process. */
    final int count;

    /** Listener id assigned by the server; shared by all instances. */
    protected static byte listenerId = 0;

    /** Always reports remote type 0. */
    public int getRemoteType( ) throws IOException {
        return 0;
    }

    /** Runs a write + read test against the default registry. */
    public RemoteCacheClientTest(int count)
        throws MalformedURLException, NotBoundException, IOException
    {
        this(count, true, true, false);
    }

    /** Runs the selected operations against the default registry host and port. */
    public RemoteCacheClientTest(int count, boolean write, boolean read, boolean
        delete)
        throws MalformedURLException, NotBoundException, IOException
    {
        this("", Registry.REGISTRY_PORT, count, write, read, delete);
    }

    /**
     * Connects to the given registry, subscribes as listener and runs the
     * selected operations for each of the {@code count} test keys.
     *
     * @param host   registry host name
     * @param port   registry port
     * @param count  number of test elements
     * @param write  put each element to the server
     * @param read   get each element from the server and print it
     * @param delete remove each element from the server (done before the write)
     */
    public RemoteCacheClientTest(String host, int port, int count,
        boolean write, boolean read, boolean delete)
        throws MalformedURLException, NotBoundException, IOException
    {
        this.count = count;
        this.host = host;
        this.port = port;

        try {
            // Export this remote object to make it available to receive incoming calls,
            // using an anonymous port.
            UnicastRemoteObject.exportObject(this);
        } catch (ExportException ignored) {
            // use already exported object
        }

        String service = System.getProperty(REMOTE_CACHE_SERVICE_NAME);
        if (service == null) {
            service = REMOTE_CACHE_SERVICE_VAL;
        }
        String registry = "//" + host + ":" + port + "/" + service;
        if (debug) {
            p("looking up server " + registry);
        }
        Object obj = Naming.lookup(registry);
        if (debug) {
            p("server found");
        }
        cache = (ICacheService)obj;
        watch = (ICacheObserver)obj;

        if (debug) {
            p("subscribing to the server");
        }
        watch.addCacheListener("testCache", this);

        for (int i=0; i < count; i++) {
            ICacheElement cb = new CacheElement("testCache", ""+i, ""+i);
            if (delete) {
                if (debug) {
                    p("deleting a cache item from the server " + i);
                }
                cache.remove(cb.getCacheName(), cb.getKey());
            }
            if (write) {
                if (debug) {
                    p("putting a cache bean to the server " + i);
                }
                try {
                    cache.update( cb );
                } catch ( ObjectExistsException oee ) {
                    p( oee.toString() );
                }
            }
            if (read) {
                try {
                    Object val = cache.get(cb.getCacheName(), cb.getKey());
                    p("get " + cb.getKey() + " returns " + val);
                } catch ( ObjectNotFoundException ignored ) {
                    // a missing element is an expected outcome of this test; nothing to report
                }
            }
        }
    }

    public void handlePut(ICacheElement cb) throws IOException {
        p("handlePut> cb="+cb);
    }

    public void handleRemove(String cacheName, Serializable key) throws IOException {
        p("handleRemove> cacheName="+cacheName+", key="+key);
    }

    public void handleRemoveAll(String cacheName) throws IOException {
        p("handleRemoveAll> cacheName="+cacheName);
    }

    public void handleDispose(String cacheName) throws IOException {
        p("handleDispose> cacheName="+cacheName);
    }

    private static void p(String s) {
        System.out.println("RemoteCacheClientTest:"+s);
    }

    /**
     * Command line entry point.
     * Arguments starting with "-" select operations by the letters they
     * contain (r = read, w = write, d = delete); any other argument is
     * parsed as the element count.
     */
    public static void main(String[] args) throws Exception {
        int count=0;
        boolean read=false;
        boolean write=false;
        boolean delete=false;

        for (int i=0; i < args.length; i++) {
            if (args[i].startsWith("-")) {
                if (!read) {
                    read = args[i].indexOf("r") != -1;
                }
                if (!write) {
                    write = args[i].indexOf("w") != -1;
                }
                if (!delete) {
                    delete = args[i].indexOf("d") != -1;
                }
            }
            else {
                count = Integer.parseInt(args[i]);
            }
        }
        // All work happens in the constructor.
        new RemoteCacheClientTest(count, write, read, delete);
    }

    public void setListenerId( byte id ) throws IOException {
        listenerId = id;
        p( "listenerId = " + id );
    }

    public byte getListenerId( ) throws IOException {
        return listenerId;
    }
}
1.1
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/remote/RemoteCacheFactory.java
Index: RemoteCacheFactory.java
===================================================================
package org.apache.stratum.jcs.auxiliary.remote;
/**
* Title:
* Description:
* Copyright: Copyright (c) 2001
* Company:
* @author
* @version 1.0
*/
import java.util.*;
//import org.apache.stratum.jcs.auxiliary.*;
import org.apache.stratum.jcs.auxiliary.behavior.*;
import org.apache.stratum.jcs.engine.*;
import org.apache.stratum.jcs.engine.behavior.*;
import org.apache.stratum.jcs.auxiliary.remote.*;
import org.apache.stratum.jcs.auxiliary.remote.behavior.*;
import org.apache.stratum.jcs.utils.log.*;
public class RemoteCacheFactory implements IAuxiliaryCacheFactory {
private static Logger log = LoggerManager.getLogger( RemoteCacheFactory.class );
private static String name;
// store reference of facades to initiate failover
public static final HashMap facades = new HashMap();
///////////////////////////////////
public RemoteCacheFactory() {
}
///////////////////////////////////////////////////////////////
/**
* Interface method. Allows classforname construction, making caches pluggable.
* Should be able to make this work for clusters and local caches
*/
public ICache createCache( IAuxiliaryCacheAttributes iaca ) {
RemoteCacheAttributes rca = (RemoteCacheAttributes)iaca;
ArrayList noWaits = new ArrayList();
// if LOCAL
if ( rca.getRemoteType() == rca.LOCAL ) {
// a list toi be turned into an array of failover server information
ArrayList failovers = new ArrayList();
// not necessary if a failover list is defined
// REGISTER PRIMARY LISTENER
// if it is a primary
if ( rca.getRemoteHost() != null ) {
failovers.add( rca.getRemoteHost() + ":" + rca.getRemotePort() );
RemoteCacheManager rcm = RemoteCacheManager.getInstance( rca );
ICache ic = rcm.getCache( rca );
if ( ic != null ) {
noWaits.add( ic );
} else {
//p( "noWait is null" );
}
}
// GET HANDLE BUT DONT REGISTER A LISTENER FOR FAILOVERS
String failoverList = rca.getFailoverServers();
if ( failoverList != null ) {
StringTokenizer fit = new StringTokenizer( failoverList, "," );
while( fit.hasMoreElements() ) {
String server = (String)fit.nextElement();
failovers.add( server );
rca.setRemoteHost( server.substring( 0,server.indexOf(":") ) );
rca.setRemotePort( Integer.parseInt(server.substring( server.indexOf(":")
+1)) );
RemoteCacheManager rcm = RemoteCacheManager.getInstance( rca );
// add a listener if there are none, need to tell rca what number it is at
if ( noWaits.size() <= 0 ) {
ICache ic = rcm.getCache( rca.getCacheName() );
if ( ic != null ) {
noWaits.add( ic );
} else {
//p( "noWait is null" );
}
}
} // end while
} // end if failoverList != null
rca.setFailovers( (String[])failovers.toArray(new String[0]) );
// if CLUSTER
} else
if ( rca.getRemoteType() == rca.CLUSTER ) {
// REGISTER LISTENERS FOR EACH SYSTEM CLUSTERED CACHEs
StringTokenizer it = new StringTokenizer( rca.getClusterServers(), "," );
while( it.hasMoreElements() ) {
//String server = (String)it.next();
String server = (String)it.nextElement();
//p( "tcp server = " + server );
rca.setRemoteHost( server.substring( 0,server.indexOf(":") ) );
rca.setRemotePort( Integer.parseInt(server.substring( server.indexOf(":")
+1)) );
RemoteCacheManager rcm = RemoteCacheManager.getInstance( rca );
rca.setRemoteType( rca.CLUSTER );
ICache ic = rcm.getCache( rca );
if ( ic != null ) {
noWaits.add( ic );
} else {
//p( "noWait is null" );
}
}
} // end if CLUSTER
//RemoteCacheNoWaitFacade rcnwf = new RemoteCacheNoWaitFacade(
(RemoteCacheNoWait[])noWaits.toArray(new RemoteCacheNoWait[0]), iaca.getCacheName() );
RemoteCacheNoWaitFacade rcnwf = new RemoteCacheNoWaitFacade(
(RemoteCacheNoWait[])noWaits.toArray(new RemoteCacheNoWait[0]), rca );
facades.put( rca.getCacheName(), rcnwf );
return rcnwf;
} // end createCache
/////////////////////////////////////////////////////////////////////
public String getName() {
return this.name;
}
public void setName( String name ) {
this.name = name;
}
}
1.1
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/remote/RemoteCacheFailoverRunner.java
Index: RemoteCacheFailoverRunner.java
===================================================================
package org.apache.stratum.jcs.auxiliary.remote;
import java.util.*;
import org.apache.stratum.jcs.auxiliary.behavior.*;
import org.apache.stratum.jcs.engine.*;
import org.apache.stratum.jcs.engine.behavior.*;
import org.apache.stratum.jcs.auxiliary.remote.*;
import org.apache.stratum.jcs.auxiliary.remote.behavior.*;
import org.apache.stratum.jcs.utils.log.*;
//////////////////////////////////////////////////////////////
public class RemoteCacheFailoverRunner implements Runnable {
private RemoteCacheNoWaitFacade facade;
private static long idlePeriod = 20*1000;
private boolean alright=true;
private transient Logger log =
LoggerManager.getLogger(RemoteCacheFailoverRunner.class);
//////////////////////////////////////////////
public RemoteCacheFailoverRunner( RemoteCacheNoWaitFacade facade ) {
this.facade = facade;
}
//////////////////////////////////////////////
/**
* Notifies the cache monitor that an error occurred,
* and kicks off the error recovery process.
*/
public void notifyError() {
bad();
synchronized(this) {
notify();
}
}
////////////////////////////////////////////////
public void run() {
do {
// will only be run if there is an error
/*
if (alright) {
synchronized(this) {
if (alright) {
// Failure driven mode.
try {
wait(); // wake up only if there is an error.
} catch(InterruptedException ignore) {
}
}
}
}
// The "alright" flag must be false here.
// Simply presume we can fix all the errors until proven otherwise.
synchronized(this) {
alright = true;
}
*/
p("cache failover running.");
// there is no active listener
if( !alright ) {
// reset listener id for reidentification by new remote
// RemoteCacheInfo.listenerId = 0;
// this may not work, we may need to have unique listener ids per failover
// Monitor each RemoteCacheManager instance one after the other.
// Each RemoteCacheManager corresponds to one remote connection.
String[] failovers = facade.rca.getFailovers();
int fidx = facade.rca.getFailoverIndex();
p( "fidx = " + fidx + " failovers.length = " + failovers.length );
int i=fidx+1;
p( "i = " + i );
for ( ; i < failovers.length; i++) {
p( "i = " + i );
String server = failovers[i];
RemoteCacheAttributes rca = null;
try {
rca = (RemoteCacheAttributes)facade.rca.copy();
rca.setRemoteHost( server.substring( 0,server.indexOf(":") ) );
rca.setRemotePort( Integer.parseInt(server.substring(
server.indexOf(":") +1)) );
RemoteCacheManager rcm = RemoteCacheManager.getInstance( rca );
p( "RemoteCacheAttributes for failover = " + rca.toString() );
// add a listener if there are none, need to tell rca what number it is
at
ICache ic = rcm.getCache( rca.getCacheName() );
if ( ic != null ) {
if ( ic.getStatus() == ic.STATUS_ALIVE ) {
// may need to do this more gracefully
p( "reseting no wait" );
facade.noWaits = new RemoteCacheNoWait[1];
facade.noWaits[0] = (RemoteCacheNoWait)ic;
facade.rca.setFailoverIndex(i);
synchronized(this) {
p( "setting ALRIGHT to true, moving to Primary Recovery Mode" );
alright = true;
p( "CONNECTED to " + rca.getRemoteHost() + ":" +
rca.getRemotePort() + "\n\n" );
}
}
} else {
//p( "noWait is null" );
}
} catch (Exception ex) {
bad();
p( "FAILED to connect to " + rca.getRemoteHost() + ":" +
rca.getRemotePort() );
// Problem encountered in fixing the caches managed by a
RemoteCacheManager instance.
// Soldier on to the next RemoteCacheManager instance.
log.error(ex.toString());
}
}
} // end if !alright
else {
p( "ALRIGHT is true -- failover runner is in primary recovery mode" );
log.warn( "ALRIGHT is true -- failover runner is in primary recovery mode"
);
}
/////////////////////////////////////////
//try to move back to the primary
String[] failovers = facade.rca.getFailovers();
String server = failovers[0];
try {
RemoteCacheAttributes rca = (RemoteCacheAttributes)facade.rca.copy();
rca.setRemoteHost( server.substring( 0,server.indexOf(":") ) );
rca.setRemotePort( Integer.parseInt(server.substring( server.indexOf(":")
+1)) );
RemoteCacheManager rcm = RemoteCacheManager.getInstance( rca );
// add a listener if there are none, need to tell rca what number it is at
ICache ic = rcm.getCache( rca.getCacheName() );
if ( ic != null ) {
if ( ic.getStatus() == ic.STATUS_ALIVE ) {
// may need to do this more gracefully
p( "reseting no wait to PRIMARY" );
facade.noWaits = new RemoteCacheNoWait[1];
facade.noWaits[0] = (RemoteCacheNoWait)ic;
facade.rca.setFailoverIndex(0);
//return;
}
} else {
//p( "noWait is null" );
}
} catch (Exception ex) {
log.error(ex);
}
// Time driven mode: sleep between each round of recovery attempt.
try {
p("cache failover runner sleeping for " + idlePeriod);
Thread.currentThread().sleep(idlePeriod);
} catch (InterruptedException ex) {
// ignore;
}
// try to bring the listener back to the primary
} while ( facade.rca.getFailoverIndex() > 0 );
p( "exiting failover runner" );
return;
}
/** Sets the "alright" flag to false in a critial section. */
private void bad() {
if (alright) {
synchronized(this) {
alright = false;
}
}
}
private void p(String s) {
if ( log.logLevel >= log.DEBUG ) {
log.debug( s );
}
System.out.println("RemoteCacheFailoveRunner:" + s);
}
}
1.1
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/remote/RemoteCacheInfo.java
Index: RemoteCacheInfo.java
===================================================================
package org.apache.stratum.jcs.auxiliary.remote;
/*
* This won't work when registered to multiple remotes; will use a variable in the
* rcm
*/
/**
 * Holder for static state shared by the remote cache classes.
 * The class only carries a static field and cannot be instantiated.
 */
public class RemoteCacheInfo {

    /**
     * Identifies this client to the remote server, so that several clients
     * can run off one host; RMI offers no way to tell clients apart other
     * than by host.
     * <p>
     * Shouldn't be used until after reconnecting; the original note
     * describes it as thread safe once it has been set.
     */
    protected static byte listenerId = 0;

    /** Not meant to be instantiated. */
    private RemoteCacheInfo() {
    }
}
1.1
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/remote/RemoteCacheListener.java
Index: RemoteCacheListener.java
===================================================================
package org.apache.stratum.jcs.auxiliary.remote;
//////////////////////////////////
import java.io.*;
import java.rmi.*;
import java.rmi.registry.*;
import java.rmi.server.*;
import java.util.*;
import java.sql.*;
/////////////////////////////////////
import org.apache.stratum.jcs.engine.behavior.*;
import org.apache.stratum.jcs.engine.control.*;
import org.apache.stratum.jcs.engine.group.*;
import org.apache.stratum.jcs.engine.*;
import org.apache.stratum.jcs.access.*;
import org.apache.stratum.jcs.auxiliary.disk.*;
import org.apache.stratum.jcs.auxiliary.lateral.*;
import org.apache.stratum.jcs.auxiliary.remote.*;
import org.apache.stratum.jcs.auxiliary.remote.behavior.*;
// remove
import org.apache.stratum.jcs.utils.log.*;
///////////////////////////////////////////////////////////////////////
public class RemoteCacheListener implements IRemoteCacheListener,
IRemoteCacheConstants, Serializable {
// When true, handleRemove also writes its arguments to the debug log.
protected static final boolean debug = false;//true;
// When true, the listener methods print a trace of each call through p().
protected static final boolean debugcmd = false;//true;
// Assigned in the constructor.
protected static transient Logger log;
// Cache manager used by handlePut to find the target region.
// NOTE(review): presumably initialised by getCacheManager(), which is
// defined outside this excerpt -- confirm.
protected static transient ICompositeCacheManager cacheMgr;
// The single shared listener created by getInstance().
protected static IRemoteCacheListener instance;
// Attributes handed to the constructor.
protected IRemoteCacheAttributes irca;
// TODO: change this log level
// When true, puts and removes are counted and every 100th is reported via p().
protected static final boolean debugactivity = true;
// Number of puts applied by handlePut (update branch only).
protected int puts = 0;
// Number of removes seen by handleRemove.
protected int removes = 0;
/////////////////////////////////////////////////////////////
/**
* Only need one since it does work for all regions,
* just reference by multiple region names.
*/
protected RemoteCacheListener ( IRemoteCacheAttributes irca ) {
this.irca = irca;
log = LoggerManager.getLogger( "remote_remotecachemanager" );
// may need to add to ICacheManager interface to handle
// the source arument extended update and remove methods
// causes circular reference, unfortunate, becasue the
// the overhead is higer
// will need to pass a refernce thru
//cacheMgr = CacheManagerFactory.getInstance();
// Export this remote object to make it available to receive incoming calls,
// using an anonymous port.
try {
if ( irca.getLocalPort() != 0 ) {
UnicastRemoteObject.exportObject(this, irca.getLocalPort() );
} else {
UnicastRemoteObject.exportObject(this);
}
} catch (RemoteException ex) {
log.error(ex);
throw new IllegalStateException(ex.getMessage());
}
}
/**
 * Lets the remote cache assign a listener id.
 * The id is stored in the shared RemoteCacheInfo holder unconditionally.
 * <p>
 * NOTE(review): the original note says that, since there is only one
 * listener for all the regions, the id shouldn't be set if it isn't zero
 * (a non-zero id being taken as a reconnect); this method does not check
 * for that -- confirm the intended behaviour.
 *
 * @param id the listener id to store
 */
public void setListenerId( byte id ) throws IOException {
    RemoteCacheInfo.listenerId = id;
    if ( !debugcmd ) {
        return;
    }
    p( "set listenerId = " + id );
}
/**
 * Returns the listener id currently held in the shared RemoteCacheInfo
 * holder, tracing it first when command debugging is enabled.
 */
public byte getListenerId( ) throws IOException {
    if ( debugcmd ) {
        final String trace = "get listenerId = " + RemoteCacheInfo.listenerId;
        p( trace );
    }
    return RemoteCacheInfo.listenerId;
}
////////////////////////////////////////////////////
/**
 * Returns the remote type reported by this listener's attributes,
 * tracing it first when command debugging is enabled.
 */
public int getRemoteType( ) throws IOException {
    if ( !debugcmd ) {
        return irca.getRemoteType();
    }
    p( "getRemoteType = " + irca.getRemoteType() );
    return irca.getRemoteType();
}
///////////////////////////////////////////////////////////////////////////
/**
 * Returns the single shared listener, creating it on first use.
 * <p>
 * The attributes are only used by the call that creates the listener;
 * later calls return the existing instance whatever is passed in.
 * <p>
 * The check and the creation both happen while holding the class lock.
 * The field is not volatile, so an unsynchronized first check could hand
 * another thread a reference to a listener that is not yet fully
 * constructed.
 *
 * @param irca attributes for the listener, used on first creation only
 * @return the shared listener
 */
public static IRemoteCacheListener getInstance( IRemoteCacheAttributes irca ) {
    synchronized(RemoteCacheListener.class) {
        if (instance == null) {
            instance = new RemoteCacheListener( irca );
        }
        return instance;
    }
}
/////////////////////////////////////////////////////////////////////////////
//////////////////////////// implements the IRemoteCacheListener interface.
//////////////
/**
 * Reacts to an element that was put into the remote cache.
 *
 * When the attributes ask for removal upon remote put, the local copy is
 * only invalidated (via handleRemove), so the next local read fetches the
 * new version from the remote store. Otherwise the received element is
 * written into the local composite cache for its region.
 *
 * @param cb the element reported by the remote cache
 */
public void handlePut (ICacheElement cb) throws IOException {
    if ( irca.getRemoveUponRemotePut() ) {
        if ( debugcmd ) {
            p( "PUTTING ELEMENT FROM REMOTE, ( invalidating ) " );
        }
        handleRemove( cb.getCacheName(), cb.getKey() );
        return;
    }

    if ( debugcmd ) {
        p( "PUTTING ELEMENT FROM REMOTE, ( updating ) " );
        p( "cb = " + cb );
    }
    if ( debugactivity ) {
        puts++;
        if ( puts % 100 == 0 ) {
            p( "puts = " + puts );
        }
    }

    getCacheManager();
    ICompositeCache region = (ICompositeCache)cacheMgr.getCache( cb.getCacheName() );
    // NOTE(review): the 'false' flag presumably marks the update as not
    // locally originated -- confirm against ICompositeCache.update.
    region.update( cb, false );
}
////////////////////////////////////////////////////
/**
 * Removes one key from the named local cache region in response to a
 * remote notification. The removal is flagged with
 * Cache.REMOTE_INVOKATION.
 *
 * @param cacheName region the key belongs to
 * @param key key to remove
 */
public void handleRemove (String cacheName, Serializable key) throws IOException {
    if ( debugactivity ) {
        removes++;
        if ( removes % 100 == 0 ) {
            p( "removes = " + removes );
        }
    }
    if ( debug ) {
        log.debug( "handleRemove> cacheName=" + cacheName + ", key=" + key );
    }
    if ( debugcmd ) {
        p( "handleRemove> cacheName=" + cacheName + ", key=" + key );
    }

    getCacheManager();
    // The manager interface does not expose the two-argument remove, so
    // the region has to be cast to the concrete Cache class.
    Cache region = (Cache)cacheMgr.getCache( cacheName );
    region.remove( key, Cache.REMOTE_INVOKATION );
}
//////////////////////////////////////////////////
/**
 * Clears the named local cache region in response to a remote
 * notification.
 *
 * @param cacheName region to clear
 */
public void handleRemoveAll (String cacheName) throws IOException {
    if ( debug ) {
        log.debug( "handleRemoveAll> cacheName=" + cacheName );
    }
    getCacheManager();
    ICache region = cacheMgr.getCache( cacheName );
    region.removeAll();
}
///////////////////////////////////////////////////
/**
 * Frees the named local cache region in response to a remote dispose
 * notification. The call is flagged with Cache.REMOTE_INVOKATION.
 *
 * @param cacheName region to free
 */
public void handleDispose (String cacheName) throws IOException {
    if ( debug ) {
        log.debug( "handleDispose> cacheName=" + cacheName );
    }
    // cacheMgr is resolved lazily. Every other handler calls
    // getCacheManager() first; without it a dispose arriving before any
    // put/remove would dereference a null manager.
    getCacheManager();
    CompositeCacheManager cm = (CompositeCacheManager)cacheMgr;
    cm.freeCache( cacheName, Cache.REMOTE_INVOKATION );
}
////////////////////////////////////////////////
/**
 * Makes sure cacheMgr is set, fetching it from CacheManagerFactory on
 * first use. Subclasses may override this to supply a different manager.
 */
protected void getCacheManager() {
    if ( cacheMgr != null ) {
        if ( debugcmd ) {
            p( "already got cacheMgr = " + cacheMgr );
        }
        return;
    }
    cacheMgr = (ICompositeCacheManager)CacheManagerFactory.getInstance();
    p( "had to get cacheMgr" );
    if ( debugcmd ) {
        p( "cacheMgr = " + cacheMgr );
    }
}
//////////////////////////////////////////////////
/**
 * Prints a diagnostic line prefixed with the class name. The line always
 * goes to standard output and additionally to the logger when the
 * logger's level is at least DEBUG.
 *
 * @param s message text
 */
public static void p( String s ) {
    final String line = "RemoteCacheListener: " + s;
    if ( log.logLevel >= log.DEBUG ) {
        log.debug( line );
    }
    System.out.println( line );
}
} // end class
1.1
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/remote/RemoteCacheManager.java
Index: RemoteCacheManager.java
===================================================================
package org.apache.stratum.jcs.auxiliary.remote;
import java.io.*;
import java.net.*;
import java.rmi.*;
import java.rmi.registry.*;
import java.util.*;
import java.sql.*;
import org.apache.stratum.jcs.engine.*;
import org.apache.stratum.jcs.engine.behavior.*;
import org.apache.stratum.jcs.auxiliary.behavior.*;
import org.apache.stratum.jcs.auxiliary.remote.behavior.*;
import org.apache.stratum.jcs.auxiliary.remote.group.*;
import org.apache.stratum.jcs.utils.log.*;
/**
* An instance of RemoteCacheManager corresponds to one remote connection of a
* specific host and port.
* All RemoteCacheManager instances are monitored by the singleton
* RemoteCacheMonitor monitoring daemon for error detection and recovery.
*/
public class RemoteCacheManager implements IAuxiliaryCacheManager {
// Contains mappings of Location instance to RemoteCacheManager instance.
static final Map instances = new HashMap();
private static RemoteCacheMonitor monitor;
private static boolean debug = false;//true; //true;
private int clients;
private Logger log;
// Contains instances of RemoteCacheNoWait managed by an RemoteCacheManager
instance.
final Map caches = new HashMap();
final String host;
final int port;
final String service;
private IRemoteCacheAttributes irca;
/**
* Handle to the remote cache service; or a zombie handle if failed to connect.
*/
private IRemoteCacheService remoteService;
/**
* Wrapper of the remote cache watch service;
* or wrapper of a zombie service if failed to connect.
*/
private RemoteCacheWatchRepairable remoteWatch;
/**
* Constructs an instance to with the given remote connection parameters.
* If the connection cannot be made, "zombie" services will be temporarily
* used until a successful re-connection is made by the monitoring daemon.
*/
private RemoteCacheManager (String host, int port, String service) {
this.host = host;
this.port = port;
this.service = service;
log = LoggerManager.getLogger(this);
String registry = "//" + host + ":" + port + "/" + service;
if (debug) {
p("looking up server " + registry);
}
try {
Object obj = Naming.lookup(registry);
if (debug) {
p("server found");
}
// Successful connection to the remote server.
remoteService = (IRemoteCacheService)obj;
remoteWatch = new RemoteCacheWatchRepairable();
remoteWatch.setCacheWatch((IRemoteCacheObserver)obj);
} catch (Exception ex) {
// Failed to connect to the remote server.
// Configure this RemoteCacheManager instance to use the "zombie" services.
log.error(ex.getMessage());
remoteService = new ZombieRemoteCacheService();
remoteWatch = new RemoteCacheWatchRepairable();
remoteWatch.setCacheWatch(new ZombieRemoteCacheWatch());
// Notify the cache monitor about the error, and kick off the recovery process.
RemoteCacheMonitor.getInstance().notifyError();
}
}
////////////////////////////////////////
public IRemoteCacheAttributes getDefaultCattr() {
return this.irca;
}
/** Adds the remote cache listener to the underlying cache-watch service. */
public void addRemoteCacheListener( IRemoteCacheAttributes cattr,
IRemoteCacheListener listener) throws IOException {
synchronized (caches) {
remoteWatch.addCacheListener(cattr.getCacheName(), listener);
}
return;
}
/**
* Returns an instance of RemoteCacheManager for the given connection parameters.
* Also starts up the monitoring daemon, if not already started.
* If the connection cannot be established, zombie objects will be used
* for future recovery purposes.
* @param host host of the registry where the service lookup is to be made.
* @parma port port of the registry.
*/
public static RemoteCacheManager getInstance ( IRemoteCacheAttributes cattr) {
String host = cattr.getRemoteHost();
int port = cattr.getRemotePort();
String service = cattr.getRemoteServiceName();
if (host == null)
host = "";
if (port < 1024) {
port = Registry.REGISTRY_PORT;
}
Location loc = new Location(host, port);
RemoteCacheManager ins = (RemoteCacheManager)instances.get(loc);
if (ins == null) {
synchronized (instances) {
ins = (RemoteCacheManager)instances.get(loc);
if (ins == null) {
// cahnge to use cattr and to set defaults
ins = new RemoteCacheManager(host, port, service);
ins.irca = cattr;
instances.put(loc, ins);
}
}
}
if (debug) {
ins.log.logIt("Manager stats : " + ins.getStats() + "<br> -- in
getInstance()");
}
ins.clients++;
// Fires up the monitoring daemon.
if (monitor == null) {
monitor = RemoteCacheMonitor.getInstance();
// If the returned monitor is null, it means it's already started elsewhere.
if (monitor != null) {
Thread t = new Thread(monitor);
t.setDaemon(true);
t.start();
}
}
return ins;
}
/** Returns a remote cache for the given cache name. */
/** Returns a remote cache for the given cache name. */
public ICache getCache ( String cacheName ) {
IRemoteCacheAttributes ca = (IRemoteCacheAttributes)irca.copy();
ca.setCacheName( cacheName );
return getCache( ca );
}
public ICache getCache ( IRemoteCacheAttributes cattr ) {
RemoteCacheNoWait c = null;
synchronized (caches) {
c = (RemoteCacheNoWait)caches.get( cattr.getCacheName() );
if (c == null) {
c = new RemoteCacheNoWait(new RemoteCache(cattr, remoteService));
caches.put(cattr.getCacheName(), c);
}
}
if (debug) {
log.logIt("Manager stats : " + getStats());
}
//if ( irca.getUseRemote() ) {
try {
// Remote cache manager can handle this by gettign the type formt he listener
//if ( cattr.getRemoteType() == cattr.CLUSTER ) {
// addRemoteCacheListener( cattr, RemoteGroupCacheListener.getInstance(
cattr ) );
//} else
//if ( cattr.getRemoteType() == cattr.LOCAL ) {
addRemoteCacheListener( cattr, RemoteGroupCacheListener.getInstance( cattr
) );
//}
} catch( IOException ioe ) {
log.error( ioe.getMessage() );
} catch( Exception e ) {
log.error( e.getMessage() );
}
//}
return c;
}
/////////////////////////////////////////////////////////
public void freeCache (String name) throws IOException {
ICache c = null;
synchronized(caches) {
c = (ICache)caches.get(name);
}
if (c != null) {
c.dispose();
}
}
/////////////////////////////////////////////////////////////////////
// Don't care if there is a concurrency failure ?
public String getStats () {
StringBuffer stats = new StringBuffer();
Iterator allCaches = caches.values().iterator();
while (allCaches.hasNext()) {
ICache c = (ICache)allCaches.next();
if (c != null) {
stats.append("<br> " + c.getStats());
}
}
return stats.toString();
}
/////////////////////////////////////////////////////////////////
public void release () {
// Wait until called by the last client
if (--clients != 0) {
return;
}
synchronized (caches) {
Iterator allCaches = caches.values().iterator();
while (allCaches.hasNext()) {
ICache c = (ICache)allCaches.next();
if (c != null) {
try {
c.dispose();
} catch (IOException ex) {
ex.printStackTrace();
log.logEx(ex);
}
}
}
}
} //end release()
/** Fixes up all the caches managed by this cache manager. */
public void fixCaches (IRemoteCacheService remoteService, IRemoteCacheObserver
remoteWatch) {
synchronized (caches) {
this.remoteService = remoteService;
this.remoteWatch.setCacheWatch(remoteWatch);
for (Iterator en = caches.values().iterator(); en.hasNext();) {
RemoteCacheNoWait cache = (RemoteCacheNoWait)en.next();
cache.fixCache(this.remoteService);
}
}
}
/////////////////////////////////
private void p (String s) {
log.debug("RemoteCacheManager:" + s);
}
///////////////////////////////////////
public int getCacheType() {
return REMOTE_CACHE;
}
//////////////////////////////////////////////////////////////////////
/** Location of the RMI registry. */
private static final class Location {
public final String host;
public final int port;
public Location (String host, int port) {
this.host = host;
this.port = port;
}
public boolean equals (Object obj) {
if (obj == this) {
return true;
}
if (obj == null || !(obj instanceof Location))
return false;
Location l = (Location)obj;
if (this.host == null && l.host != null)
return false;
return host.equals(l.host) && port == l.port;
}
//////////////////////////////////////////
public int hashCode () {
return host == null ? port : host.hashCode() ^ port;
}
}
}
1.1
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/remote/RemoteCacheMonitor.java
Index: RemoteCacheMonitor.java
===================================================================
package org.apache.stratum.jcs.auxiliary.remote;
import org.apache.stratum.jcs.utils.log.*;
import java.util.*;
/**
* Used to monitor and repair any failed connection for the remote cache service.
* By default the monitor operates in a failure driven mode. That is, it goes into
a wait
* state until there is an error.
*
* TODO consider moving this into an active monitoring mode.
*
* Upon the notification of a connection error, the monitor changes to operate
* in a time driven mode. That is, it attempts to recover the connections on a
periodic basis.
* When all failed connections are restored, it changes back to the failure driven
mode.
*/
public class RemoteCacheMonitor implements Runnable {
private static RemoteCacheMonitor instance;
private static long idlePeriod = 30*1000; // minimum 30 seconds.
//private static long idlePeriod = 3*1000; // for debugging.
private transient Logger log = LoggerManager.getInstance().getLogger(this);
// Must make sure RemoteCacheMonitor is started before any remote error can be
detected!
private boolean alright=true;
static final int TIME = 0;
static final int ERROR = 1;
static int mode = ERROR;
/** Configures the idle period between repairs. */
public static void setIdlePeriod(long idlePeriod) {
if (idlePeriod > RemoteCacheMonitor.idlePeriod) {
RemoteCacheMonitor.idlePeriod = idlePeriod;
}
}
private RemoteCacheMonitor() {}
/**
* Returns the singleton instance;
*/
static RemoteCacheMonitor getInstance() {
if (instance == null) {
synchronized(RemoteCacheMonitor.class) {
if (instance == null) {
return instance=new RemoteCacheMonitor();
}
}
}
return instance;
}
/**
* Notifies the cache monitor that an error occurred,
* and kicks off the error recovery process.
*/
public void notifyError() {
p( "Notified of an error." );
bad();
synchronized(this) {
notify();
}
}
// Run forever.
// Avoid the use of any synchronization in the process of monitoring for
performance reason.
// If exception is thrown owing to synchronization,
// just skip the monitoring until the next round.
public void run () {
p( "Monitoring daemon started" );
do {
if ( mode == ERROR ) {
if (alright) {
synchronized(this) {
if (alright) {
// make this configurable, comment out wait to enter time driven mode
// Failure driven mode.
try {
log.debug("FAILURE DRIVEN MODE: cache monitor waiting for error" );
wait(); // wake up only if there is an error.
} catch(InterruptedException ignore) {
}
}
}
}
} else {
log.debug("TIME DRIVEN MODE: cache monitor sleeping for " + idlePeriod);
// Time driven mode: sleep between each round of recovery attempt.
// will need to test not just check status
}
try {
Thread.currentThread().sleep(idlePeriod);
} catch (InterruptedException ex) {
// ignore;
}
// The "alright" flag must be false here.
// Simply presume we can fix all the errors until proven otherwise.
synchronized(this) {
alright = true;
}
//p("cache monitor running.");
// Monitor each RemoteCacheManager instance one after the other.
// Each RemoteCacheManager corresponds to one remote connection.
for (Iterator itr = RemoteCacheManager.instances.values().iterator();
itr.hasNext();) {
RemoteCacheManager mgr = (RemoteCacheManager)itr.next();
try {
// If any cache is in error, it strongly suggests all caches managed by the
// same RmicCacheManager instance are in error. So we fix them once and
for all.
for (Iterator itr2 = mgr.caches.values().iterator(); itr2.hasNext();) {
if (itr2.hasNext()) {
RemoteCacheNoWait c = (RemoteCacheNoWait)itr2.next();
if (c.getStatus() == c.STATUS_ERROR) {
RemoteCacheRestore repairer = new RemoteCacheRestore(mgr);
// If we can't fix them, just skip and re-try in the next round.
if (repairer.canFix()) {
repairer.fix();
}
else {
bad();
}
break;
}
}
}
} catch (Exception ex) {
bad();
// Problem encountered in fixing the caches managed by a
RemoteCacheManager instance.
// Soldier on to the next RemoteCacheManager instance.
log.error(ex);
}
}
} while (true);
}
/** Sets the "alright" flag to false in a critial section. */
private void bad() {
if (alright) {
synchronized(this) {
alright = false;
}
}
}
private void p(String s) {
if ( log.logLevel >= log.DEBUG ) {
log.debug( s );
}
System.out.println("RemoteCacheMonitor:" + s);
}
}
1.1
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/remote/RemoteCacheNoWait.java
Index: RemoteCacheNoWait.java
===================================================================
package org.apache.stratum.jcs.auxiliary.remote;
import org.apache.stratum.jcs.engine.control.Cache;
import org.apache.stratum.jcs.engine.*;
import org.apache.stratum.jcs.access.exception.*;
import org.apache.stratum.jcs.engine.behavior.*;
import org.apache.stratum.jcs.auxiliary.remote.behavior.*;
import org.apache.stratum.jcs.utils.log.*;
import java.io.*;
import java.rmi.*;
/**
 * Front end of a RemoteCache that hands modifying requests (put, remove,
 * removeAll, dispose) to a cache event queue instead of executing them
 * directly. Reads go straight to the underlying remote cache.
 */
public class RemoteCacheNoWait implements ICache {

    private final RemoteCache cache;
    private ICacheEventQueue q;
    private transient Logger log = LoggerManager.getInstance().getLogger(this);
    private String source_id =
        "org.apache.stratum.jcs.auxiliary.remote.RemoteCacheNoWait";

    /** Returns the identifier of this cache as an event source. */
    public Serializable getSourceId() {
        return this.source_id;
    }

    /**
     * Wraps the given remote cache and creates its event queue. If the
     * remote cache is already in error, the new queue is destroyed
     * immediately.
     */
    public RemoteCacheNoWait(RemoteCache cache) {
        this.cache = cache;
        this.q = newEventQueue();
        if (cache.getStatus() == cache.STATUS_ERROR) {
            q.destroy();
        }
    }

    /** Builds an event queue bound to the wrapped cache. */
    private ICacheEventQueue newEventQueue() {
        return new CacheEventQueue(new CacheAdaptor(cache),
                                   RemoteCacheInfo.listenerId, cache.getCacheName());
    }

    /** Logs a queueing failure, destroys the queue and hands the exception back for rethrow. */
    private IOException queueFailed(IOException ex) {
        log.error(ex);
        q.destroy();
        return ex;
    }

    /** Queues a put of the given key and value without attributes. */
    public void put(Serializable key, Serializable value) throws IOException {
        put(key, value, null);
    }

    /** Queues a put of the given key and value with the given attributes. */
    public void put(Serializable key, Serializable value, Attributes attr)
            throws IOException {
        try {
            CacheElement element = new CacheElement(cache.getCacheName(), key, value);
            element.setAttributes(attr);
            update(element);
        } catch (IOException ex) {
            throw queueFailed(ex);
        }
    }

    /** Queues a put of a ready-made cache element. */
    public void update(ICacheElement ce) throws IOException {
        try {
            q.addPutEvent(ce);
        } catch (IOException ex) {
            throw queueFailed(ex);
        }
    }

    /** Reads synchronously from the remote cache. */
    public Serializable get(Serializable key) throws IOException {
        return get(key, true);
    }

    /**
     * Reads synchronously from the remote cache. An UnmarshalException
     * triggers a single retry; if the retry also fails with an
     * IOException the queue is destroyed and null is returned. Any other
     * IOException destroys the queue and is rethrown.
     *
     * NOTE(review): the 'container' flag is not passed on to the wrapped
     * cache.
     */
    public Serializable get(Serializable key, boolean container) throws IOException {
        try {
            return cache.get(key);
        } catch (UnmarshalException ue) {
            p("Retrying the get owing to UnmarshalException...");
        } catch (IOException ex) {
            q.destroy();
            throw ex;
        }

        // Second and last attempt after the unmarshalling failure.
        try {
            return cache.get(key);
        } catch (IOException ex) {
            p("Failed in retrying the get for the second time.");
            q.destroy();
            return null;
        }
    }

    /** Queues a remove request; always returns false. */
    public boolean remove(Serializable key) throws IOException {
        try {
            q.addRemoveEvent(key);
        } catch (IOException ex) {
            throw queueFailed(ex);
        }
        return false;
    }

    /** Queues a removeAll request. */
    public void removeAll() throws IOException {
        try {
            q.addRemoveAllEvent();
        } catch (IOException ex) {
            throw queueFailed(ex);
        }
    }

    /** Queues a dispose request; a queueing failure is logged, not thrown. */
    public void dispose() {
        try {
            q.addDisposeEvent();
        } catch (IOException ex) {
            log.error(ex);
            q.destroy();
        }
    }

    /** Delegates to the wrapped cache. */
    public String getStats() {
        return cache.getStats();
    }

    /** Delegates to the wrapped cache. */
    public int getSize() {
        return cache.getSize();
    }

    /** Always reports the remote cache type. */
    public int getCacheType() {
        return ICacheType.REMOTE_CACHE;
    }

    /**
     * Returns the status of the wrapped cache while the event queue is
     * alive, and the error status once the queue has been destroyed.
     */
    public int getStatus() {
        if (!q.isAlive()) {
            return cache.STATUS_ERROR;
        }
        return cache.getStatus();
    }

    /** Delegates to the wrapped cache. */
    public String getCacheName() {
        return cache.getCacheName();
    }

    /**
     * Installs a new remote service handle in the wrapped cache and then
     * replaces the event queue with a fresh one.
     */
    public void fixCache(IRemoteCacheService remote) {
        cache.fixCache(remote);
        resetEventQ();
    }

    /** Destroys the current event queue if it is alive and starts a new one. */
    public void resetEventQ() {
        if (q.isAlive()) {
            q.destroy();
        }
        this.q = newEventQueue();
    }

    public String toString() {
        return "RemoteCacheNoWait: " + cache.toString();
    }

    private void p(String s) {
        System.out.println("RemoteCacheNoWait:" + s);
    }
}
1.1
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/remote/RemoteCacheNoWaitFacade.java
Index: RemoteCacheNoWaitFacade.java
===================================================================
package org.apache.stratum.jcs.auxiliary.remote;
import org.apache.stratum.jcs.engine.*;
import org.apache.stratum.jcs.access.exception.*;
import org.apache.stratum.jcs.engine.behavior.*;
import org.apache.stratum.jcs.engine.control.*;
import org.apache.stratum.jcs.utils.log.*;
import java.io.*;
import java.rmi.*;
/**
 * Presents several RemoteCacheNoWait instances as a single cache.
 * Modifying requests are passed to every member; reads return the first
 * hit. The attributes carry the failover and cluster configuration.
 */
public class RemoteCacheNoWaitFacade implements ICache {

    private static final boolean debug = false;
    private static final boolean debugF = true;

    /** The remote caches this facade fans out to. */
    public RemoteCacheNoWait[] noWaits;

    private transient Logger log = LoggerManager.getInstance().getLogger(this);
    private String source_id =
        "org.apache.stratum.jcs.auxiliary.remote.RemoteCacheNoWaitFacade";
    private String cacheName;

    // holds failover and cluster information
    RemoteCacheAttributes rca;

    public RemoteCacheAttributes getRemoteCacheAttributes() {
        return rca;
    }

    public void setRemoteCacheAttributes(RemoteCacheAttributes rca) {
        this.rca = rca;
    }

    /** Returns the identifier of this cache as an event source. */
    public Serializable getSourceId() {
        return this.source_id;
    }

    /**
     * Constructs a facade over the given remote caches.
     *
     * @param noWaits the remote caches to fan out to
     * @param rca attributes; the cache name is taken from them
     */
    public RemoteCacheNoWaitFacade(RemoteCacheNoWait[] noWaits,
                                   RemoteCacheAttributes rca) {
        if (debug) {
            p("CONSTRUCTING NO WAIT FACADE");
        }
        this.noWaits = noWaits;
        this.rca = rca;
        this.cacheName = rca.getCacheName();
    }

    /** Passes a put without attributes to the remote caches. */
    public void put(Serializable key, Serializable value) throws IOException {
        put(key, value, null);
    }

    /** Passes a put to the remote caches; failures are logged, not thrown. */
    public void put(Serializable key, Serializable value, Attributes attr)
            throws IOException {
        try {
            CacheElement ce = new CacheElement(cacheName, key, value);
            ce.setAttributes(attr);
            update(ce);
        } catch (Exception ex) {
            log.error(ex);
        }
    }

    /**
     * Passes the element to each remote cache in order. If one of them
     * fails, the error is logged, the remaining caches are skipped and
     * failover handling is started for the failing index.
     */
    public void update(ICacheElement ce) throws IOException {
        if (debug) {
            p("updating through cache facade, noWaits.length = " + noWaits.length);
        }
        int i = 0;
        try {
            for (; i < noWaits.length; i++) {
                noWaits[i].update(ce);
                // An initial move into a zombie will lock this to primary
                // recovery. Other servers will not be discovered until the
                // primary reconnects and a subsequent error occurs.
            }
        } catch (Exception ex) {
            log.error(ex);
            // TODO: should probably only fail over if there is only one
            // entry in the noWait list, and a background thread should
            // restore the original as primary while in failover state.
            failover(i);
        }
    }

    /** Synchronously reads from the remote caches. */
    public Serializable get(Serializable key) {
        return get(key, true);
    }

    /**
     * Synchronously reads from the remote caches, returning the first
     * non-null result. A miss on one cache moves on to the next one; an
     * exception ends the lookup and yields null.
     */
    public Serializable get(Serializable key, boolean container) {
        for (int i = 0; i < noWaits.length; i++) {
            try {
                Object obj = noWaits[i].get(key, container);
                if (obj != null) {
                    return (Serializable) obj;
                }
                // Miss: fall through to the next remote cache. The earlier
                // version returned null here unconditionally, so only
                // noWaits[0] was ever consulted.
            } catch (Exception ex) {
                p("Failed to get.");
                return null;
            }
        }
        return null;
    }

    /**
     * Passes a remove to each remote cache; stops at the first failure,
     * which is logged. Always returns false.
     */
    public boolean remove(Serializable key) {
        try {
            for (int i = 0; i < noWaits.length; i++) {
                noWaits[i].remove(key);
            }
        } catch (Exception ex) {
            log.error(ex);
        }
        return false;
    }

    /** Passes a removeAll to each remote cache; stops at the first failure, which is logged. */
    public void removeAll() {
        try {
            for (int i = 0; i < noWaits.length; i++) {
                noWaits[i].removeAll();
            }
        } catch (Exception ex) {
            log.error(ex);
        }
    }

    /** Passes a dispose to each remote cache; stops at the first failure, which is logged. */
    public void dispose() {
        try {
            for (int i = 0; i < noWaits.length; i++) {
                noWaits[i].dispose();
            }
        } catch (Exception ex) {
            log.error(ex);
        }
    }

    /** Not implemented; always returns an empty string. */
    public String getStats() {
        return "";
    }

    /** Not implemented; always returns 0. */
    public int getSize() {
        return 0;
    }

    public int getCacheType() {
        return ICacheType.REMOTE_CACHE;
    }

    /** Returns the cache name taken from the attributes at construction. */
    public String getCacheName() {
        return cacheName;
    }

    /** Not implemented; always returns 0. TODO: derive from the members' status. */
    public int getStatus() {
        return 0;
    }

    public String toString() {
        return "RemoteCacheNoWaitFacade: " + cacheName + ", rca = " + rca;
    }

    private void p(String s) {
        System.out.println("RemoteCacheNoWaitFacade:" + s);
    }

    /**
     * Starts the failover / primary recovery process for the remote cache
     * at the given index. Nothing happens unless the remote type is LOCAL
     * and that cache reports the error status.
     *
     * @param i index into noWaits of the cache that failed
     */
    protected void failover(int i) {
        if (debugF) {
            p("in failover for " + i);
        }
        if (rca.getRemoteType() == rca.LOCAL) {
            if (noWaits[i].getStatus() == STATUS_ERROR) {
                // start failover, primary recovery process
                RemoteCacheFailoverRunner runner = new RemoteCacheFailoverRunner(this);
                runner.notifyError();
                Thread t = new Thread(runner);
                t.setDaemon(true);
                t.start();
            } else {
                if (debugF) {
                    p("the noWait is not in error");
                }
                log.info("the noWait is not in error");
            }
        }
    }
}
1.1
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/remote/RemoteCacheRestore.java
Index: RemoteCacheRestore.java
===================================================================
package org.apache.stratum.jcs.auxiliary.remote;
import org.apache.stratum.jcs.engine.*;
import org.apache.stratum.jcs.engine.behavior.*;
import org.apache.stratum.jcs.auxiliary.behavior.*;
import org.apache.stratum.jcs.auxiliary.remote.*;
import org.apache.stratum.jcs.auxiliary.remote.behavior.*;
import org.apache.stratum.jcs.utils.log.*;
import java.rmi.*;
import java.rmi.registry.*;
/**
* Used to repair the remote caches managed by the associated instance of
RemoteCacheManager.
*
* When there is an error the monitor kicks off. The Failover runner starts
* looks for a manager with a connection to a remote cache that is not in error.
* If a manager's connection to a remote cache is found to be in error, the
* restorer kicks off and tries to reconnect. When it is succesful, the status
* of the manager changes. When the failover runner finds that the primary is
* in good shape, it will switch back.
*
**/
public class RemoteCacheRestore implements ICacheRestore {
private Logger log = LoggerManager.getLogger(this);
private boolean debug = true;
private final RemoteCacheManager rcm;
//private final IAuxiliaryCacheManager rcm;
private boolean canFix = true;
private Object remoteObj;
/** Constructs with the given instance of RemoteCacheManager. */
public RemoteCacheRestore(RemoteCacheManager rcm) {
//public RemoteCacheRestore(IAuxiliaryCacheManager rcm) {
this.rcm = rcm;
}
/**
* Returns true if the connection to the remote host for the corresponding cache
manager
* can be successfully re-established.
*/
public boolean canFix() {
if (!canFix)
return canFix;
String registry = "//" + rcm.host + ":" + rcm.port + "/" + rcm.service;
log.info("looking up server " + registry);
try {
remoteObj = Naming.lookup(registry);
log.info("looking up server " + registry);
} catch(Exception ex) {
//log.error(ex, "host=" + rcm.host + "; port" + rcm.port + "; service=" +
rcm.service );
log.error("host=" + rcm.host + "; port" + rcm.port + "; service=" +
rcm.service );
canFix = false;
}
return canFix;
}
/** Fixes up all the caches managed by the associated cache manager. */
public void fix() {
if (!canFix) {
return;
}
rcm.fixCaches((IRemoteCacheService)remoteObj, (IRemoteCacheObserver)remoteObj);
String msg = "Remote connection to " + "//" + rcm.host + ":" + rcm.port + "/" +
rcm.service + " resumed.";
log.info(msg);
p(msg);
}
private void p(String s) {
System.out.println("RemoteCacheRestore:"+s);
}
}
1.1
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/remote/RemoteCacheWatchRepairable.java
Index: RemoteCacheWatchRepairable.java
===================================================================
package org.apache.stratum.jcs.auxiliary.remote;
import org.apache.stratum.jcs.engine.*;
import org.apache.stratum.jcs.engine.behavior.*;
import org.apache.stratum.jcs.auxiliary.remote.behavior.*;
/**
 * Same as CacheWatchRepairable but additionally implements the
 * IRemoteCacheObserver interface. Adds no members of its own.
 * RemoteCacheManager holds one of these and swaps the wrapped watch
 * through setCacheWatch when a connection is made, lost or restored.
 */
public class RemoteCacheWatchRepairable extends CacheWatchRepairable implements
IRemoteCacheObserver {
}
1.1
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/remote/RemoteUtils.java
Index: RemoteUtils.java
===================================================================
package org.apache.stratum.jcs.auxiliary.remote;
import java.rmi.*;
import java.rmi.registry.*;
import java.rmi.server.*;
import java.io.*;
import java.util.*;
/**
 * Static helpers for the RMI remote cache: registry creation and loading
 * of property resources. Not instantiable.
 */
public class RemoteUtils {

    private static boolean debug = false;

    private RemoteUtils() {
    }

    /**
     * Creates and exports a registry on the specified port of the local host.
     * An RMI security manager is installed first, but only when no
     * security manager is present, so one installed by the application is
     * not replaced.
     *
     * @param port requested port; values below 1024 are replaced by the
     *        default registry port
     * @return the port the registry was created on
     * @throws RemoteException if the registry cannot be exported
     */
    public static int createRegistry(int port) throws RemoteException {
        if (System.getSecurityManager() == null) {
            p("createRegistry> setting security manager");
            System.setSecurityManager(new RMISecurityManager());
        }
        if (port < 1024) {
            port = Registry.REGISTRY_PORT;
        }
        p("createRegistry> creating registry");
        LocateRegistry.createRegistry(port);
        return port;
    }

    /**
     * Loads a properties resource relative to this class.
     * Loading is best effort: if the resource does not exist or cannot be
     * parsed, the problem is reported on standard output and the
     * properties read so far (possibly none) are returned.
     *
     * @param propFile resource name as understood by Class.getResourceAsStream
     * @return the loaded properties; never null
     * @throws IOException if closing the resource stream fails
     */
    public static Properties loadProps(String propFile) throws IOException {
        Properties props = new Properties();
        InputStream is = RemoteUtils.class.getResourceAsStream(propFile);
        if (is == null) {
            // Handled explicitly instead of letting props.load(null) throw
            // a NullPointerException that is then swallowed.
            p("loadProps> resource not found: " + propFile);
            return props;
        }
        try {
            props.load(is);
            if (debug) {
                p("props.size=" + props.size() + ", " + props);
            }
        } catch (Exception ex) {
            // Still best effort, but no longer silent.
            p("loadProps> failed to load " + propFile + ": " + ex);
        } finally {
            is.close();
        }
        return props;
    }

    private static void p(String s) {
        System.out.println("RmiUitls: " + s);
    }
}
1.1
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/remote/ZombieRemoteCacheService.java
Index: ZombieRemoteCacheService.java
===================================================================
package org.apache.stratum.jcs.auxiliary.remote;
import java.io.*;
import org.apache.stratum.jcs.engine.behavior.ICacheService;
import org.apache.stratum.jcs.engine.behavior.ICacheElement;
import org.apache.stratum.jcs.engine.behavior.ICache;
import org.apache.stratum.jcs.utils.reuse.*;
import org.apache.stratum.jcs.engine.ZombieCacheService;
import org.apache.stratum.jcs.engine.behavior.*;
import org.apache.stratum.jcs.auxiliary.remote.behavior.*;
/**
 * Stand-in for the remote cache service. RemoteCacheManager installs it
 * when the registry lookup fails. The three listener-id variants declared
 * here have empty bodies, so calls on them do nothing.
 */
public class ZombieRemoteCacheService extends ZombieCacheService implements
IRemoteCacheService {
/** Does nothing. */
public void update(ICacheElement item, byte listenerId) {}
/** Does nothing. */
public void remove(String cacheName, Serializable key, byte listenerId) {}
/** Does nothing. */
public void removeAll(String cacheName, byte listenerId) {}
}
1.1
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/remote/ZombieRemoteCacheWatch.java
Index: ZombieRemoteCacheWatch.java
===================================================================
package org.apache.stratum.jcs.auxiliary.remote;
import org.apache.stratum.jcs.engine.ZombieCacheWatch;
import org.apache.stratum.jcs.auxiliary.remote.behavior.*;
/**
 * Stand-in for the remote cache watch service. RemoteCacheManager wraps
 * one of these in its RemoteCacheWatchRepairable when the registry lookup
 * fails. Adds nothing to ZombieCacheWatch beyond implementing
 * IRemoteCacheObserver.
 */
public class ZombieRemoteCacheWatch extends ZombieCacheWatch implements
IRemoteCacheObserver {
}
--
To unsubscribe, e-mail: <mailto:[EMAIL PROTECTED]>
For additional commands, e-mail: <mailto:[EMAIL PROTECTED]>