jtaylor     2002/08/10 16:52:45

  Modified:    auxiliary-builds/javagroups/src/java/org/apache/jcs/auxiliary/javagroups
                        JavaGroupsCache.java JavaGroupsCacheAttributes.java
                        JavaGroupsCacheFactory.java
               auxiliary-builds/javagroups/src/test/org/apache/jcs/auxiliary/javagroups
                        JavaGroupsCacheTest.ccf JavaGroupsCacheTest.java
  Added:       auxiliary-builds/javagroups/src/test/org/apache/jcs/auxiliary/javagroups
                        JavaGroupsCacheWithGetTest.ccf
                        JavaGroupsCacheWithGetTest.java
  Log:
  Implemented get (optional, disabled by default) for javagroups. Reimplemented
  other methods using MessageDispatcher instead of a listener thread.
  
  Revision  Changes    Path
  1.2       +97 -106   
jakarta-turbine-jcs/auxiliary-builds/javagroups/src/java/org/apache/jcs/auxiliary/javagroups/JavaGroupsCache.java
  
  Index: JavaGroupsCache.java
  ===================================================================
  RCS file: 
/home/cvs/jakarta-turbine-jcs/auxiliary-builds/javagroups/src/java/org/apache/jcs/auxiliary/javagroups/JavaGroupsCache.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- JavaGroupsCache.java      7 Aug 2002 15:35:16 -0000       1.1
  +++ JavaGroupsCache.java      10 Aug 2002 23:52:45 -0000      1.2
  @@ -64,11 +64,18 @@
   import org.apache.jcs.engine.control.CompositeCache;
   import org.javagroups.Channel;
   import org.javagroups.Message;
  +import org.javagroups.View;
  +import org.javagroups.Address;
  +import org.javagroups.MembershipListener;
  +import org.javagroups.util.RspList;
   import org.javagroups.blocks.RequestHandler;
  +import org.javagroups.blocks.GroupRequest;
  +import org.javagroups.blocks.MessageDispatcher;
   
   import java.io.IOException;
   import java.io.Serializable;
   import java.util.Set;
  +import java.util.Vector;
   
   /**
    * Auxiliary cache using javagroups. Expects to be created with a Channel,
  @@ -92,20 +99,27 @@
    * @author <a href="[EMAIL PROTECTED]">James Taylor</a>
    * @version $Id$
    */
  -public class JavaGroupsCache implements AuxiliaryCache, RequestHandler
  +public class JavaGroupsCache
  +    implements AuxiliaryCache, RequestHandler, MembershipListener
   {
  -    private final static Log log = LogFactory.getLog( JavaGroupsCache.class );
  +    private static int ct = 0;
  +
  +    private final Log log = LogFactory.getLog( JavaGroupsCache.class.getName() + 
(ct++) );
   
       private String cacheName;
       private int status;
   
  +    private boolean getFromPeers;
  +
       private CompositeCache cache;
   
       private Channel channel;
   
  -    private Listener listener;
  +    private MessageDispatcher dispatcher;
   
  -    public JavaGroupsCache( CompositeCache cache, Channel channel )
  +    public JavaGroupsCache( CompositeCache cache,
  +                            Channel channel,
  +                            boolean getFromPeers )
           throws Exception
       {
           this.cache = cache;
  @@ -113,16 +127,18 @@
           this.cacheName = cache.getCacheName();
           this.channel = channel;
   
  -        // Connect channel to the 'group' for our region name
  -
  -        channel.connect( cacheName );
  +        this.getFromPeers = getFromPeers;
   
           // The adapter listens to the channel and fires MessageListener events
           // on this object.
   
  -        listener = new Listener( channel, this, cacheName );
  +        dispatcher = new MessageDispatcher( channel, null, this, this );
   
  -        listener.start();
  +        // Connect channel to the 'group' for our region name
  +
  +        channel.setOpt( Channel.LOCAL, Boolean.FALSE );
  +
  +        channel.connect( cacheName );
   
           // If all the above succeed, the cache is now alive.
   
  @@ -137,7 +153,14 @@
   
           try
           {
  -            channel.send( new Message( null, null, request ) );
  +            log.info( "Sending" );
  +
  +            dispatcher.castMessage( null,
  +                                    new Message( null, null, request ),
  +                                    GroupRequest.GET_NONE,
  +                                    0 );
  +            log.info( "Sent" );
  +
           }
           catch ( Exception e )
           {
  @@ -160,19 +183,44 @@
       }
   
       /**
  -     * Not implemented, we expect that if an element is available to one of our
  -     * peers, it has already been shared by a previous update.
  +     * If 'getFromPeers' is true, this will attempt to get the requested
  +     * element from ant other members of the group.
        *
  -     * NOTE: This could be easily implemented using a MessageDispatcher and
  -     * ResponseCollector, which would be usefull when the caches get out of
  -     * sync (for example, if one member is rebooted).
  -     *
  -     * @param key Ignored
  -     * @return Always returns null
  +     * @param key
  +     * @return
        * @throws IOException Never thrown by this implementation
        */
       public ICacheElement get( Serializable key ) throws IOException
       {
  +        if ( getFromPeers )
  +        {
  +            CacheElement element = new CacheElement( cacheName, key, null );
  +
  +            Request request = new Request( element, Request.GET );
  +
  +            // Cast message and wait for all responses.
  +
  +            // FIXME: we can stop waiting after the first not null response,
  +            //        that is more difficult to implement however.
  +
  +            RspList responses =
  +                dispatcher.castMessage( null,
  +                                        new Message( null, null, request ),
  +                                        GroupRequest.GET_ALL,
  +                                        0 );
  +
  +            // Get results only gives the responses which were not null
  +
  +            Vector results = responses.getResults();
  +
  +            // If there were any non null results, return the first
  +
  +            if ( results.size() > 0 )
  +            {
  +                return ( ICacheElement ) results.get( 0 );
  +            }
  +        }
  +
           return null;
       }
   
  @@ -212,9 +260,18 @@
        */
       public void dispose() throws IOException
       {
  -        listener.terminate();
  +        // This will join the scheduler thread and ensure everything terminates
  +
  +        dispatcher.stop();
  +
  +        // Now we can disconnect from the group and close the channel
  +
  +        channel.disconnect();
  +        channel.close();
   
           status = CacheConstants.STATUS_DISPOSED;
  +
  +        log.info( "Disposed for cache: " + cacheName );
       }
   
       /**
  @@ -285,6 +342,8 @@
       {
           try
           {
  +            log.info( "Handling" );
  +
               Request request = ( Request ) msg.getObject();
   
               // Switch based on the command and invoke the
  @@ -292,6 +351,11 @@
   
               switch ( request.getCommand() )
               {
  +                case Request.GET:
  +
  +                    return cache.localGet( request.getCacheElement().getKey() );
  +                    // break;
  +
                   case Request.UPDATE:
   
                       cache.localUpdate( request.getCacheElement() );
  @@ -312,6 +376,7 @@
                       log.error( "Recieved unknown command" );
               }
   
  +            log.info( "Handled" );
           }
           catch ( Exception e )
           {
  @@ -321,6 +386,17 @@
           return null;
       }
   
  +    // ------------------------------------------- interface MembershipListener
  +
  +    public void viewAccepted( View view )
  +    {
  +        log.info( "View Changed: " + String.valueOf( view ) );
  +    }
  +
  +    public void suspect( Address suspectedAddress ) { }
  +
  +    public void block() { }
  +
       // ---------------------------------------------------------- inner classes
   
       /**
  @@ -332,6 +408,7 @@
           public final static int UPDATE = 1;
           public final static int REMOVE = 2;
           public final static int REMOVE_ALL = 3;
  +        public final static int GET = 5;
   
           private ICacheElement cacheElement;
           private int command;
  @@ -350,92 +427,6 @@
           public int getCommand()
           {
               return command;
  -        }
  -    }
  -
  -    /**
  -     * Listens for messages on the provided channel and dispatches them to the
  -     * CompositeCache that this auxiliary was created with. Also, when
  -     * terminated is resposible for disconnecting and closing the Channel.
  -     */
  -    static class Listener extends Thread
  -    {
  -        boolean running = true;
  -
  -        RequestHandler handler;
  -
  -        Channel channel;
  -        Object object;
  -
  -        public Listener( Channel channel,
  -                         RequestHandler handler,
  -                         String cacheName )
  -        {
  -            super( "JavaGroupsCache.Listener: " + cacheName );
  -
  -            this.channel = channel;
  -            this.handler = handler;
  -        }
  -
  -        public void run()
  -        {
  -            while ( running )
  -            {
  -                // Get next object from channel, blocking indefinately
  -
  -                try
  -                {
  -                    object = channel.receive( 0 );
  -                }
  -                catch ( Exception e )
  -                {
  -                    // If we are still supposed to be running, pause and
  -                    // continue.
  -
  -                    if ( running )
  -                    {
  -                        try
  -                        {
  -                            Thread.sleep( 1000 );
  -                        }
  -                        catch ( Exception ignored )
  -                        {
  -                            // Ignored
  -                        }
  -                    }
  -                }
  -
  -
  -                // Ignore anything that is not a message
  -
  -                if ( object instanceof Message )
  -                {
  -                    handler.handle( ( Message ) object );
  -                }
  -            }
  -
  -            // When terminated, clean up the channel. Termination should be
  -            // initiated by the dispose() method of the enclosing
  -            // JavaGroupsCache
  -
  -            channel.disconnect();
  -            channel.close();
  -        }
  -
  -        public void terminate()
  -        {
  -            running = false;
  -
  -            this.interrupt();
  -
  -            try
  -            {
  -                this.join( 1000 );
  -            }
  -            catch ( InterruptedException e )
  -            {
  -                // Ignored, not much we can do anymore.
  -            }
           }
       }
   }
  
  
  
  1.2       +12 -1     
jakarta-turbine-jcs/auxiliary-builds/javagroups/src/java/org/apache/jcs/auxiliary/javagroups/JavaGroupsCacheAttributes.java
  
  Index: JavaGroupsCacheAttributes.java
  ===================================================================
  RCS file: 
/home/cvs/jakarta-turbine-jcs/auxiliary-builds/javagroups/src/java/org/apache/jcs/auxiliary/javagroups/JavaGroupsCacheAttributes.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- JavaGroupsCacheAttributes.java    7 Aug 2002 15:35:16 -0000       1.1
  +++ JavaGroupsCacheAttributes.java    10 Aug 2002 23:52:45 -0000      1.2
  @@ -88,6 +88,7 @@
   
       private String channelFactoryClassName = "org.javagroups.JChannelFactory";
       private String channelProperties = null;
  +    private boolean getFromPeers = false;
   
       public String getChannelFactoryClassName()
       {
  @@ -107,6 +108,16 @@
       public void setChannelProperties( String channelProperties )
       {
           this.channelProperties = channelProperties;
  +    }
  +
  +    public boolean isGetFromPeers()
  +    {
  +        return getFromPeers;
  +    }
  +
  +    public void setGetFromPeers( boolean getFromPeers )
  +    {
  +        this.getFromPeers = getFromPeers;
       }
   
       // ----------------------------------------------- AuxiliaryCacheAttributes
  
  
  
  1.2       +4 -2      
jakarta-turbine-jcs/auxiliary-builds/javagroups/src/java/org/apache/jcs/auxiliary/javagroups/JavaGroupsCacheFactory.java
  
  Index: JavaGroupsCacheFactory.java
  ===================================================================
  RCS file: 
/home/cvs/jakarta-turbine-jcs/auxiliary-builds/javagroups/src/java/org/apache/jcs/auxiliary/javagroups/JavaGroupsCacheFactory.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- JavaGroupsCacheFactory.java       7 Aug 2002 15:35:16 -0000       1.1
  +++ JavaGroupsCacheFactory.java       10 Aug 2002 23:52:45 -0000      1.2
  @@ -101,7 +101,9 @@
   
               // Return a new JavaGroupsCache for the new channel.
   
  -            return new JavaGroupsCache( cache, channel );
  +            return new JavaGroupsCache( cache,
  +                                        channel,
  +                                        attributes.isGetFromPeers() );
           }
           catch ( Exception e )
           {
  
  
  
  1.2       +1 -1      
jakarta-turbine-jcs/auxiliary-builds/javagroups/src/test/org/apache/jcs/auxiliary/javagroups/JavaGroupsCacheTest.ccf
  
  Index: JavaGroupsCacheTest.ccf
  ===================================================================
  RCS file: 
/home/cvs/jakarta-turbine-jcs/auxiliary-builds/javagroups/src/test/org/apache/jcs/auxiliary/javagroups/JavaGroupsCacheTest.ccf,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- JavaGroupsCacheTest.ccf   7 Aug 2002 15:35:16 -0000       1.1
  +++ JavaGroupsCacheTest.ccf   10 Aug 2002 23:52:45 -0000      1.2
  @@ -9,4 +9,4 @@
   jcs.auxiliary.JG = org.apache.jcs.auxiliary.javagroups.JavaGroupsCacheFactory
   jcs.auxiliary.JG.attributes = 
org.apache.jcs.auxiliary.javagroups.JavaGroupsCacheAttributes
   jcs.auxiliary.JG.attributes.ChannelFactoryClassName = org.javagroups.JChannelFactory
  -jcs.auxiliary.JG.attributes.ChannelProperties = 
UDP(mcast_addr=224.0.0.100;mcast_port=7501):PING:FD:STABLE:NAKACK:UNICAST:FRAG:FLUSH:GMS:VIEW_ENFORCER:STATE_TRANSFER:QUEUE
  +jcs.auxiliary.JG.attributes.ChannelProperties = 
UDP(mcast_addr=224.0.0.100;mcast_port=7501):PING:FD:STABLE:NAKACK:UNICAST:FRAG:FLUSH:GMS:VIEW_ENFORCER:QUEUE
  \ No newline at end of file
  
  
  
  1.3       +3 -3      
jakarta-turbine-jcs/auxiliary-builds/javagroups/src/test/org/apache/jcs/auxiliary/javagroups/JavaGroupsCacheTest.java
  
  Index: JavaGroupsCacheTest.java
  ===================================================================
  RCS file: 
/home/cvs/jakarta-turbine-jcs/auxiliary-builds/javagroups/src/test/org/apache/jcs/auxiliary/javagroups/JavaGroupsCacheTest.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -r1.2 -r1.3
  --- JavaGroupsCacheTest.java  7 Aug 2002 15:46:34 -0000       1.2
  +++ JavaGroupsCacheTest.java  10 Aug 2002 23:52:45 -0000      1.3
  @@ -45,7 +45,7 @@
           // Wait for it to propogate -- FIXME: This is time sensitive and thus
           //                                    a bad idea for a unit test.
   
  -        Thread.sleep( 1000 );
  +        Thread.sleep( 500 );
   
           // Assert that the values were correctly propogated
   
  @@ -59,7 +59,7 @@
   
           one.remove( "2" );
   
  -        Thread.sleep( 1000 );
  +        Thread.sleep( 500 );
   
           assertEquals( null, two.get( "2") );
   
  @@ -67,7 +67,7 @@
   
           one.remove();
   
  -        Thread.sleep( 1000 );
  +        Thread.sleep( 500 );
   
           assertEquals( null, two.get( "1" ) );
           assertEquals( null, two.get( "2" ) );
  
  
  
  1.1                  
jakarta-turbine-jcs/auxiliary-builds/javagroups/src/test/org/apache/jcs/auxiliary/javagroups/JavaGroupsCacheWithGetTest.ccf
  
  Index: JavaGroupsCacheWithGetTest.ccf
  ===================================================================
  jcs.default = JG
  jcs.default.elementattributes = org.apache.jcs.engine.ElementAttributes
  jcs.default.elementattributes.IsEternal = true
  jcs.default.elementattributes.MaxLifeSeconds = 60
  jcs.default.elementattributes.IsSpool = true
  jcs.default.elementattributes.IsRemote = true
  jcs.default.elementattributes.IsLateral = true
  
  jcs.auxiliary.JG = org.apache.jcs.auxiliary.javagroups.JavaGroupsCacheFactory
  jcs.auxiliary.JG.attributes = org.apache.jcs.auxiliary.javagroups.JavaGroupsCacheAttributes
  jcs.auxiliary.JG.attributes.ChannelFactoryClassName = org.javagroups.JChannelFactory
  jcs.auxiliary.JG.attributes.ChannelProperties = UDP(mcast_addr=224.0.0.100;mcast_port=7501):PING:FD:STABLE:NAKACK:UNICAST:FRAG:FLUSH:GMS:VIEW_ENFORCER:QUEUE
  jcs.auxiliary.JG.attributes.GetFromPeers = true
  
  
  1.1                  
jakarta-turbine-jcs/auxiliary-builds/javagroups/src/test/org/apache/jcs/auxiliary/javagroups/JavaGroupsCacheWithGetTest.java
  
  Index: JavaGroupsCacheWithGetTest.java
  ===================================================================
  package org.apache.jcs.auxiliary.javagroups;
  
  import org.apache.jcs.engine.control.CompositeCacheManager;
  import org.apache.jcs.access.CacheAccess;
  import org.javagroups.log.Tracer;
  import org.javagroups.log.Trace;

  import java.io.IOException;
  import java.io.InputStream;
  import java.util.Properties;

  import junit.framework.TestCase;
  
  /**
   * Exercises the javagroups auxiliary with the configuration found in
   * <code>JavaGroupsCacheWithGetTest.ccf</code>: one region is populated
   * through a first cache manager, then a second manager is configured from
   * the same properties and is expected to return the same values from its
   * own "testCache" region.
   */
  public class JavaGroupsCacheWithGetTest extends TestCase
  {
      /** Name of the region used by both cache managers. */
      private static final String REGION_NAME = "testCache";

      /** Classpath location of the configuration loaded for each manager. */
      private static final String CONFIG_RESOURCE =
          "/org/apache/jcs/auxiliary/javagroups/JavaGroupsCacheWithGetTest.ccf";

      /**
       * @param testName name of the test method to run, passed to TestCase
       */
      public JavaGroupsCacheWithGetTest( String testName )
      {
          super( testName );
      }

      /**
       * Puts five entries through the first manager's region, then creates
       * a second manager and asserts that each key can be read through it.
       *
       * @throws Exception if configuration or any cache operation fails
       */
      public void testWithGet() throws Exception
      {
          // Create and configure first manager and region

          CompositeCacheManager manager1 = new CompositeCacheManager();

          manager1.configure( getProperties() );

          CacheAccess one = new CacheAccess( manager1.getCache( REGION_NAME ) );

          CompositeCacheManager manager2 = null;
          boolean secondRegionCreated = false;

          try
          {
              one.put( "1", "one" );
              one.put( "2", "two" );
              one.put( "3", "three" );
              one.put( "4", "four" );
              one.put( "5", "five" );

              // Now get second manager and region, it will join the group

              manager2 = new CompositeCacheManager();

              manager2.configure( getProperties() );

              CacheAccess two =
                  new CacheAccess( manager2.getCache( REGION_NAME ) );

              secondRegionCreated = true;

              assertEquals( "one",   two.get( "1" ) );
              assertEquals( "two",   two.get( "2" ) );
              assertEquals( "three", two.get( "3" ) );
              assertEquals( "four",  two.get( "4" ) );
              assertEquals( "five",  two.get( "5" ) );
          }
          finally
          {
              // Free caches even when an assertion above fails, so a failed
              // run does not leave the regions allocated. The order (first
              // manager, then second) is the same as on the success path.

              try
              {
                  manager1.freeCache( REGION_NAME );
              }
              finally
              {
                  if ( secondRegionCreated )
                  {
                      manager2.freeCache( REGION_NAME );
                  }
              }
          }
      }

      /**
       * Loads the test configuration from the classpath.
       *
       * @return properties read from {@link #CONFIG_RESOURCE}
       * @throws IOException if the resource is missing or cannot be read
       */
      private Properties getProperties() throws IOException
      {
          InputStream in = getClass().getResourceAsStream( CONFIG_RESOURCE );

          // getResourceAsStream signals a missing resource with null; report
          // that explicitly rather than failing inside Properties.load.

          if ( in == null )
          {
              throw new IOException(
                  "Test configuration not found on classpath: "
                  + CONFIG_RESOURCE );
          }

          Properties props = new Properties();

          try
          {
              props.load( in );
          }
          finally
          {
              in.close();
          }

          return props;
      }
  }
  
  
  

--
To unsubscribe, e-mail:   <mailto:[EMAIL PROTECTED]>
For additional commands, e-mail: <mailto:[EMAIL PROTECTED]>

Reply via email to