asmuts 02/01/14 22:28:18
Added: src/java/org/apache/stratum/jcs/auxiliary/lateral/socket/udp
LateralCacheUDPListener.java
LateralGroupCacheUDPListener.java
LateralUDPReceiver.java LateralUDPSender.java
LateralUDPService.java
Log:
experimental
very ureliable
could be used for some low volume or last minute emergency remove requests
Revision Changes Path
1.1
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/lateral/socket/udp/LateralCacheUDPListener.java
Index: LateralCacheUDPListener.java
===================================================================
package org.apache.stratum.jcs.auxiliary.lateral.socket.udp;
//////////////////////////////////
import java.io.*;
import java.rmi.*;
import java.rmi.registry.*;
import java.rmi.server.*;
import java.util.*;
import java.sql.*;
/////////////////////////////////////
import org.apache.stratum.jcs.engine.behavior.*;
import org.apache.stratum.jcs.engine.control.*;
import org.apache.stratum.jcs.engine.group.*;
import org.apache.stratum.jcs.engine.*;
import org.apache.stratum.jcs.access.*;
import org.apache.stratum.jcs.auxiliary.disk.*;
import org.apache.stratum.jcs.auxiliary.lateral.*;
import org.apache.stratum.jcs.auxiliary.lateral.*;
import org.apache.stratum.jcs.auxiliary.lateral.behavior.*;
// remove
import org.apache.stratum.jcs.utils.log.*;
///////////////////////////////////////////////////////////////////////
public class LateralCacheUDPListener implements ILateralCacheListener, Serializable {
protected static final boolean debug = false;//true;
protected static final boolean debugcmd = false;//true;
protected static transient Logger log;
protected static transient ICompositeCacheManager cacheMgr;
protected static final HashMap instances = new HashMap();
// instance vars
private LateralUDPReceiver receiver;
private ILateralCacheAttributes ilca;
private boolean inited = false;
/////////////////////////////////////////////////////////////
/**
* Only need one since it does work for all regions,
* just reference by multiple region names.
*/
protected LateralCacheUDPListener ( ILateralCacheAttributes ilca ) {
log = LoggerManager.getLogger( "lateral_lateralcachemanager" );
}
///////////////////////////////////////////////////
public void init() {
try {
// need to connect based on type
receiver = new LateralUDPReceiver( ilca, this );
Thread t = new Thread( receiver );
t.start();
} catch (Exception ex) {
log.error(ex);
throw new IllegalStateException(ex.getMessage());
}
}
/** let the lateral cache set a listener_id.
* Since there is only one listerenr for all the regions
* and every region gets registered? the id shouldn't be set
* if it isn't zero. If it is we assume that it is a reconnect.
*/
public void setListenerId( byte id ) throws IOException {
LateralCacheInfo.listenerId = id;
if ( debugcmd ) {
p( "set listenerId = " + id );
}
}
public byte getListenerId( ) throws IOException {
// set the manager since we are in use
//getCacheManager();
//p( "get listenerId" );
if ( debugcmd ) {
p( "get listenerId = " + LateralCacheInfo.listenerId );
}
return LateralCacheInfo.listenerId;
}
///////////////////////////////////////////////////////////////////////////
public static ILateralCacheListener getInstance( ILateralCacheAttributes ilca ) {
//throws IOException, NotBoundException
ILateralCacheListener ins = (ILateralCacheListener)instances.get(
ilca.getUdpMulticastAddr() + ":" + ilca.getUdpMulticastPort() );
if (ins == null) {
synchronized(LateralCacheUDPListener.class) {
if (ins == null) {
ins = new LateralCacheUDPListener( ilca );
ins.init();
}
if( debug ) {
p( "created new listener " + ilca.getUdpMulticastAddr() + ":" +
ilca.getUdpMulticastPort() );
}
instances.put( ilca.getUdpMulticastAddr() + ":" +
ilca.getUdpMulticastPort(), ins );
}
}
return ins;
}
/////////////////////////////////////////////////////////////////////////////
//////////////////////////// implements the ILateralCacheListener interface.
//////////////
/**
*/
public void handlePut (ICacheElement cb) throws IOException {
if ( debugcmd ) {
p( "PUTTING ELEMENT FROM LATERAL" );
}
getCacheManager();
ICompositeCache cache = (ICompositeCache)cacheMgr.getCache(cb.getCacheName());
cache.update(cb, false);
//handleRemove(cb.getCacheName(), cb.getKey());
}
////////////////////////////////////////////////////
public void handleRemove (String cacheName, Serializable key) throws IOException {
if (debug) {
log.debug("handleRemove> cacheName=" + cacheName + ", key=" + key);
}
if ( debugcmd ) {
p("handleRemove> cacheName=" + cacheName + ", key=" + key);
}
getCacheManager();
// interface limitation here
ICompositeCache cache = (ICompositeCache)cacheMgr.getCache(cacheName);
cache.remove(key, cache.REMOTE_INVOKATION);
}
//////////////////////////////////////////////////
public void handleRemoveAll (String cacheName) throws IOException {
if (debug) {
log.debug("handleRemoveAll> cacheName=" + cacheName);
}
getCacheManager();
ICache cache = cacheMgr.getCache(cacheName);
cache.removeAll();
}
///////////////////////////////////////////////////
public void handleDispose (String cacheName) throws IOException {
if (debug) {
log.debug("handleDispose> cacheName=" + cacheName);
}
CompositeCacheManager cm = (CompositeCacheManager)cacheMgr;
cm.freeCache(cacheName, Cache.REMOTE_INVOKATION);
}
////////////////////////////////////////////////
// override for new funcitonality
protected void getCacheManager() {
if ( cacheMgr == null ) {
cacheMgr = (ICompositeCacheManager)CacheManagerFactory.getInstance();
p( "had to get cacheMgr" );
if ( debugcmd ) {
p( "cacheMgr = " + cacheMgr );
}
} else {
if ( debugcmd ) {
p( "already got cacheMgr = " + cacheMgr );
}
}
}
//////////////////////////////////////////////////
public static void p( String s ) {
System.out.println( "LateralCacheListener: " + s );
}
} // end class
1.1
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/lateral/socket/udp/LateralGroupCacheUDPListener.java
Index: LateralGroupCacheUDPListener.java
===================================================================
package org.apache.stratum.jcs.auxiliary.lateral.socket.udp;
//////////////////////////////////
import java.io.*;
import java.rmi.*;
import java.rmi.registry.*;
import java.rmi.server.*;
import java.util.*;
import java.sql.*;
/////////////////////////////////////
import org.apache.stratum.jcs.engine.behavior.*;
import org.apache.stratum.jcs.engine.control.*;
import org.apache.stratum.jcs.engine.group.*;
import org.apache.stratum.jcs.engine.*;
import org.apache.stratum.jcs.access.*;
import org.apache.stratum.jcs.engine.control.Cache;
import org.apache.stratum.jcs.auxiliary.disk.*;
import org.apache.stratum.jcs.auxiliary.lateral.*;
import org.apache.stratum.jcs.auxiliary.lateral.behavior.*;
import org.apache.stratum.jcs.utils.log.*;
///////////////////////////////////////////////////////////////////////
public class LateralGroupCacheUDPListener extends LateralCacheUDPListener implements
ILateralCacheListener, Serializable {
///////////////////////////////////////////
protected LateralGroupCacheUDPListener (ILateralCacheAttributes ilca) {
super(ilca);
p( "creating LateralGroupCacheUDPListener" );
}
///////////////////////////////////////////////////////////////////////////
public static ILateralCacheListener getInstance( ILateralCacheAttributes ilca ) {
//throws IOException, NotBoundException
ILateralCacheListener ins = (ILateralCacheListener)instances.get(
ilca.getUdpMulticastAddr() + ":" + ilca.getUdpMulticastPort() );
if (ins == null) {
synchronized(LateralGroupCacheUDPListener.class) {
if (ins == null) {
ins = new LateralGroupCacheUDPListener( ilca );
}
if( debug ) {
p( "created new listener " + ilca.getUdpMulticastAddr() + ":" +
ilca.getUdpMulticastPort() );
}
instances.put( ilca.getUdpMulticastAddr() + ":" + ilca.getUdpMulticastPort(),
ins );
}
}
return ins;
}
// just need to put new logic for remove( key, int ) into groupcache
// or have the existing double arg method call the single arg method which is
// overridden in the group cache.
///////////////////////////////////////////////////
/* not necessary */
public void handlePut (ICacheElement cb) throws IOException {
if ( debugcmd ) {
p( "PUTTING ELEMENT FROM REMOTE" );
}
// could put this in the group cache.
if (cb.getKey() instanceof GroupAttrName) {
try {
if ( debug ) {
p( "putting gi for ga method" );
}
// need to lean up the group putting
/*
GroupCache cache = (GroupCache)cacheMgr.getCache(cb.getCacheName());
GroupAttrName gan = (GroupAttrName)cb.getKey();
GroupId groupId = new GroupId( gan.groupId );
cache.putGAN( gan, cb.getVal(), cb.getAttributes(), false);
*/
ICompositeCache cache =
(ICompositeCache)cacheMgr.getCache(cb.getCacheName());
cache.update(cb, false);
} catch ( Exception ioe ) {}
return;
}
super.handlePut(cb);
}
////////////////////////////////////////////////
// override for new funcitonality
// lazy init is too slow, find a better way
protected void getCacheManager() {
try {
if ( cacheMgr == null ) {
cacheMgr = (ICompositeCacheManager)GroupCacheManagerFactory.getInstance();
if ( debugcmd ) {
p( " groupcache cacheMgr = " + cacheMgr );
}
} else {
if ( debugcmd ) {
p( "already got groupcache cacheMgr = " + cacheMgr );
}
}
} catch( Exception e ) {
log.error( e );
}
}
}
1.1
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/lateral/socket/udp/LateralUDPReceiver.java
Index: LateralUDPReceiver.java
===================================================================
package org.apache.stratum.jcs.auxiliary.lateral.socket.udp;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.net.DatagramPacket;
import java.net.InetAddress;
import java.net.MulticastSocket;
import org.apache.stratum.jcs.engine.*;
import org.apache.stratum.jcs.engine.behavior.*;
import org.apache.stratum.jcs.utils.log.*;
import org.apache.stratum.jcs.access.exception.*;
import org.apache.stratum.jcs.auxiliary.lateral.*;
import org.apache.stratum.jcs.auxiliary.lateral.behavior.*;
/**
* A highly unreliable UDP receiver. It is easy to outrun. Uncaught message
* will die.
*/
public class LateralUDPReceiver implements Runnable {
private static final boolean debug = true;
private final byte[] m_buffer = new byte[65536];
private MulticastSocket m_socket;
private static Logger log = LoggerManager.getLogger( LateralUDPReceiver.class );
ILateralCacheListener ilcl;
/////////////////////////////////////////////////////////////////
public LateralUDPReceiver( ILateralCacheAttributes lca, ILateralCacheListener ilcl
) throws IOException {
this( lca.getUdpMulticastAddr(), lca.getUdpMulticastPort() );
this.ilcl = ilcl;
}
//////////////////////////////////////////////////////////////
protected LateralUDPReceiver(String multicastAddressString, int multicastPort)
throws IOException {
p( "constructing listener, " + multicastAddressString + ":" + multicastPort );
try {
m_socket = new MulticastSocket(multicastPort);
m_socket.joinGroup(InetAddress.getByName(multicastAddressString));
}
catch (IOException e) {
log.error( e );
p("Could not bind to multicast address " + multicastAddressString + ":" +
multicastPort );
//throw e ;//new CacheException( "Could not bind to multicast address " +
multicastAddressString + ":" + multicastPort, e);
}
}
////////////////////////////////////////////////////
/**
* Highly unreliable. If it is processing one message while another comes in
* , the second message is lost. This is for low concurency peppering.
*/
public Object waitForMessage() throws IOException {
final DatagramPacket packet = new DatagramPacket(m_buffer,
m_buffer.length);
Object obj = null;
try {
m_socket.receive(packet);
final ByteArrayInputStream byteStream = new ByteArrayInputStream(m_buffer,
0, packet.getLength());
final ObjectInputStream objectStream = new ObjectInputStream(byteStream);
obj = objectStream.readObject();
}
catch (Exception e) {
log.error( e );
//throw new CacheException( "Error receving multicast packet", e);
}
return obj;
}
//////////////////////////////////////
public void run() {
try {
while(true) {
Object obj = waitForMessage();
LateralElementDescriptor led = (LateralElementDescriptor)obj;
if ( led.requesterId == LateralCacheInfo.listenerId ) {
if ( debug ) {
p( "from self" );
}
} else {
if ( debug ) {
p( "from another" );
}
if ( led.command == led.UPDATE ) {
ilcl.handlePut( led.ce );
} else
if ( led.command == led.UPDATE ) {
ilcl.handleRemove( led.ce.getCacheName(), led.ce.getKey() );
}
}
}
}catch( Exception e ) {
}
} // end run
///////////////////////////////////////////////
private static void p( String s ) {
System.out.println( "LateralUDPReceiver: " + s );
}
////////////////////////////////////////////////
public static void main( String args[] ) {
try {
LateralUDPReceiver lur = new LateralUDPReceiver( "228.5.6.7", 6789 );
Thread t = new Thread( lur );
t.start();
} catch( Exception e ) {
p( e.toString() );
}
}
} // end class
1.1
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/lateral/socket/udp/LateralUDPSender.java
Index: LateralUDPSender.java
===================================================================
package org.apache.stratum.jcs.auxiliary.lateral.socket.udp;
import java.io.*;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.MulticastSocket;
import org.apache.stratum.jcs.engine.*;
import org.apache.stratum.jcs.engine.behavior.*;
import org.apache.stratum.jcs.utils.log.*;
import org.apache.stratum.jcs.access.exception.*;
import org.apache.stratum.jcs.auxiliary.lateral.*;
import org.apache.stratum.jcs.auxiliary.lateral.behavior.*;
/**
*/
public class LateralUDPSender {
private static final boolean debug = true;
private MulticastSocket m_localSocket;
private InetAddress m_multicastAddress;
private int m_multicastPort;
private static Logger log = LoggerManager.getLogger( LateralUDPSender.class );
private ILateralCacheAttributes ilca;
/////////////////////////////////////////////////////////////////
public LateralUDPSender( ILateralCacheAttributes lca ) throws IOException {
if ( debug ) {
p( "contructing lca" );
}
this.ilca = lca;
try {
m_localSocket = new MulticastSocket();
// Remote address.
m_multicastAddress =
InetAddress.getByName(lca.getUdpMulticastAddr());
}
catch (IOException e) {
log.error( e );
p("Could not bind to multicast address " + lca.getUdpMulticastAddr());
throw e; //new CacheException("Could not bind to multicast address " +
multicastAddressString);
}
m_multicastPort = lca.getUdpMulticastPort();
}
/////////////////////////////////////////////////////////
public void send(LateralElementDescriptor led) throws IOException {
if ( debug ) {
p( "sending led" );
}
try {
final MyByteArrayOutputStream byteStream = new MyByteArrayOutputStream();
final ObjectOutputStream objectStream = new
ObjectOutputStream(byteStream);
objectStream.writeObject(led);
objectStream.flush();
final byte[] bytes = byteStream.getBytes();
final DatagramPacket packet = new DatagramPacket(bytes, bytes.length,
m_multicastAddress, m_multicastPort);
m_localSocket.send(packet);
}
catch (IOException e) {
log.error( e );
p("Exception sending message");
throw e; //new CacheException( "Exception sending led", e);
}
}
///////////////////////////////////////////////
private static void p( String s ) {
System.out.println( "LateralUDPSender: " + s );
}
// Service Methods //
////////////////////////////////////////////////////
public void update(ICacheElement item, byte requesterId) throws IOException {
LateralElementDescriptor led = new LateralElementDescriptor( item );
led.requesterId = requesterId;
led.command = led.UPDATE;
send( led );
}
////////////////////////////////////////////////////////////
public void remove(String cacheName, Serializable key) throws IOException {
remove( cacheName, key, LateralCacheInfo.listenerId );
}
public void remove(String cacheName, Serializable key, byte requesterId) throws
IOException {
CacheElement ce = new CacheElement( cacheName, key, null );
LateralElementDescriptor led = new LateralElementDescriptor( ce );
led.requesterId = requesterId;
led.command = led.REMOVE;
send( led );
}
/////////////////////////////////////////////////////
public void release() throws IOException {
// nothing needs to be done
}
public void dispose( String cache ) throws IOException {
// nothing needs to be done
}
/////////////////////////////////////////////////////////////
public void removeAll(String cacheName) throws IOException {
removeAll( cacheName, LateralCacheInfo.listenerId );
}
public void removeAll(String cacheName, byte requesterId) throws IOException {
CacheElement ce = new CacheElement( cacheName, "ALL", null );
LateralElementDescriptor led = new LateralElementDescriptor( ce );
led.requesterId = requesterId;
led.command = led.REMOVEALL;
send( led );
}
////////////////////////////////////////////////
public static void main( String args[] ) {
try {
LateralUDPSender lur = new LateralUDPSender( new LateralCacheAttributes() );
// process user input till done
boolean notDone = true;
String message = null;
// wait to dispose
BufferedReader br = new BufferedReader( new InputStreamReader( System.in ) );
while ( notDone ) {
p( "enter mesage:" );
message= br.readLine();
CacheElement ce = new CacheElement( "test", "test", message );
LateralElementDescriptor led = new LateralElementDescriptor( ce );
lur.send( led );
}
} catch( Exception e ) {
p( e.toString() );
}
}
} // end class
///////////////////////////////////////////////////////////
class MyByteArrayOutputStream extends ByteArrayOutputStream {
public byte[] getBytes() {
return buf;
}
} // end class
1.1
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/lateral/socket/udp/LateralUDPService.java
Index: LateralUDPService.java
===================================================================
package org.apache.stratum.jcs.auxiliary.lateral.socket.udp;
/**
* Title:
* Description:
* Copyright: Copyright (c) 2001
* Company:
* @author
* @version 1.0
*/
import java.io.*;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.MulticastSocket;
import org.apache.stratum.jcs.engine.*;
import org.apache.stratum.jcs.engine.behavior.*;
import org.apache.stratum.jcs.utils.log.*;
import org.apache.stratum.jcs.access.exception.*;
import org.apache.stratum.jcs.auxiliary.lateral.*;
import org.apache.stratum.jcs.auxiliary.lateral.behavior.*;
public class LateralUDPService implements ILateralCacheService,
ILateralCacheObserver {
private static Logger log = LoggerManager.getLogger( LateralUDPSender.class );
private ILateralCacheAttributes ilca;
private LateralUDPSender sender;
/////////////////////////////////////////////////////////////////
public LateralUDPService( ILateralCacheAttributes lca ) throws IOException {
this.ilca = lca;
try {
sender = new LateralUDPSender( lca );
}
catch (IOException e) {
log.error( e );
p("Could not create sender" );
throw e;
}
}
///////////////////////////////////////////////
private static void p( String s ) {
System.out.println( "LateralUDPService: " + s );
}
// Service Methods //
////////////////////////////////////////////////////
public void update(ICacheElement item ) throws IOException {
update( item, LateralCacheInfo.listenerId );
}
public void update(ICacheElement item, byte requesterId) throws IOException {
LateralElementDescriptor led = new LateralElementDescriptor( item );
led.requesterId = requesterId;
led.command = led.UPDATE;
sender.send( led );
}
////////////////////////////////////////////////////////////
public void remove(String cacheName, Serializable key) throws IOException {
remove( cacheName, key, LateralCacheInfo.listenerId );
}
public void remove(String cacheName, Serializable key, byte requesterId) throws
IOException {
CacheElement ce = new CacheElement( cacheName, key, null );
LateralElementDescriptor led = new LateralElementDescriptor( ce );
led.requesterId = requesterId;
led.command = led.REMOVE;
sender.send( led );
}
/////////////////////////////////////////////////////
public void release() throws IOException {
// nothing needs to be done
}
public void dispose( String cache ) throws IOException {
//sender = null;
// nothing needs to be done
}
///////////////////////////////////////////////////////////////
public Serializable get( String cache ) throws IOException {
return null;
// nothing needs to be done
}
///////////////////////////////////////////////////////////////
public Serializable get( String cache, Serializable att ) throws IOException {
return null;
// nothing needs to be done
}
///////////////////////////////////////////////////////////////
public Serializable get( String cache, Serializable att, boolean container )
throws IOException {
return null;
// nothing needs to be done
}
/////////////////////////////////////////////////////////////
public void removeAll(String cacheName) throws IOException {
removeAll( cacheName, LateralCacheInfo.listenerId );
}
public void removeAll(String cacheName, byte requesterId) throws IOException {
CacheElement ce = new CacheElement( cacheName, "ALL", null );
LateralElementDescriptor led = new LateralElementDescriptor( ce );
led.requesterId = requesterId;
led.command = led.REMOVEALL;
sender.send( led );
}
////////////////////////////////////////////////
public static void main( String args[] ) {
try {
LateralUDPSender sender = new LateralUDPSender( new LateralCacheAttributes() );
// process user input till done
boolean notDone = true;
String message = null;
// wait to dispose
BufferedReader br = new BufferedReader( new InputStreamReader( System.in ) );
while ( notDone ) {
p( "enter mesage:" );
message= br.readLine();
CacheElement ce = new CacheElement( "test", "test", message );
LateralElementDescriptor led = new LateralElementDescriptor( ce );
sender.send( led );
}
} catch( Exception e ) {
p( e.toString() );
}
}
// ILateralCacheObserver methods, do nothing here since
// the connection is not registered, the udp service is
// is not registered.
public void addCacheListener(String cacheName, ICacheListener obj) throws
IOException {}
public void addCacheListener(ICacheListener obj) throws IOException {}
public void removeCacheListener(String cacheName, ICacheListener obj) throws
IOException {}
public void removeCacheListener(ICacheListener obj) throws IOException {}
} // end class
--
To unsubscribe, e-mail: <mailto:[EMAIL PROTECTED]>
For additional commands, e-mail: <mailto:[EMAIL PROTECTED]>