jtaylor 2002/08/10 16:52:45
Modified: auxiliary-builds/javagroups/src/java/org/apache/jcs/auxiliary/javagroups
JavaGroupsCache.java JavaGroupsCacheAttributes.java
JavaGroupsCacheFactory.java
auxiliary-builds/javagroups/src/test/org/apache/jcs/auxiliary/javagroups
JavaGroupsCacheTest.ccf JavaGroupsCacheTest.java
Added: auxiliary-builds/javagroups/src/test/org/apache/jcs/auxiliary/javagroups
JavaGroupsCacheWithGetTest.ccf
JavaGroupsCacheWithGetTest.java
Log:
Implemented get (optional, disabled by default) for javagroups. Reimplemented
other methods using MessageDispatcher instead of a listener thread.
Revision Changes Path
1.2 +97 -106
jakarta-turbine-jcs/auxiliary-builds/javagroups/src/java/org/apache/jcs/auxiliary/javagroups/JavaGroupsCache.java
Index: JavaGroupsCache.java
===================================================================
RCS file:
/home/cvs/jakarta-turbine-jcs/auxiliary-builds/javagroups/src/java/org/apache/jcs/auxiliary/javagroups/JavaGroupsCache.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- JavaGroupsCache.java 7 Aug 2002 15:35:16 -0000 1.1
+++ JavaGroupsCache.java 10 Aug 2002 23:52:45 -0000 1.2
@@ -64,11 +64,18 @@
import org.apache.jcs.engine.control.CompositeCache;
import org.javagroups.Channel;
import org.javagroups.Message;
+import org.javagroups.View;
+import org.javagroups.Address;
+import org.javagroups.MembershipListener;
+import org.javagroups.util.RspList;
import org.javagroups.blocks.RequestHandler;
+import org.javagroups.blocks.GroupRequest;
+import org.javagroups.blocks.MessageDispatcher;
import java.io.IOException;
import java.io.Serializable;
import java.util.Set;
+import java.util.Vector;
/**
* Auxiliary cache using javagroups. Expects to be created with a Channel,
@@ -92,20 +99,27 @@
* @author <a href="[EMAIL PROTECTED]">James Taylor</a>
* @version $Id$
*/
-public class JavaGroupsCache implements AuxiliaryCache, RequestHandler
+public class JavaGroupsCache
+ implements AuxiliaryCache, RequestHandler, MembershipListener
{
- private final static Log log = LogFactory.getLog( JavaGroupsCache.class );
+ private static int ct = 0;
+
+ private final Log log = LogFactory.getLog( JavaGroupsCache.class.getName() +
(ct++) );
private String cacheName;
private int status;
+ private boolean getFromPeers;
+
private CompositeCache cache;
private Channel channel;
- private Listener listener;
+ private MessageDispatcher dispatcher;
- public JavaGroupsCache( CompositeCache cache, Channel channel )
+ public JavaGroupsCache( CompositeCache cache,
+ Channel channel,
+ boolean getFromPeers )
throws Exception
{
this.cache = cache;
@@ -113,16 +127,18 @@
this.cacheName = cache.getCacheName();
this.channel = channel;
- // Connect channel to the 'group' for our region name
-
- channel.connect( cacheName );
+ this.getFromPeers = getFromPeers;
// The adapter listens to the channel and fires MessageListener events
// on this object.
- listener = new Listener( channel, this, cacheName );
+ dispatcher = new MessageDispatcher( channel, null, this, this );
- listener.start();
+ // Connect channel to the 'group' for our region name
+
+ channel.setOpt( Channel.LOCAL, Boolean.FALSE );
+
+ channel.connect( cacheName );
// If all the above succeed, the cache is now alive.
@@ -137,7 +153,14 @@
try
{
- channel.send( new Message( null, null, request ) );
+ log.info( "Sending" );
+
+ dispatcher.castMessage( null,
+ new Message( null, null, request ),
+ GroupRequest.GET_NONE,
+ 0 );
+ log.info( "Sent" );
+
}
catch ( Exception e )
{
@@ -160,19 +183,44 @@
}
/**
- * Not implemented, we expect that if an element is available to one of our
- * peers, it has already been shared by a previous update.
+ * If 'getFromPeers' is true, this will attempt to get the requested
+ * element from ant other members of the group.
*
- * NOTE: This could be easily implemented using a MessageDispatcher and
- * ResponseCollector, which would be usefull when the caches get out of
- * sync (for example, if one member is rebooted).
- *
- * @param key Ignored
- * @return Always returns null
+ * @param key
+ * @return
* @throws IOException Never thrown by this implementation
*/
public ICacheElement get( Serializable key ) throws IOException
{
+ if ( getFromPeers )
+ {
+ CacheElement element = new CacheElement( cacheName, key, null );
+
+ Request request = new Request( element, Request.GET );
+
+ // Cast message and wait for all responses.
+
+ // FIXME: we can stop waiting after the first not null response,
+ // that is more difficult to implement however.
+
+ RspList responses =
+ dispatcher.castMessage( null,
+ new Message( null, null, request ),
+ GroupRequest.GET_ALL,
+ 0 );
+
+ // Get results only gives the responses which were not null
+
+ Vector results = responses.getResults();
+
+ // If there were any non null results, return the first
+
+ if ( results.size() > 0 )
+ {
+ return ( ICacheElement ) results.get( 0 );
+ }
+ }
+
return null;
}
@@ -212,9 +260,18 @@
*/
public void dispose() throws IOException
{
- listener.terminate();
+ // This will join the scheduler thread and ensure everything terminates
+
+ dispatcher.stop();
+
+ // Now we can disconnect from the group and close the channel
+
+ channel.disconnect();
+ channel.close();
status = CacheConstants.STATUS_DISPOSED;
+
+ log.info( "Disposed for cache: " + cacheName );
}
/**
@@ -285,6 +342,8 @@
{
try
{
+ log.info( "Handling" );
+
Request request = ( Request ) msg.getObject();
// Switch based on the command and invoke the
@@ -292,6 +351,11 @@
switch ( request.getCommand() )
{
+ case Request.GET:
+
+ return cache.localGet( request.getCacheElement().getKey() );
+ // break;
+
case Request.UPDATE:
cache.localUpdate( request.getCacheElement() );
@@ -312,6 +376,7 @@
log.error( "Recieved unknown command" );
}
+ log.info( "Handled" );
}
catch ( Exception e )
{
@@ -321,6 +386,17 @@
return null;
}
+ // ------------------------------------------- interface MembershipListener
+
+ public void viewAccepted( View view )
+ {
+ log.info( "View Changed: " + String.valueOf( view ) );
+ }
+
+ public void suspect( Address suspectedAddress ) { }
+
+ public void block() { }
+
// ---------------------------------------------------------- inner classes
/**
@@ -332,6 +408,7 @@
public final static int UPDATE = 1;
public final static int REMOVE = 2;
public final static int REMOVE_ALL = 3;
+ public final static int GET = 5;
private ICacheElement cacheElement;
private int command;
@@ -350,92 +427,6 @@
public int getCommand()
{
return command;
- }
- }
-
- /**
- * Listens for messages on the provided channel and dispatches them to the
- * CompositeCache that this auxiliary was created with. Also, when
- * terminated is resposible for disconnecting and closing the Channel.
- */
- static class Listener extends Thread
- {
- boolean running = true;
-
- RequestHandler handler;
-
- Channel channel;
- Object object;
-
- public Listener( Channel channel,
- RequestHandler handler,
- String cacheName )
- {
- super( "JavaGroupsCache.Listener: " + cacheName );
-
- this.channel = channel;
- this.handler = handler;
- }
-
- public void run()
- {
- while ( running )
- {
- // Get next object from channel, blocking indefinately
-
- try
- {
- object = channel.receive( 0 );
- }
- catch ( Exception e )
- {
- // If we are still supposed to be running, pause and
- // continue.
-
- if ( running )
- {
- try
- {
- Thread.sleep( 1000 );
- }
- catch ( Exception ignored )
- {
- // Ignored
- }
- }
- }
-
-
- // Ignore anything that is not a message
-
- if ( object instanceof Message )
- {
- handler.handle( ( Message ) object );
- }
- }
-
- // When terminated, clean up the channel. Termination should be
- // initiated by the dispose() method of the enclosing
- // JavaGroupsCache
-
- channel.disconnect();
- channel.close();
- }
-
- public void terminate()
- {
- running = false;
-
- this.interrupt();
-
- try
- {
- this.join( 1000 );
- }
- catch ( InterruptedException e )
- {
- // Ignored, not much we can do anymore.
- }
}
}
}
1.2 +12 -1
jakarta-turbine-jcs/auxiliary-builds/javagroups/src/java/org/apache/jcs/auxiliary/javagroups/JavaGroupsCacheAttributes.java
Index: JavaGroupsCacheAttributes.java
===================================================================
RCS file:
/home/cvs/jakarta-turbine-jcs/auxiliary-builds/javagroups/src/java/org/apache/jcs/auxiliary/javagroups/JavaGroupsCacheAttributes.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- JavaGroupsCacheAttributes.java 7 Aug 2002 15:35:16 -0000 1.1
+++ JavaGroupsCacheAttributes.java 10 Aug 2002 23:52:45 -0000 1.2
@@ -88,6 +88,7 @@
private String channelFactoryClassName = "org.javagroups.JChannelFactory";
private String channelProperties = null;
+ private boolean getFromPeers = false;
public String getChannelFactoryClassName()
{
@@ -107,6 +108,16 @@
public void setChannelProperties( String channelProperties )
{
this.channelProperties = channelProperties;
+ }
+
+ public boolean isGetFromPeers()
+ {
+ return getFromPeers;
+ }
+
+ public void setGetFromPeers( boolean getFromPeers )
+ {
+ this.getFromPeers = getFromPeers;
}
// ----------------------------------------------- AuxiliaryCacheAttributes
1.2 +4 -2
jakarta-turbine-jcs/auxiliary-builds/javagroups/src/java/org/apache/jcs/auxiliary/javagroups/JavaGroupsCacheFactory.java
Index: JavaGroupsCacheFactory.java
===================================================================
RCS file:
/home/cvs/jakarta-turbine-jcs/auxiliary-builds/javagroups/src/java/org/apache/jcs/auxiliary/javagroups/JavaGroupsCacheFactory.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- JavaGroupsCacheFactory.java 7 Aug 2002 15:35:16 -0000 1.1
+++ JavaGroupsCacheFactory.java 10 Aug 2002 23:52:45 -0000 1.2
@@ -101,7 +101,9 @@
// Return a new JavaGroupsCache for the new channel.
- return new JavaGroupsCache( cache, channel );
+ return new JavaGroupsCache( cache,
+ channel,
+ attributes.isGetFromPeers() );
}
catch ( Exception e )
{
1.2 +1 -1
jakarta-turbine-jcs/auxiliary-builds/javagroups/src/test/org/apache/jcs/auxiliary/javagroups/JavaGroupsCacheTest.ccf
Index: JavaGroupsCacheTest.ccf
===================================================================
RCS file:
/home/cvs/jakarta-turbine-jcs/auxiliary-builds/javagroups/src/test/org/apache/jcs/auxiliary/javagroups/JavaGroupsCacheTest.ccf,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- JavaGroupsCacheTest.ccf 7 Aug 2002 15:35:16 -0000 1.1
+++ JavaGroupsCacheTest.ccf 10 Aug 2002 23:52:45 -0000 1.2
@@ -9,4 +9,4 @@
jcs.auxiliary.JG = org.apache.jcs.auxiliary.javagroups.JavaGroupsCacheFactory
jcs.auxiliary.JG.attributes =
org.apache.jcs.auxiliary.javagroups.JavaGroupsCacheAttributes
jcs.auxiliary.JG.attributes.ChannelFactoryClassName = org.javagroups.JChannelFactory
-jcs.auxiliary.JG.attributes.ChannelProperties =
UDP(mcast_addr=224.0.0.100;mcast_port=7501):PING:FD:STABLE:NAKACK:UNICAST:FRAG:FLUSH:GMS:VIEW_ENFORCER:STATE_TRANSFER:QUEUE
+jcs.auxiliary.JG.attributes.ChannelProperties =
UDP(mcast_addr=224.0.0.100;mcast_port=7501):PING:FD:STABLE:NAKACK:UNICAST:FRAG:FLUSH:GMS:VIEW_ENFORCER:QUEUE
\ No newline at end of file
1.3 +3 -3
jakarta-turbine-jcs/auxiliary-builds/javagroups/src/test/org/apache/jcs/auxiliary/javagroups/JavaGroupsCacheTest.java
Index: JavaGroupsCacheTest.java
===================================================================
RCS file:
/home/cvs/jakarta-turbine-jcs/auxiliary-builds/javagroups/src/test/org/apache/jcs/auxiliary/javagroups/JavaGroupsCacheTest.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- JavaGroupsCacheTest.java 7 Aug 2002 15:46:34 -0000 1.2
+++ JavaGroupsCacheTest.java 10 Aug 2002 23:52:45 -0000 1.3
@@ -45,7 +45,7 @@
// Wait for it to propogate -- FIXME: This is time sensitive and thus
// a bad idea for a unit test.
- Thread.sleep( 1000 );
+ Thread.sleep( 500 );
// Assert that the values were correctly propogated
@@ -59,7 +59,7 @@
one.remove( "2" );
- Thread.sleep( 1000 );
+ Thread.sleep( 500 );
assertEquals( null, two.get( "2") );
@@ -67,7 +67,7 @@
one.remove();
- Thread.sleep( 1000 );
+ Thread.sleep( 500 );
assertEquals( null, two.get( "1" ) );
assertEquals( null, two.get( "2" ) );
1.1
jakarta-turbine-jcs/auxiliary-builds/javagroups/src/test/org/apache/jcs/auxiliary/javagroups/JavaGroupsCacheWithGetTest.ccf
Index: JavaGroupsCacheWithGetTest.ccf
===================================================================
jcs.default = JG
jcs.default.elementattributes = org.apache.jcs.engine.ElementAttributes
jcs.default.elementattributes.IsEternal = true
jcs.default.elementattributes.MaxLifeSeconds = 60
jcs.default.elementattributes.IsSpool = true
jcs.default.elementattributes.IsRemote = true
jcs.default.elementattributes.IsLateral = true
jcs.auxiliary.JG = org.apache.jcs.auxiliary.javagroups.JavaGroupsCacheFactory
jcs.auxiliary.JG.attributes =
org.apache.jcs.auxiliary.javagroups.JavaGroupsCacheAttributes
jcs.auxiliary.JG.attributes.ChannelFactoryClassName = org.javagroups.JChannelFactory
jcs.auxiliary.JG.attributes.ChannelProperties =
UDP(mcast_addr=224.0.0.100;mcast_port=7501):PING:FD:STABLE:NAKACK:UNICAST:FRAG:FLUSH:GMS:VIEW_ENFORCER:QUEUE
jcs.auxiliary.JG.attributes.GetFromPeers = true
1.1
jakarta-turbine-jcs/auxiliary-builds/javagroups/src/test/org/apache/jcs/auxiliary/javagroups/JavaGroupsCacheWithGetTest.java
Index: JavaGroupsCacheWithGetTest.java
===================================================================
package org.apache.jcs.auxiliary.javagroups;
import org.apache.jcs.engine.control.CompositeCacheManager;
import org.apache.jcs.access.CacheAccess;
import org.javagroups.log.Tracer;
import org.javagroups.log.Trace;
import java.util.Properties;
import java.io.IOException;
import junit.framework.TestCase;
public class JavaGroupsCacheWithGetTest extends TestCase
{
public JavaGroupsCacheWithGetTest( String testName )
{
super( testName );
}
public void testWithGet() throws Exception
{
// Create and configure first manager and region
CompositeCacheManager manager1 = new CompositeCacheManager();
manager1.configure( getProperties() );
CacheAccess one = new CacheAccess( manager1.getCache( "testCache" ) );
one.put( "1", "one" );
one.put( "2", "two" );
one.put( "3", "three" );
one.put( "4", "four" );
one.put( "5", "five" );
// Now get second manager and region, it will join the group
CompositeCacheManager manager2 = new CompositeCacheManager();
manager2.configure( getProperties() );
CacheAccess two = new CacheAccess( manager2.getCache( "testCache" ) );
assertEquals( "one", two.get( "1" ) );
assertEquals( "two", two.get( "2" ) );
assertEquals( "three", two.get( "3" ) );
assertEquals( "four", two.get( "4" ) );
assertEquals( "five", two.get( "5" ) );
// Free caches
manager1.freeCache( "testCache" );
manager2.freeCache( "testCache" );
}
private Properties getProperties() throws IOException
{
Properties props = new Properties();
props.load( getClass().getResourceAsStream(
"/org/apache/jcs/auxiliary/javagroups/JavaGroupsCacheWithGetTest.ccf" )
);
return props;
}
}
--
To unsubscribe, e-mail: <mailto:[EMAIL PROTECTED]>
For additional commands, e-mail: <mailto:[EMAIL PROTECTED]>