asmuts 02/01/14 22:50:44
Added: src/java/org/apache/stratum/jcs/auxiliary/lateral
LateralCache.java LateralCacheAttributes.java
LateralCacheFactory.java LateralCacheInfo.java
LateralCacheManager.java LateralCacheMonitor.java
LateralCacheNoWait.java
LateralCacheNoWaitFacade.java
LateralCacheRestore.java
LateralCacheWatchRepairable.java
LateralElementDescriptor.java
ZombieLateralCacheService.java
ZombieLateralCacheWatch.java
Log:
no message
Revision Changes Path
1.1
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/lateral/LateralCache.java
Index: LateralCache.java
===================================================================
package org.apache.stratum.jcs.auxiliary.lateral;
import java.io.*;
import java.util.*;
import org.apache.stratum.jcs.engine.*;
import org.apache.stratum.jcs.access.*;
import org.apache.stratum.jcs.engine.behavior.*;
import org.apache.stratum.jcs.access.exception.*;
import org.apache.stratum.jcs.auxiliary.lateral.behavior.*;
import org.apache.stratum.jcs.auxiliary.lateral.http.broadcast.*;
import org.apache.stratum.jcs.auxiliary.lateral.http.remove.*;
import org.apache.stratum.jcs.utils.log.*;
import org.apache.stratum.jcs.utils.reuse.*;
/**
* Lateral distributor. Returns null on get. Net search not implemented.
*
*/
public class LateralCache implements ICache {
private static boolean debug = false; //true;
private static Logger log;
private static int numCreated = 0;
Attributes attr = null;
private HashMap keyHash; // not synchronized to maximize concurrency.
// generalize this, use another interface
ILateralCacheAttributes cattr;
final String cacheName;
private String source_id = "org.apache.stratum.jcs.auxiliary.lateral.LateralCache";
/**
* either http, socket.udp, or socket.tcp
* can set in config
*/
private ILateralCacheService lateral;
///////////////////////////////////////////////////
public Serializable getSourceId() {
return this.source_id;
}
////////////////////////////////////////////////////////
protected LateralCache(String cacheName, ILateralCacheService lateral) {
this.cacheName= cacheName;
this.lateral = lateral;
log = LoggerManager.getLogger( this );
if (debug) {
p("Construct> cacheName=" + cacheName + ", lateral = " + lateral);
}
}
////////////////////////////////////////////////////////
protected LateralCache( ILateralCacheAttributes cattr ) {
this.cacheName= cattr.getCacheName();
//this.servers = servers;
this.cattr = cattr;
log = LoggerManager.getLogger( this );
if (debug) {
p("Construct> cacheName=" + cattr.getCacheName() );
}
}
////////////////////////////////////////
public String toString() {
return "LateralCache: " + cattr.getCacheName();
}
/**
* Synchronously put to the lateral cache; if failed, replace the remote
* handle with a zombie.
*/
public void put(Serializable key, Serializable value ) throws IOException {
put( key, value, (Attributes)this.attr.copy() );
}
public void put(Serializable key, Serializable value, Attributes attr ) throws
IOException {
try {
CacheElement ce = new CacheElement( cattr.getCacheName(), key, value);
ce.setAttributes( attr );
update( ce );
} catch(Exception ex) {
handleException(ex, "Failed to put " + key + " to " +cattr.getCacheName());
}
}
public void update( ICacheElement ce ) throws IOException {
try {
if ( debug ) {
p( "lateral = " + lateral );
//p( "ce = " + ce );
p( "LateralCacheInfo.listenerId = " + LateralCacheInfo.listenerId );
}
lateral.update( ce, LateralCacheInfo.listenerId );
} catch ( NullPointerException npe ) {
//log.error( npe, "npe for ce = " + ce + "ce.attr = " + ce.getAttributes() );
log.error( npe );
return;
} catch(Exception ex) {
handleException(ex, "Failed to put " + ce.getKey() + " to " +
ce.getCacheName());
}
} // end update
/**
* Return null. The performace costs are too great. Can implement later.
*/
public Serializable get(Serializable key) throws IOException {
return null;
//try {
//} catch ( Exception e ) {
// log.debug( "didn't find element " + key + "" );
// return null;
//}
}
/**
*
*/
public Serializable get(Serializable key, boolean container) throws IOException {
try {
return null;
} catch(Exception ex) {
handleException(ex, "Failed to get " + key + " from " +cattr.getCacheName());
return null; // never executes; just keep the compiler happy.
}
}
/**
* Synchronously remove from the remote cache; if failed, replace the remote
* handle with a zombie.
*/
public boolean remove(Serializable key) throws IOException {
if (debug) {
p("remove> key="+key);
}
try {
//DeleteLateralCacheMulticaster dlcm = new DeleteLateralCacheMulticaster(
cattr.getCacheName(), (String)key, cattr.getLateralCacheAddrs(),
cattr.getLateralDeleteServlet() );
//dlcm.multicast();
lateral.remove( cacheName, key, LateralCacheInfo.listenerId );
} catch(Exception ex) {
handleException(ex, "Failed to remove " + key + " from " +
cattr.getCacheName());
}
return false;
}
/**
* Synchronously removeAll from the remote cache; if failed, replace the remote
* handle with a zombie.
*/
public void removeAll() throws IOException {
try {
//DeleteLateralCacheMulticaster dlcm = new DeleteLateralCacheMulticaster(
cattr.getCacheName(), "ALL", cattr.getLateralCacheAddrs(),
cattr.getLateralDeleteServlet() );
//dlcm.multicast();
lateral.removeAll( cacheName, LateralCacheInfo.listenerId );
} catch(Exception ex) {
handleException(ex, "Failed to remove all from " + cattr.getCacheName());
}
}
/**
* Synchronously dispose the cache.
*/
public void dispose() throws IOException {
p( "disposing of lateral cache" );
try {
// Should remove cache from multicast group
} catch(Exception ex) {
p( "couldn't dispose" );
handleException(ex, "Failed to dispose " +cattr.getCacheName());
//remote = null;
}
}
/////////////////////////////////////////////////////////
public String getStats(){
return "cacheName = " +cattr.getCacheName();
}
/**
* Returns the cache status.
*/
public int getStatus() {
return this.lateral instanceof IZombie ? STATUS_ERROR : STATUS_ALIVE;
}
/** Returns the current cache size. */
public int getSize() {
return 0;
}
public int getCacheType() {
return ICacheType.LATERAL_CACHE;
}
public String getCacheName() {
return cacheName;
}
/**
* Not yet sure what to do here.
*/
private void handleException(Exception ex, String msg) throws IOException {
log.error( "Disabling lateral cache due to error " + msg);
log.error(ex);
lateral = new ZombieLateralCacheService();
// may want to flush if region specifies
// Notify the cache monitor about the error, and kick off the recovery process.
LateralCacheMonitor.getInstance().notifyError();
// could stop the net serach if it is built and try to reconnect?
if (ex instanceof IOException) {
throw (IOException)ex;
}
throw new IOException(ex.getMessage());
}
/** Replaces the current remote cache service handle with the given handle. */
public void fixCache(ILateralCacheService lateral) {
this.lateral = lateral;
return;
}
/////////////////////////////////////////////////
private void p (String s) {
//log.debug("LateralCache:" + s);
System.out.println("LateralCache:" + s);
}
}
1.1
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/lateral/LateralCacheAttributes.java
Index: LateralCacheAttributes.java
===================================================================
package org.apache.stratum.jcs.auxiliary.lateral;
import java.io.*;
import java.util.*;
//import org.apache.stratum.jcs.auxiliary.*;
import org.apache.stratum.jcs.auxiliary.behavior.*;
import org.apache.stratum.jcs.engine.*;
import org.apache.stratum.jcs.access.*;
import org.apache.stratum.jcs.engine.behavior.*;
import org.apache.stratum.jcs.access.exception.*;
import org.apache.stratum.jcs.auxiliary.lateral.behavior.*;
import org.apache.stratum.jcs.auxiliary.lateral.http.broadcast.*;
import org.apache.stratum.jcs.auxiliary.lateral.http.remove.*;
import org.apache.stratum.jcs.utils.log.*;
import org.apache.stratum.jcs.utils.reuse.*;
///////////////////////////////////////////////////////////
public class LateralCacheAttributes implements Serializable, ILateralCacheAttributes
{
String transmissionTypeName = "UDP";
int transmissionType = UDP;
ArrayList httpServers;
// used to identify the service that this manager will be
// operating on
String httpServer = "";
String httpReceiveServlet = "";
String httpDeleteServlet = "";
String udpMulticastAddr = "228.5.6.7";
int udpMulticastPort = 6789;
//ArrayList tcpServers;
String tcpServers;
// used to identify the service that this manager will be
// operating on
String tcpServer = "";
int tcpListenerPort = 1111;
private String cacheName;
private String name;
/////////////////////////////////////////
public void setHttpServer( String val ) {
httpServer = val;
}
public String getHttpServer( ) {
return httpServer;
}
/*
/////////////////////////////////////////
public void setTcpServers( ArrayList val ) {
tcpServers = val;
}
public ArrayList getTcpServers( ) {
return tcpServers;
}
*/
/////////////////////////////////////////
public void setTcpServers( String val ) {
tcpServers = val;
}
public String getTcpServers( ) {
return tcpServers;
}
/////////////////////////////////////////
public void setTcpServer( String val ) {
tcpServer = val;
}
public String getTcpServer( ) {
return tcpServer;
}
/////////////////////////////////////////
public void setTcpListenerPort( int val ) {
this.tcpListenerPort = val;
}
public int getTcpListenerPort( ) {
return this.tcpListenerPort;
}
/////////////////////////////////////////
public void setUdpMulticastAddr( String val ) {
udpMulticastAddr = val;
}
public String getUdpMulticastAddr( ) {
return udpMulticastAddr;
}
/////////////////////////////////////////
public void setUdpMulticastPort( int val ) {
udpMulticastPort = val;
}
public int getUdpMulticastPort( ) {
return udpMulticastPort;
}
/////////////////////////////////////////
public void setTransmissionType( int val ) {
this.transmissionType = val;
if ( val == UDP ) {
transmissionTypeName = "UDP";
} else
if ( val == HTTP ) {
transmissionTypeName = "HTTP";
} else
if ( val == TCP ) {
transmissionTypeName = "TCP";
}
}
public int getTransmissionType() {
return this.transmissionType;
}
/////////////////////////////////////////
public void setTransmissionTypeName( String val ) {
this.transmissionTypeName = val;
if ( val.equals("UDP") ) {
transmissionType = UDP;
} else
if ( val.equals("HTTP") ) {
transmissionType = HTTP;
} else
if ( val.equals("TCP") ) {
transmissionType = TCP;
}
}
/////////////////////////////////////////
public String getTransmissionTypeName() {
return this.transmissionTypeName;
}
////////////////////////////////////////////////////
public void setCacheName( String s ) {
this.cacheName = s;
}
public String getCacheName( ) {
return this.cacheName;
}
/////////////////////////////////////////////////////////////////////
public String getName() {
return this.name;
}
public void setName( String name ) {
this.name = name;
}
/////////////////////////////////////////////////
public IAuxiliaryCacheAttributes copy() {
try {
return (IAuxiliaryCacheAttributes)this.clone();
} catch( Exception e ){}
return (IAuxiliaryCacheAttributes)this;
}
////////////////////////////////////////////////
public String toString() {
StringBuffer buf = new StringBuffer();
buf.append( "cacheName=" + cacheName + "\n" );
buf.append( "transmissionTypeName=" + transmissionTypeName + "\n" );
buf.append( "transmissionType=" + transmissionType + "\n" );
buf.append( "tcpServer=" + tcpServer + "\n" );
buf.append( httpServer + udpMulticastAddr + String.valueOf( udpMulticastPort ) +
tcpServer );
return buf.toString();
}
}
1.1
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/lateral/LateralCacheFactory.java
Index: LateralCacheFactory.java
===================================================================
package org.apache.stratum.jcs.auxiliary.lateral;
/**
* Title:
* Description:
* Copyright: Copyright (c) 2001
* Company:
* @author
* @version 1.0
*/
import java.util.*;
//import org.apache.stratum.jcs.auxiliary.*;
import org.apache.stratum.jcs.auxiliary.behavior.*;
import org.apache.stratum.jcs.engine.*;
import org.apache.stratum.jcs.engine.behavior.*;
import org.apache.stratum.jcs.auxiliary.lateral.*;
import org.apache.stratum.jcs.auxiliary.lateral.behavior.*;
import org.apache.stratum.jcs.utils.log.*;
/**
* Constructs a LateralCacheNoWaitFacade for the given configuration.
* Each lateral service / local relationship is managed by one manager.
* This manager canl have multiple caches. The remote relationships are
consolidated
* and restored via these managers. The facade provides a front to the composite
cache
* so the implmenetation is transparent.
*/
public class LateralCacheFactory implements IAuxiliaryCacheFactory {
private static Logger log = LoggerManager.getLogger( LateralCacheFactory.class );
private static String name;
///////////////////////////////////////////////////////////////
/**
* Interface method. Allows classforname construction, making caches pluggable.
*/
public ICache createCache( IAuxiliaryCacheAttributes iaca ) {
LateralCacheAttributes lac = (LateralCacheAttributes)iaca;
ArrayList noWaits = new ArrayList();
if ( lac.getTransmissionType() == lac.UDP ) {
LateralCacheManager lcm = LateralCacheManager.getInstance( lac );
ICache ic = lcm.getCache( lac.getCacheName() );
if ( ic != null ) {
noWaits.add( ic );
}
} else
if ( lac.getTransmissionType() == lac.TCP ) {
//pars up the tcp servers and set the tcpServer value and
// get the manager and then get the cache
//Iterator it = lac.tcpServers.iterator();
//while( it.hasNext() ) {
StringTokenizer it = new StringTokenizer( lac.tcpServers, "," );
while( it.hasMoreElements() ) {
//String server = (String)it.next();
String server = (String)it.nextElement();
//p( "tcp server = " + server );
lac.setTcpServer( server );
LateralCacheManager lcm = LateralCacheManager.getInstance( lac );
ICache ic = lcm.getCache( lac.getCacheName() );
if ( ic != null ) {
noWaits.add( ic );
} else {
//p( "noWait is null" );
}
}
} else
if ( lac.getTransmissionType() == lac.HTTP ) {
Iterator it = lac.httpServers.iterator();
while( it.hasNext() ) {
String server = (String)it.next();
lac.setHttpServer( server );
LateralCacheManager lcm = LateralCacheManager.getInstance( lac );
ICache ic = lcm.getCache( lac.getCacheName() );
if ( ic != null ) {
noWaits.add( ic );
}
}
}
LateralCacheNoWaitFacade lcnwf = new LateralCacheNoWaitFacade(
(LateralCacheNoWait[])noWaits.toArray(new LateralCacheNoWait[0]), iaca.getCacheName()
);
return lcnwf;
} // end createCache
/////////////////////////////////////////////////////////////////////
public String getName() {
return this.name;
}
public void setName( String name ) {
this.name = name;
}
///////////////////////////////////////////////
private static void p( String s ) {
System.out.println( "LateralCacheFactory: " + s );
}
} // end class
1.1
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/lateral/LateralCacheInfo.java
Index: LateralCacheInfo.java
===================================================================
package org.apache.stratum.jcs.auxiliary.lateral;
import java.rmi.dgc.VMID;
/** A shared static variable holder for the lateral cache */
public class LateralCacheInfo {
// shouldn't be instantiated
private LateralCacheInfo(){}
/** Shouldn't be used till after reconneting, after setting = thread safe
* Used to identify a client, so we can run multiple clients off one host.
* Need since there is no way to identify a client other than by host in rmi.
*/
protected static VMID vmid = new VMID();
public static byte listenerId = (byte)vmid.hashCode();
}
1.1
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/lateral/LateralCacheManager.java
Index: LateralCacheManager.java
===================================================================
package org.apache.stratum.jcs.auxiliary.lateral;
import java.io.*;
import java.net.*;
import java.util.*;
//import org.apache.stratum.jcs.auxiliary.*;
import org.apache.stratum.jcs.auxiliary.behavior.*;
import org.apache.stratum.jcs.engine.*;
import org.apache.stratum.jcs.engine.behavior.*;
import org.apache.stratum.jcs.auxiliary.lateral.behavior.*;
//import org.apache.stratum.jcs.auxiliary.lateral.http.*;
import org.apache.stratum.jcs.auxiliary.lateral.socket.udp.*;
import org.apache.stratum.jcs.auxiliary.lateral.socket.tcp.*;
import org.apache.stratum.jcs.auxiliary.lateral.behavior.*;
import org.apache.stratum.jcs.utils.log.*;
/**
* Creates lateral caches. Lateral caches are primarily used for
* removing non laterally configured caches. Non laterally configured
* cache regions should still bea ble to participate in removal. But
* if there is a non laterally configured cache hub, then lateral removals may
* be necessary. For flat webserver production environments, without
* a strong machine at the app server level, distribution and search may
* need to occur at the lateral cache level. This is currently not implemented
* in the lateral cache.
*/
public class LateralCacheManager implements IAuxiliaryCacheManager {
private static boolean debug = true; //false;
//static ArrayList defaultServers;
ICacheAttributes defaultCattr;
private static Logger log;
private static LateralCacheMonitor monitor;
static final Map instances = new HashMap();
// each manager instance has caches
final Map caches = new HashMap();
protected ILateralCacheAttributes lca;
private int clients;
/**
* Handle to the lateral cache service; or a zombie handle if failed to connect.
*/
private ILateralCacheService lateralService;
/**
* Wrapper of the lateral cache watch service;
* or wrapper of a zombie service if failed to connect.
*/
private LateralCacheWatchRepairable lateralWatch;
/////////////////////////////////////////////////
public static LateralCacheManager getInstance ( ILateralCacheAttributes lca ) {
if ( debug ) {
p( "getting instance" );
}
LateralCacheManager ins = (LateralCacheManager)instances.get(lca.toString());
if (ins == null) {
synchronized (instances) {
ins = (LateralCacheManager)instances.get(lca.toString());
if (ins == null) {
ins = new LateralCacheManager( lca );
instances.put(lca.toString(), ins);
}
}
} else {
if ( debug ) {
ins.p( "found manager already created" );
}
}
if (debug) {
ins.log.logIt("Manager stats : " + ins.getStats() + "<br> -- in
getInstance()");
}
ins.clients++;
// Fires up the monitoring daemon.
if (monitor == null) {
monitor = LateralCacheMonitor.getInstance();
// If the returned monitor is null, it means it's already started elsewhere.
if (monitor != null) {
Thread t = new Thread(monitor);
t.setDaemon(true);
t.start();
}
}
return ins;
} // end getInstance
////////////////////////////////////////////////////////
private LateralCacheManager( ILateralCacheAttributes lca ) {
this.lca = lca;
log = LoggerManager.getLogger( this );
if ( debug ) {
p( "creating lateral cache service lca = " + lca );
}
// need to create the service based on the type
// ex
try {
if ( lca.getTransmissionType() == lca.UDP ) {
// need to allow for this to be a service.
// should wrap sender and new kind of receiver?
if ( debug ) {
p( "udp service" );
}
this.lateralService = new LateralUDPService( lca );
} else
if ( lca.getTransmissionType() == lca.HTTP ) {
if ( debug ) {
p( "http service" );
}
//this.lateralService = new LateralHTTPService( lca );
} else
if ( lca.getTransmissionType() == lca.TCP ) {
if ( debug ) {
p( "tcp service" );
}
this.lateralService = new LateralTCPService( lca );
} else {
log.error( "type not recognized, must zombie" );
throw new Exception( "no known transmission type for lateral cache." );
}
if ( this.lateralService == null ) {
log.error( "no service created?, must zombie" );
throw new Exception( "no service created for lateral cache." );
}
lateralWatch = new LateralCacheWatchRepairable();
lateralWatch.setCacheWatch(new ZombieLateralCacheWatch());
} catch (Exception ex) {
// Failed to connect to the lateral server.
// Configure this LateralCacheManager instance to use the "zombie" services.
log.error(ex.toString() + "**********************ZOMBIZING LATERAL SERVICE");
lateralService = new ZombieLateralCacheService();
lateralWatch = new LateralCacheWatchRepairable();
lateralWatch.setCacheWatch(new ZombieLateralCacheWatch());
// Notify the cache monitor about the error, and kick off the recovery process.
LateralCacheMonitor.getInstance().notifyError();
}
} // end constructor
/** Adds the lateral cache listener to the underlying cache-watch service. */
public void addLateralCacheListener(String cacheName, ILateralCacheListener
listener) throws IOException {
synchronized (caches) {
lateralWatch.addCacheListener(cacheName, listener);
}
return;
}
/** Called to access a precreated region or construct one with defaults.
* Since all aux cache access goes through the manager, this will never be called.
*/
public ICache getCache ( String cacheName ) {
LateralCacheNoWait c = null;
synchronized (caches) {
//c = (LateralCache)caches.get(cacheName);
c = (LateralCacheNoWait)caches.get(cacheName);
if (c == null) {
c = new LateralCacheNoWait(new LateralCache(cacheName, lateralService));
caches.put(cacheName, c);
}
}
try {
// need to set listener based on transmissionType
if ( lca.getTransmissionType() == lca.UDP ) {
addLateralCacheListener( cacheName,
LateralGroupCacheUDPListener.getInstance(lca) );
} else
if ( lca.getTransmissionType() == lca.TCP ) {
addLateralCacheListener( cacheName,
LateralGroupCacheTCPListener.getInstance(lca) );
}
} catch( IOException ioe ) {
log.error( ioe );
} catch( Exception e ) {
log.error( e );
}
if (log.logLevel >= log.DEBUG) {
//log.debug("LateralManager stats : " + getStats());
}
return c;
}
//////////////////////////////////////////////
public int getCacheType() {
return LATERAL_CACHE;
}
//////////////////////////////////////
public String getStats(){
// add something here
return "";
}
/** Fixes up all the caches managed by this cache manager. */
public void fixCaches (ILateralCacheService lateralService, ILateralCacheObserver
lateralWatch) {
p( "*******FIXING LATERAL CACHE*********" );
synchronized (caches) {
this.lateralService = lateralService;
// need to implment an observer for some types of laterals( http and tcp)
//this.lateralWatch.setCacheWatch(lateralWatch);
for (Iterator en = caches.values().iterator(); en.hasNext();) {
LateralCacheNoWait cache = (LateralCacheNoWait)en.next();
cache.fixCache(this.lateralService);
}
}
}
/////////////////////////////////////////////////
private static void p (String s) {
System.out.println("LateralCacheManager:" + s);
}
////////////////////////////////////////////////
// need freeCache, release, getStats
// need to find an interface acceptible for all
// cache managers or a manager within a type
}
1.1
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/lateral/LateralCacheMonitor.java
Index: LateralCacheMonitor.java
===================================================================
package org.apache.stratum.jcs.auxiliary.lateral;
import org.apache.stratum.jcs.utils.log.*;
import java.util.*;
/**
* Used to monitor and repair any failed connection for the lateral cache service.
* By default the monitor operates in a failure driven mode. That is, it goes into
a wait
* state until there is an error.
* Upon the notification of a connection error, the monitor changes to operate
* in a time driven mode. That is, it attempts to recover the connections on a
periodic basis.
* When all failed connections are restored, it changes back to the failure driven
mode.
*/
public class LateralCacheMonitor implements Runnable {
protected static final boolean debug = true;//false;
private static LateralCacheMonitor instance;
private static long idlePeriod = 20*1000; // minimum 20 seconds.
//private static long idlePeriod = 3*1000; // for debugging.
private transient Logger log = LoggerManager.getInstance().getLogger(this);
// Must make sure LateralCacheMonitor is started before any lateral error can be
detected!
private boolean alright=true;
/** Configures the idle period between repairs. */
public static void setIdlePeriod(long idlePeriod) {
if (idlePeriod > LateralCacheMonitor.idlePeriod) {
LateralCacheMonitor.idlePeriod = idlePeriod;
}
}
private LateralCacheMonitor() {}
/**
* Returns the singleton instance;
*/
static LateralCacheMonitor getInstance() {
if (instance == null) {
synchronized(LateralCacheMonitor.class) {
if (instance == null) {
return instance=new LateralCacheMonitor();
}
}
}
return instance;
}
/**
* Notifies the cache monitor that an error occurred,
* and kicks off the error recovery process.
*/
public void notifyError() {
bad();
synchronized(this) {
notify();
}
}
// Run forever.
// Avoid the use of any synchronization in the process of monitoring for
performance reason.
// If exception is thrown owing to synchronization,
// just skip the monitoring until the next round.
public void run () {
do {
if (alright) {
synchronized(this) {
if (alright) {
// Failure driven mode.
try {
wait(); // wake up only if there is an error.
} catch(InterruptedException ignore) {
}
}
}
}
// Time driven mode: sleep between each round of recovery attempt.
try {
if ( debug ) {
p("cache monitor sleeping for " + idlePeriod);
}
Thread.currentThread().sleep(idlePeriod);
} catch (InterruptedException ex) {
// ignore;
}
// The "alright" flag must be false here.
// Simply presume we can fix all the errors until proven otherwise.
synchronized(this) {
alright = true;
}
if ( debug ) {
p("cache monitor running.");
}
// Monitor each LateralCacheManager instance one after the other.
// Each LateralCacheManager corresponds to one lateral connection.
for (Iterator itr = LateralCacheManager.instances.values().iterator();
itr.hasNext();) {
LateralCacheManager mgr = (LateralCacheManager)itr.next();
try {
// If any cache is in error, it strongly suggests all caches managed by the
// same LateralCacheManager instance are in error. So we fix them once
and for all.
for (Iterator itr2 = mgr.caches.values().iterator(); itr2.hasNext();) {
if (itr2.hasNext()) {
LateralCacheNoWait c = (LateralCacheNoWait)itr2.next();
if (c.getStatus() == c.STATUS_ERROR) {
if ( debug ) {
p( "found LateralCacheNoWait in error" );
}
LateralCacheRestore repairer = new LateralCacheRestore(mgr);
// If we can't fix them, just skip and re-try in the next round.
if (repairer.canFix()) {
repairer.fix();
}
else {
bad();
}
break;
} else {
if ( debug ) {
p( "lcnw not in error" );
}
}
}
}
} catch (Exception ex) {
bad();
// Problem encountered in fixing the caches managed by a
LateralCacheManager instance.
// Soldier on to the next LateralCacheManager instance.
log.error(ex);
}
}
} while (true);
}
/** Sets the "alright" flag to false in a critial section. */
private void bad() {
if (alright) {
synchronized(this) {
alright = false;
}
}
}
private void p(String s) {
System.out.println("LateralCacheMonitor:" + s);
}
}
1.1
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/lateral/LateralCacheNoWait.java
Index: LateralCacheNoWait.java
===================================================================
package org.apache.stratum.jcs.auxiliary.lateral;
import org.apache.stratum.jcs.engine.control.Cache;
import org.apache.stratum.jcs.engine.*;
import org.apache.stratum.jcs.access.exception.*;
import org.apache.stratum.jcs.engine.behavior.*;
import org.apache.stratum.jcs.auxiliary.lateral.behavior.*;
import org.apache.stratum.jcs.auxiliary.lateral.http.broadcast.*;
import org.apache.stratum.jcs.auxiliary.lateral.socket.udp.*;
//import org.apache.stratum.jcs.auxiliary.lateral.socket.tcp.*;
import org.apache.stratum.jcs.utils.log.*;
import java.io.*;
import java.rmi.*;
/**
* Used to queue up update requests to the underlying cache.
* These requests will be processed in their order of arrival
* via the cache event queue processor.
*/
public class LateralCacheNoWait implements ICache {
private final LateralCache cache;
private ICacheEventQueue q;
private transient Logger log = LoggerManager.getInstance().getLogger(this);
private String source_id =
"org.apache.stratum.jcs.auxiliary.lateral.LateralCacheNoWait";
///////////////////////////////////////////////////
public Serializable getSourceId() {
return this.source_id;
}
/**
* Constructs with the given lateral cache,
* and fires up an event queue for aysnchronous processing.
*/
public LateralCacheNoWait(LateralCache cache) {
this.cache = cache;
this.q = new CacheEventQueue(new CacheAdaptor(cache),
LateralCacheInfo.listenerId, cache.getCacheName());
// need each no wait to handle each of its real updates and removes, since
there may
// be more than one per cache? alternativve is to have the cache
// perform updates using a different method that spcifies the listener
//this.q = new CacheEventQueue(new CacheAdaptor(this),
LateralCacheInfo.listenerId, cache.getCacheName());
if (cache.getStatus() == cache.STATUS_ERROR)
q.destroy();
}
/** Adds a put request to the lateral cache. */
public void put(Serializable key, Serializable value ) throws IOException {
put( key, value, null );
}
public void put(Serializable key, Serializable value, Attributes attr ) throws
IOException {
try {
CacheElement ce = new CacheElement( cache.getCacheName(), key, value );
ce.setAttributes( attr );
update( ce );
} catch(IOException ex) {
log.error(ex);
q.destroy();
}
}
public void update( ICacheElement ce ) throws IOException {
try {
q.addPutEvent( ce );
} catch(IOException ex) {
log.error(ex);
q.destroy();
}
}
/** Synchronously reads from the lateral cache. */
public Serializable get(Serializable key) {
return get( key, true );
}
public Serializable get(Serializable key, boolean container ) {
try {
return cache.get(key);
} catch(UnmarshalException ue) {
p("Retrying the get owing to UnmarshalException...");
try {
return cache.get(key);
} catch(IOException ex) {
p("Failed in retrying the get for the second time.");
q.destroy();
}
} catch(IOException ex) {
q.destroy();
}
return null;
}
/** Adds a remove request to the lateral cache. */
public boolean remove(Serializable key) {
try {
q.addRemoveEvent(key);
} catch(IOException ex) {
log.error(ex);
q.destroy();
}
return false;
}
/** Adds a removeAll request to the lateral cache. */
public void removeAll() {
try {
q.addRemoveAllEvent();
} catch(IOException ex) {
log.error(ex);
q.destroy();
}
}
/** Adds a dispose request to the lateral cache. */
public void dispose() {
try {
q.addDisposeEvent();
} catch(IOException ex) {
log.error(ex);
q.destroy();
}
}
/** No lateral invokation. */
public String getStats() {
return cache.getStats();
}
/** No lateral invokation. */
public int getSize() {
return cache.getSize();
}
/** No lateral invokation. */
public int getCacheType() {
return cache.getCacheType();
}
/**
* Returns the asyn cache status.
* An error status indicates either the lateral connection is not available,
* or the asyn queue has been unexpectedly destroyed.
* No lateral invokation.
*/
public int getStatus() {
return q.isAlive() ? cache.getStatus() : cache.STATUS_ERROR;
}
public String getCacheName() {
return cache.getCacheName();
}
/**
* Replaces the lateral cache service handle with the given handle and
* reset the event queue by starting up a new instance.
*/
public void fixCache(ILateralCacheService lateral) {
cache.fixCache(lateral);
resetEventQ();
return;
}
/** Resets the event q by first destroying the existing one and starting up new
one. */
public void resetEventQ() {
if (q.isAlive()) {
q.destroy();
}
this.q = new CacheEventQueue(new CacheAdaptor(cache),
LateralCacheInfo.listenerId, cache.getCacheName());
}
public String toString() {
return "LateralCacheNoWait: " + cache.toString();
}
private void p(String s) {
System.out.println("LateralCacheNoWait:" + s);
}
}
1.1
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/lateral/LateralCacheNoWaitFacade.java
Index: LateralCacheNoWaitFacade.java
===================================================================
package org.apache.stratum.jcs.auxiliary.lateral;
import org.apache.stratum.jcs.engine.control.Cache;
import org.apache.stratum.jcs.engine.*;
import org.apache.stratum.jcs.access.exception.*;
import org.apache.stratum.jcs.engine.behavior.*;
import org.apache.stratum.jcs.auxiliary.lateral.http.broadcast.*;
import org.apache.stratum.jcs.auxiliary.lateral.socket.udp.*;
//import org.apache.stratum.jcs.auxiliary.lateral.socket.tcp.*;
import org.apache.stratum.jcs.utils.log.*;
import java.io.*;
import java.rmi.*;
/**
* Used to provide access to multiple services under nowait protection.
* Composite factory should construct LateralCacheNoWaitFacade to give
* to the composite cache out of caches it constructs from the varies
* manager to lateral services. Perhaps the lateralcache factory
* should be able to do this.
*/
public class LateralCacheNoWaitFacade implements ICache {
private static final boolean debug = false;//true;
public LateralCacheNoWait[] noWaits;
private transient Logger log = LoggerManager.getInstance().getLogger(this);
private String source_id =
"org.apache.stratum.jcs.auxiliary.lateral.LateralCacheNoWaitFacade";
private String cacheName;
///////////////////////////////////////////////////
public Serializable getSourceId() {
return this.source_id;
}
/**
* Constructs with the given lateral cache,
* and fires events to any listeners.
*/
public LateralCacheNoWaitFacade(LateralCacheNoWait[] noWaits, String cacheName) {
this.noWaits = noWaits;
this.cacheName = cacheName;
}
/** Adds a put request to the lateral cache. */
public void put(Serializable key, Serializable value ) throws IOException {
put( key, value, null );
}
public void put(Serializable key, Serializable value, Attributes attr ) throws
IOException {
try {
CacheElement ce = new CacheElement( cacheName, key, value );
ce.setAttributes( attr );
update( ce );
} catch(Exception ex) {
log.error(ex);
}
}
public void update( ICacheElement ce ) throws IOException {
if ( debug ) {
p( "updating through lateral cache facade, noWaits.length = " + noWaits.length
);
}
try {
for( int i=0; i < noWaits.length; i++ ) {
noWaits[i].update( ce );
}
} catch(Exception ex) {
log.error(ex);
}
}
/** Synchronously reads from the lateral cache. */
public Serializable get(Serializable key) {
return get( key, true );
}
public Serializable get(Serializable key, boolean container ) {
for( int i=0; i < noWaits.length; i++ ) {
try {
Object obj = noWaits[i].get( key, container );
if ( obj != null ) {
return (Serializable)obj;
}
} catch(Exception ex) {
p("Failed to get.");
}
return null;
}
return null;
}
/** Adds a remove request to the lateral cache. */
public boolean remove(Serializable key) {
try {
for( int i=0; i < noWaits.length; i++ ) {
noWaits[i].remove( key );
}
} catch(Exception ex) {
log.error(ex);
}
return false;
}
/** Adds a removeAll request to the lateral cache. */
public void removeAll() {
try {
for( int i=0; i < noWaits.length; i++ ) {
noWaits[i].removeAll();
}
} catch(Exception ex) {
log.error(ex);
}
}
/** Adds a dispose request to the lateral cache. */
public void dispose() {
try {
for( int i=0; i < noWaits.length; i++ ) {
noWaits[i].dispose();
}
} catch(Exception ex) {
log.error(ex);
}
}
/** No lateral invokation. */
public String getStats() {
return "";//cache.getStats();
}
/** No lateral invokation. */
public int getSize() {
return 0; //cache.getSize();
}
/////////////////////////////////////////////
public int getCacheType() {
return ICacheType.LATERAL_CACHE;
}
public String getCacheName() {
return ""; //cache.getCacheName();
}
// need to do something with this
public int getStatus() {
return 0;//q.isAlive() ? cache.getStatus() : cache.STATUS_ERROR;
}
////////////////////////////////////////////////////////
public String toString() {
return "LateralCacheNoWaitFacade: " + cacheName;
}
private void p(String s) {
System.out.println("LateralCacheNoWaitFacade:" + s);
}
}
1.1
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/lateral/LateralCacheRestore.java
Index: LateralCacheRestore.java
===================================================================
package org.apache.stratum.jcs.auxiliary.lateral;
import org.apache.stratum.jcs.engine.*;
import org.apache.stratum.jcs.engine.behavior.*;
import org.apache.stratum.jcs.auxiliary.lateral.*;
import org.apache.stratum.jcs.auxiliary.lateral.behavior.*;
import org.apache.stratum.jcs.auxiliary.lateral.socket.udp.*;
import org.apache.stratum.jcs.auxiliary.lateral.socket.tcp.*;
import org.apache.stratum.jcs.utils.log.*;
import java.rmi.*;
import java.rmi.registry.*;
/**
* Used to repair the lateral caches managed by the associated instance of
LateralCacheManager.
*/
public class LateralCacheRestore implements ICacheRestore {
private Logger log = LoggerManager.getLogger(this);
private boolean debug = true;
private final LateralCacheManager lcm;
private boolean canFix = true;
private Object lateralObj;
/** Constructs with the given instance of LateralCacheManager. */
public LateralCacheRestore(LateralCacheManager lcm) {
this.lcm = lcm;
}
/**
* Returns true iff the connection to the lateral host for the corresponding cache
manager
* can be successfully re-established.
*/
public boolean canFix() {
if (!canFix) {
return canFix;
}
try {
// restore based on type. Only the tcp scoket type really needs restoring.
if( lcm.lca.getTransmissionType() == lcm.lca.UDP ) {
lateralObj = new LateralUDPService( lcm.lca );
} else
if( lcm.lca.getTransmissionType() == lcm.lca.TCP ) {
lateralObj = new LateralTCPService( lcm.lca );
} else
if( lcm.lca.getTransmissionType() == lcm.lca.HTTP ) {
}
} catch(Exception ex) {
log.error(ex.getMessage());
canFix = false;
}
return canFix;
}
/** Fixes up all the caches managed by the associated cache manager. */
public void fix() {
if (!canFix) {
return;
}
lcm.fixCaches((ILateralCacheService)lateralObj,
(ILateralCacheObserver)lateralObj);
String msg = "Lateral connection resumed.";
log.info(msg);
p(msg);
}
////////////////////////////////////////////////////
private void p(String s) {
System.out.println("LateralCacheRestore:"+s);
}
}
1.1
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/lateral/LateralCacheWatchRepairable.java
Index: LateralCacheWatchRepairable.java
===================================================================
package org.apache.stratum.jcs.auxiliary.lateral;
import org.apache.stratum.jcs.engine.*;
import org.apache.stratum.jcs.engine.behavior.*;
import org.apache.stratum.jcs.auxiliary.lateral.behavior.*;
/** Same as CacheWatcherWrapper but implements the IRemoteCacheWatch interface. */
public class LateralCacheWatchRepairable extends CacheWatchRepairable implements
ILateralCacheObserver {
}
1.1
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/lateral/LateralElementDescriptor.java
Index: LateralElementDescriptor.java
===================================================================
package org.apache.stratum.jcs.auxiliary.lateral;
import org.apache.stratum.jcs.auxiliary.lateral.*;
import org.apache.stratum.jcs.engine.*;
import org.apache.stratum.jcs.engine.behavior.*;
import java.io.*;
import java.util.*;
import java.sql.*;
////////////////////////////////////////////////////////////////////
public class LateralElementDescriptor implements Serializable {
// command types
public static final int UPDATE = 1;
public static final int REMOVE = 2;
public static final int REMOVEALL = 3;
public static final int DISPOSE = 4;
public ICacheElement ce;
public byte requesterId;
public int command = UPDATE;
/////////////////////////////////////
// for update command
public LateralElementDescriptor( ) {
}
/////////////////////////////////////
public LateralElementDescriptor( ICacheElement ce ) {
this.ce = ce;
}
}
1.1
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/lateral/ZombieLateralCacheService.java
Index: ZombieLateralCacheService.java
===================================================================
package org.apache.stratum.jcs.auxiliary.lateral;
import java.io.*;
import org.apache.stratum.jcs.engine.behavior.ICacheService;
import org.apache.stratum.jcs.engine.behavior.ICacheElement;
import org.apache.stratum.jcs.engine.behavior.ICache;
import org.apache.stratum.jcs.utils.reuse.*;
import org.apache.stratum.jcs.engine.ZombieCacheService;
import org.apache.stratum.jcs.engine.behavior.*;
import org.apache.stratum.jcs.auxiliary.behavior.*;
import org.apache.stratum.jcs.auxiliary.lateral.behavior.*;
public class ZombieLateralCacheService extends ZombieCacheService implements
ILateralCacheService {
public void update(ICacheElement item, byte listenerId) {}
public void remove(String cacheName, Serializable key, byte listenerId) {}
public void removeAll(String cacheName, byte listenerId) {}
}
1.1
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/lateral/ZombieLateralCacheWatch.java
Index: ZombieLateralCacheWatch.java
===================================================================
package org.apache.stratum.jcs.auxiliary.lateral;
import org.apache.stratum.jcs.engine.ZombieCacheWatch;
import org.apache.stratum.jcs.auxiliary.lateral.behavior.*;
public class ZombieLateralCacheWatch extends ZombieCacheWatch implements
ILateralCacheObserver {
}
--
To unsubscribe, e-mail: <mailto:[EMAIL PROTECTED]>
For additional commands, e-mail: <mailto:[EMAIL PROTECTED]>