asmuts 02/01/14 22:26:38
Added: src/java/org/apache/stratum/jcs/auxiliary/lateral/socket/tcp
LateralCacheTCPListener.java
LateralGroupCacheTCPListener.java
LateralTCPReceiver.java
LateralTCPReceiverConnection.java
LateralTCPSender.java LateralTCPService.java
Log:
needs to be brought up to date if it is not
needs work, could be very useful
Revision Changes Path
1.1
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/lateral/socket/tcp/LateralCacheTCPListener.java
Index: LateralCacheTCPListener.java
===================================================================
package org.apache.stratum.jcs.auxiliary.lateral.socket.tcp;
//////////////////////////////////
import java.io.*;
import java.rmi.*;
import java.rmi.registry.*;
import java.rmi.server.*;
import java.util.*;
import java.sql.*;
/////////////////////////////////////
import org.apache.stratum.jcs.engine.behavior.*;
import org.apache.stratum.jcs.engine.control.*;
import org.apache.stratum.jcs.engine.group.*;
import org.apache.stratum.jcs.engine.*;
import org.apache.stratum.jcs.access.*;
import org.apache.stratum.jcs.auxiliary.disk.*;
import org.apache.stratum.jcs.auxiliary.lateral.*;
import org.apache.stratum.jcs.auxiliary.lateral.behavior.*;
// remove
import org.apache.stratum.jcs.utils.log.*;
///////////////////////////////////////////////////////////////////////
public class LateralCacheTCPListener implements ILateralCacheListener, Serializable {
protected static final boolean debug = false; //true;
protected static final boolean debugcmd = false; //true;
protected static transient Logger log;
protected static transient ICompositeCacheManager cacheMgr;
protected static final HashMap instances = new HashMap();
// instance vars
private LateralTCPReceiver receiver;
private ILateralCacheAttributes ilca;
private boolean inited = false;
/////////////////////////////////////////////////////////////
/**
* Only need one since it does work for all regions,
* just reference by multiple region names.
*/
protected LateralCacheTCPListener ( ILateralCacheAttributes ilca ) {
this.ilca = ilca;
log = LoggerManager.getLogger( LateralCacheTCPListener.class );
}
///////////////////////////////////////////////////
public void init() {
try {
// need to connect based on type
//ILateralCacheListener ilcl = this;
//p( "in init, ilcl = " + ilcl );
receiver = new LateralTCPReceiver( ilca, this );
Thread t = new Thread( receiver );
t.start();
} catch (Exception ex) {
log.error(ex);
throw new IllegalStateException(ex.getMessage());
}
inited = true;
}
/** let the lateral cache set a listener_id.
* Since there is only one listerenr for all the regions
* and every region gets registered? the id shouldn't be set
* if it isn't zero. If it is we assume that it is a reconnect.
*/
public void setListenerId( byte id ) throws IOException {
LateralCacheInfo.listenerId = id;
if ( debugcmd ) {
p( "set listenerId = " + id );
}
}
public byte getListenerId( ) throws IOException {
// set the manager since we are in use
//getCacheManager();
//p( "get listenerId" );
if ( debugcmd ) {
p( "get listenerId = " + LateralCacheInfo.listenerId );
}
return LateralCacheInfo.listenerId;
}
///////////////////////////////////////////////////////////////////////////
public static ILateralCacheListener getInstance( ILateralCacheAttributes ilca ) {
//throws IOException, NotBoundException
ILateralCacheListener ins = (ILateralCacheListener)instances.get(
String.valueOf(ilca.getTcpListenerPort()) );
if (ins == null) {
synchronized(LateralCacheTCPListener.class) {
if (ins == null) {
ins = new LateralCacheTCPListener( ilca );
ins.init();
}
if( debug ) {
p( "created new listener " + ilca.getTcpListenerPort() );
}
instances.put( String.valueOf(ilca.getTcpListenerPort()), ins );
}
}
return ins;
}
/////////////////////////////////////////////////////////////////////////////
//////////////////////////// implements the ILateralCacheListener interface.
//////////////
/**
*/
public void handlePut (ICacheElement cb) throws IOException {
if ( debugcmd ) {
p( "PUTTING ELEMENT FROM LATERAL" );
}
getCacheManager();
ICompositeCache cache = (ICompositeCache)cacheMgr.getCache(cb.getCacheName());
cache.update(cb, false);
//handleRemove(cb.getCacheName(), cb.getKey());
}
////////////////////////////////////////////////////
public void handleRemove (String cacheName, Serializable key) throws IOException {
if (debug) {
log.debug("handleRemove> cacheName=" + cacheName + ", key=" + key);
}
if ( debugcmd ) {
p("handleRemove> cacheName=" + cacheName + ", key=" + key);
}
getCacheManager();
// interface limitation here
ICompositeCache cache = (ICompositeCache)cacheMgr.getCache(cacheName);
cache.remove(key, cache.REMOTE_INVOKATION);
}
//////////////////////////////////////////////////
public void handleRemoveAll (String cacheName) throws IOException {
if (debug) {
log.debug("handleRemoveAll> cacheName=" + cacheName);
}
getCacheManager();
ICache cache = cacheMgr.getCache(cacheName);
cache.removeAll();
}
///////////////////////////////////////////////////
public void handleDispose (String cacheName) throws IOException {
if (debug) {
log.debug("handleDispose> cacheName=" + cacheName);
}
CompositeCacheManager cm = (CompositeCacheManager)cacheMgr;
cm.freeCache(cacheName, Cache.REMOTE_INVOKATION);
}
////////////////////////////////////////////////
// override for new funcitonality
protected void getCacheManager() {
if ( cacheMgr == null ) {
cacheMgr = (ICompositeCacheManager)CacheManagerFactory.getInstance();
if ( debugcmd ) {
p( "cacheMgr = " + cacheMgr );
}
} else {
if ( debugcmd ) {
p( "already got cacheMgr = " + cacheMgr );
}
}
}
//////////////////////////////////////////////////
public static void p( String s ) {
System.out.println( "LateralCacheTCPListener: " + s );
}
} // end class
1.1
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/lateral/socket/tcp/LateralGroupCacheTCPListener.java
Index: LateralGroupCacheTCPListener.java
===================================================================
package org.apache.stratum.jcs.auxiliary.lateral.socket.tcp;
import java.io.Serializable;
import org.apache.stratum.jcs.auxiliary.lateral.behavior.*;
import org.apache.stratum.jcs.engine.behavior.*;
import org.apache.stratum.jcs.engine.group.*;
import org.apache.stratum.jcs.engine.control.*;
/**
* Title:
* Description:
* Copyright: Copyright (c) 2001
* Company:
* @author
* @version 1.0
*/
public class LateralGroupCacheTCPListener extends LateralCacheTCPListener implements
ILateralCacheListener {
///////////////////////////////////////////
protected LateralGroupCacheTCPListener (ILateralCacheAttributes ilca) {
super(ilca);
p( "creating LateralGroupCacheTCPListener" );
}
///////////////////////////////////////////////////////////////////////////
public static ILateralCacheListener getInstance( ILateralCacheAttributes ilca ) {
//throws IOException, NotBoundException
ILateralCacheListener ins = (ILateralCacheListener)instances.get(
String.valueOf(ilca.getTcpListenerPort()) );
if (ins == null) {
synchronized(LateralGroupCacheTCPListener.class) {
if (ins == null) {
ins = new LateralGroupCacheTCPListener( ilca );
ins.init();
}
if( debug ) {
p( "created new listener " + ilca.getTcpListenerPort() );
}
instances.put( String.valueOf(ilca.getTcpListenerPort()), ins );
}
}
return ins;
}
////////////////////////////////////////////////
// override for new funcitonality
// lazy init is too slow, find a better way
protected void getCacheManager() {
try {
if ( cacheMgr == null ) {
cacheMgr = (ICompositeCacheManager)GroupCacheManagerFactory.getInstance();
if ( debugcmd ) {
p( " groupcache cacheMgr = " + cacheMgr );
}
} else {
if ( debugcmd ) {
p( "already got groupcache cacheMgr = " + cacheMgr );
}
}
} catch( Exception e ) {
log.error( e );
}
}
}
1.1
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/lateral/socket/tcp/LateralTCPReceiver.java
Index: LateralTCPReceiver.java
===================================================================
package org.apache.stratum.jcs.auxiliary.lateral.socket.tcp;
import java.net.Socket;
import java.net.ServerSocket;
import java.net.InetAddress;
import java.net.MalformedURLException;
import java.io.IOException;
import java.io.File;
import java.util.Hashtable;
import org.apache.stratum.jcs.engine.*;
import org.apache.stratum.jcs.engine.behavior.*;
import org.apache.stratum.jcs.utils.log.*;
import org.apache.stratum.jcs.access.exception.*;
import org.apache.stratum.jcs.auxiliary.lateral.*;
import org.apache.stratum.jcs.auxiliary.lateral.behavior.*;
///////////////////////////////////////////////////////
public class LateralTCPReceiver implements Runnable {
private static final boolean debug = false;// true;
private int port;
private static final Logger log = LoggerManager.getLogger(
"lateral_lateralcachemanager" );
private ILateralCacheListener ilcl;
////////////////////////////////////////////////
public void run() {
try {
if ( debug ) {
p("Listening on port " + port);
}
log.info("Listening on port " + port);
ServerSocket serverSocket = new ServerSocket(port);
while(true) {
if ( debug ) {
p("Waiting for clients to connect ");
}
log.info("Waiting for clients to client ");
Socket socket = serverSocket.accept();
InetAddress inetAddress = socket.getInetAddress();
if ( debug ) {
p("Connected to client at " + inetAddress);
}
log.info("Connected to client at " + inetAddress);
log.info("Starting new socket node.");
new Thread( new LateralTCPReceiverConnection(socket, ilcl) ).start();
}
}
catch(Exception e) {
e.printStackTrace();
}
}
////////////////////////////////////////////////////////////////
public LateralTCPReceiver( ILateralCacheAttributes lca, ILateralCacheListener ilcl
) {
this.port = lca.getTcpListenerPort();
this.ilcl = ilcl;
if ( debug ) {
p( "ilcl = " + ilcl );
}
}
///////////////////////////////////////////////
private static void p( String s ) {
System.out.println( "LateralTCPReceiver: " + s );
}
} // end class
1.1
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/lateral/socket/tcp/LateralTCPReceiverConnection.java
Index: LateralTCPReceiverConnection.java
===================================================================
package org.apache.stratum.jcs.auxiliary.lateral.socket.tcp;
import java.net.InetAddress;
import java.net.Socket;
import java.net.ServerSocket;
import java.io.InputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import org.apache.stratum.jcs.engine.*;
import org.apache.stratum.jcs.engine.behavior.*;
import org.apache.stratum.jcs.utils.log.*;
import org.apache.stratum.jcs.access.exception.*;
import org.apache.stratum.jcs.auxiliary.lateral.*;
import org.apache.stratum.jcs.auxiliary.lateral.behavior.*;
/////////////////////////////////////////////////////
public class LateralTCPReceiverConnection implements Runnable {
protected static final boolean debug = false;
protected static final boolean debugput = true;
private Socket socket;
private ObjectInputStream ois;
private ILateralCacheListener ilcl;
private static final Logger log = LoggerManager.getLogger(
"lateral_lateralcachemanager" );
private int puts = 0;
//////////////////////////////////////////////////////////////////
public LateralTCPReceiverConnection(Socket socket, ILateralCacheListener ilcl) {
this.ilcl = ilcl;
this.socket = socket;
try {
ois = new ObjectInputStream(socket.getInputStream());
}
catch(Exception e) {
log.error(e, "Could not open ObjectInputStream to "+socket);
}
}
///////////////////////////////////////
public void run() {
Object obj;
try {
while(true) {
obj = ois.readObject();
LateralElementDescriptor led = (LateralElementDescriptor)obj;
if ( led == null ) {
p( "led is null" );
continue;
}
if ( led.requesterId == LateralCacheInfo.listenerId ) {
if ( debug ) {
p( "from self" );
}
} else {
log.debug( "receiving led from another" );
if ( debug ) {
p( "from another" );
p( "led = " + led );
p( "led.command = " + led.command );
p( "led.ce = " + led.ce );
p( "ilcl = " + ilcl );
}
if ( led.command == led.UPDATE ) {
puts++;
if ( debugput) {
if ( puts % 100 == 0 ) {
p( "puts = " + puts );
}
}
ilcl.handlePut( led.ce );
} else
if ( led.command == led.REMOVE ) {
ilcl.handleRemove( led.ce.getCacheName(), led.ce.getKey() );
}
}
}
}
catch(java.io.EOFException e) {
log.info("Caught java.io.EOFException closing conneciton.");
}
catch(java.net.SocketException e) {
log.info("Caught java.net.SocketException closing conneciton.");
}
catch(Exception e) {
log.error(e,"Unexpected exception. Closing conneciton.");
}
try {
ois.close();
}
catch(Exception e) {
log.error(e,"Could not close connection.");
}
}
///////////////////////////////////////////////
private static void p( String s ) {
System.out.println( "LateralTCPReceiver: " + s );
}
} // end class
1.1
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/lateral/socket/tcp/LateralTCPSender.java
Index: LateralTCPSender.java
===================================================================
package org.apache.stratum.jcs.auxiliary.lateral.socket.tcp;
import java.io.*;
import java.net.InetAddress;
import java.net.Socket;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.MulticastSocket;
import org.apache.stratum.jcs.engine.*;
import org.apache.stratum.jcs.engine.behavior.*;
import org.apache.stratum.jcs.utils.log.*;
import org.apache.stratum.jcs.access.exception.*;
import org.apache.stratum.jcs.auxiliary.lateral.*;
import org.apache.stratum.jcs.auxiliary.lateral.behavior.*;
/**
* This class is based on the log4j SocketAppender class.
* I'm using a differnet repair structure, so it is significant;y different.
*/
public class LateralTCPSender {
private static final boolean debug = false;// true;
private static Logger log = LoggerManager.getLogger( LateralTCPSender.class );
private ILateralCacheAttributes ilca;
private String remoteHost;
private InetAddress address;
int port = 1111;
private ObjectOutputStream oos;
int counter = 0;
// reset the ObjectOutputStream every 70 calls
//private static final int RESET_FREQUENCY = 70;
private static final int RESET_FREQUENCY = 70;
/////////////////////////////////////////////////////////////////
public LateralTCPSender( ILateralCacheAttributes lca ) throws IOException {
String p1 = lca.getTcpServer();
String h2 = p1.substring( 0, p1.indexOf(":") );
int po = Integer.parseInt( p1.substring( p1.indexOf(":") + 1 ) );
p( "h2 = " + h2 );
init( h2, po );
this.ilca = lca;
}
/////////////////////////////////////////////////////////
protected void init(String host, int port) throws IOException {
this.port = port;
this.address = getAddressByName(host);
this.remoteHost = host;
try {
Socket socket;
log.debug("Attempting connection to "+address.getHostName());
socket = new Socket(address, port);
synchronized(this) {
oos = new ObjectOutputStream(socket.getOutputStream());
}
}
catch(java.net.ConnectException e) {
log.debug("Remote host "+address.getHostName()+" refused connection.");
throw e;
}
catch(IOException e) {
log.debug("Could not connect to " + address.getHostName()+
". Exception is " + e);
throw e;
}
} // end constructor
/////////////////////////////////////////////////////////
private InetAddress getAddressByName(String host) {
try {
return InetAddress.getByName(host);
}
catch(Exception e) {
log.error(e,"Could not find address of ["+host+"].");
return null;
}
}
///////////////////////////////////////////////////////
public void send(LateralElementDescriptor led) throws IOException{
if ( debug ) {
p( "sending led" );
}
if(led == null) {
return;
}
if(address==null) {
throw new IOException( "No remote host is set for LateralTCPSender." );
//return;
}
if(oos != null) {
try {
oos.writeObject(led);
oos.flush();
if(++counter >= RESET_FREQUENCY) {
counter = 0;
// Failing to reset the object output stream every now and
// then creates a serious memory leak.
log.info("Doing oos.reset()");
oos.reset();
}
}
catch(IOException e) {
oos = null;
log.error("Detected problem with connection: "+e);
throw e;
}
}
}
///////////////////////////////////////////////
private static void p( String s ) {
System.out.println( "LateralTCPSender: " + s );
}
// Service Methods //
////////////////////////////////////////////////////
public void update(ICacheElement item, byte requesterId) throws IOException {
LateralElementDescriptor led = new LateralElementDescriptor( item );
led.requesterId = requesterId;
led.command = led.UPDATE;
send( led );
}
////////////////////////////////////////////////////////////
public void remove(String cacheName, Serializable key) throws IOException {
remove( cacheName, key, LateralCacheInfo.listenerId );
}
public void remove(String cacheName, Serializable key, byte requesterId) throws
IOException {
CacheElement ce = new CacheElement( cacheName, key, null );
LateralElementDescriptor led = new LateralElementDescriptor( ce );
led.requesterId = requesterId;
led.command = led.REMOVE;
send( led );
}
/////////////////////////////////////////////////////
public void release() throws IOException {
// nothing needs to be done
}
public void dispose( String cache ) throws IOException {
// nothing needs to be done
}
/////////////////////////////////////////////////////////////
public void removeAll(String cacheName) throws IOException {
removeAll( cacheName, LateralCacheInfo.listenerId );
}
public void removeAll(String cacheName, byte requesterId) throws IOException {
CacheElement ce = new CacheElement( cacheName, "ALL", null );
LateralElementDescriptor led = new LateralElementDescriptor( ce );
led.requesterId = requesterId;
led.command = led.REMOVEALL;
send( led );
}
////////////////////////////////////////////////
public static void main( String args[] ) {
try {
LateralTCPSender lur = null; //new LateralTCPSender( "localhost", 1111 );
// process user input till done
boolean notDone = true;
String message = null;
// wait to dispose
BufferedReader br = new BufferedReader( new InputStreamReader( System.in ) );
while ( notDone ) {
p( "enter mesage:" );
message= br.readLine();
CacheElement ce = new CacheElement( "test", "test", message );
LateralElementDescriptor led = new LateralElementDescriptor( ce );
lur.send( led );
}
} catch( Exception e ) {
p( e.toString() );
}
}
} // end class
1.1
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/lateral/socket/tcp/LateralTCPService.java
Index: LateralTCPService.java
===================================================================
package org.apache.stratum.jcs.auxiliary.lateral.socket.tcp;
/**
* Title:
* Description:
* Copyright: Copyright (c) 2001
* Company:
* @author
* @version 1.0
*/
import java.io.*;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.MulticastSocket;
import org.apache.stratum.jcs.engine.*;
import org.apache.stratum.jcs.engine.behavior.*;
import org.apache.stratum.jcs.utils.log.*;
import org.apache.stratum.jcs.access.exception.*;
import org.apache.stratum.jcs.auxiliary.lateral.*;
import org.apache.stratum.jcs.auxiliary.lateral.behavior.*;
public class LateralTCPService implements ILateralCacheService,
ILateralCacheObserver {
private static boolean debug = false;
private static Logger log = LoggerManager.getLogger( LateralTCPSender.class );
private ILateralCacheAttributes ilca;
private LateralTCPSender sender;
/////////////////////////////////////////////////////////////////
public LateralTCPService( ILateralCacheAttributes lca ) throws IOException {
this.ilca = lca;
try {
if ( debug ) {
p("creating sendered" );
}
sender = new LateralTCPSender( lca );
if ( debug ) {
p("create sendered" );
}
}
catch (IOException e) {
log.error( e );
p("Could not create sender" );
throw e;
}
}
///////////////////////////////////////////////
private static void p( String s ) {
System.out.println( "LateralTCPService: " + s );
}
// Service Methods //
////////////////////////////////////////////////////
public void update(ICacheElement item ) throws IOException {
update( item, LateralCacheInfo.listenerId );
}
public void update(ICacheElement item, byte requesterId) throws IOException {
LateralElementDescriptor led = new LateralElementDescriptor( item );
led.requesterId = requesterId;
led.command = led.UPDATE;
sender.send( led );
}
////////////////////////////////////////////////////////////
public void remove(String cacheName, Serializable key) throws IOException {
remove( cacheName, key, LateralCacheInfo.listenerId );
}
public void remove(String cacheName, Serializable key, byte requesterId) throws
IOException {
CacheElement ce = new CacheElement( cacheName, key, null );
LateralElementDescriptor led = new LateralElementDescriptor( ce );
led.requesterId = requesterId;
led.command = led.REMOVE;
sender.send( led );
}
/////////////////////////////////////////////////////
public void release() throws IOException {
// nothing needs to be done
}
public void dispose( String cache ) throws IOException {
//sender = null;
// nothing needs to be done
}
///////////////////////////////////////////////////////////////
public Serializable get( String cache ) throws IOException {
return null;
// nothing needs to be done
}
///////////////////////////////////////////////////////////////
public Serializable get( String cache, Serializable att ) throws IOException {
return null;
// nothing needs to be done
}
///////////////////////////////////////////////////////////////
public Serializable get( String cache, Serializable att, boolean container )
throws IOException {
return null;
// nothing needs to be done
}
/////////////////////////////////////////////////////////////
public void removeAll(String cacheName) throws IOException {
removeAll( cacheName, LateralCacheInfo.listenerId );
}
public void removeAll(String cacheName, byte requesterId) throws IOException {
CacheElement ce = new CacheElement( cacheName, "ALL", null );
LateralElementDescriptor led = new LateralElementDescriptor( ce );
led.requesterId = requesterId;
led.command = led.REMOVEALL;
sender.send( led );
}
////////////////////////////////////////////////
public static void main( String args[] ) {
try {
LateralTCPSender sender = new LateralTCPSender( new LateralCacheAttributes() );
// process user input till done
boolean notDone = true;
String message = null;
// wait to dispose
BufferedReader br = new BufferedReader( new InputStreamReader( System.in ) );
while ( notDone ) {
p( "enter mesage:" );
message= br.readLine();
CacheElement ce = new CacheElement( "test", "test", message );
LateralElementDescriptor led = new LateralElementDescriptor( ce );
sender.send( led );
}
} catch( Exception e ) {
p( e.toString() );
}
}
// ILateralCacheObserver methods, do nothing here since
// the connection is not registered, the udp service is
// is not registered.
public void addCacheListener(String cacheName, ICacheListener obj) throws
IOException {}
public void addCacheListener(ICacheListener obj) throws IOException {}
public void removeCacheListener(String cacheName, ICacheListener obj) throws
IOException {}
public void removeCacheListener(ICacheListener obj) throws IOException {}
} // end class
--
To unsubscribe, e-mail: <mailto:[EMAIL PROTECTED]>
For additional commands, e-mail: <mailto:[EMAIL PROTECTED]>