asmuts      02/01/14 22:23:41

  Added:       src/java/org/apache/stratum/jcs/auxiliary/lateral/http/broadcast
                        LateralCacheMulticaster.java
                        LateralCacheTester.java LateralCacheThread.java
                        LateralCacheUnicaster.java
  Log:
  the start of a lateral cache system
  needs some work
  
  Revision  Changes    Path
  1.1                  
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/lateral/http/broadcast/LateralCacheMulticaster.java
  
  Index: LateralCacheMulticaster.java
  ===================================================================
  package  org.apache.stratum.jcs.auxiliary.lateral.http.broadcast;
  
  import  java.io.*;
  import  java.net.*;
  import  java.util.*;
  import  java.sql.*;
  
  import  org.apache.stratum.jcs.engine.memory.*;
  import  org.apache.stratum.jcs.engine.behavior.ICacheElement;
  import  org.apache.stratum.jcs.engine.behavior.ICache;
  import  org.apache.stratum.jcs.utils.reuse.*;
  
  import  org.apache.stratum.jcs.engine.*;
  import org.apache.stratum.jcs.utils.log.*;
  
  /*
   * Used to multi-cast a key/val pair to the named cache on multiple servers.
   */
  public class LateralCacheMulticaster {
  
    private static final String servlet = "/cache/cache/LateralCacheServletReceiver";
    private final ICacheElement ice;
    private final ArrayList servers;
    protected Logger log;
  
    ////////////////////////////////////////////////////////////////////////
    public LateralCacheMulticaster ( ICacheElement ice, ArrayList servers) {
  
      this.servers = servers;
      this.ice = ice;
  
      LoggerManager loggerMgr = LoggerManager.getInstance();
      log = loggerMgr.getLogger(this);
      if ( log.logLevel >= log.DEBUG ) {
        log.debug("In DistCacheMulticaster");
      }
  
    } // end constructor
  
  
    /** Multi-casts the cache changes to the distributed servers. */
    public void multicast () {
  
      ThreadPoolManager tpm = ThreadPoolManager.getInstance();
      Iterator it = servers.iterator();
      while ( it.hasNext() ) {
        tpm.runIt(new LateralCacheUnicaster(ice, (String)it.next() + servlet));
      }
      return;
    }             // end run
  
  
  }               // end class
  
  
  
  
  
  
  1.1                  
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/lateral/http/broadcast/LateralCacheTester.java
  
  Index: LateralCacheTester.java
  ===================================================================
  package org.apache.stratum.jcs.auxiliary.lateral.http.broadcast;
  
  import org.apache.stratum.jcs.auxiliary.lateral.http.broadcast.*;
  import org.apache.stratum.jcs.auxiliary.lateral.http.remove.*;
  import org.apache.stratum.jcs.auxiliary.lateral.http.server.*;
  
  import org.apache.stratum.jcs.engine.memory.*;
  
  ////////////////////////////////////////////////////////////////////////
  /**
   *  @author Aaron Smuts
   *  @version 1.0
   */
  public class LateralCacheTester{
  
        public static void main( String args[] ){
  
      String[] servers = { "10.1.17.109","10.1.17.108" };
  
                try {
  
        //for ( int i=0; i <100; i++ ) {
          String val = "test object value";
                          LateralCacheThread dct = new LateralCacheThread ( 
"testTable", "testkey", val, servers );
                  dct.setPriority( Thread.NORM_PRIORITY - 1 );
                          dct.start();
  
          String val2 = "test object value2";
                          LateralCacheThread dct2 = new LateralCacheThread ( 
"testTable", "testkey", val, servers );
                  dct2.setPriority( Thread.NORM_PRIORITY - 1 );
                          dct2.start();
        //}
  
        } catch( Exception e ){
                        System.out.println( e.toString() );
                }
  
        }
  
  
  
  }     // end class
  
  
  
  1.1                  
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/lateral/http/broadcast/LateralCacheThread.java
  
  Index: LateralCacheThread.java
  ===================================================================
  package org.apache.stratum.jcs.auxiliary.lateral.http.broadcast;
  
  import java.io.*;
  import java.net.*;
  import java.util.*;
  import java.sql.*;
  
  
  import org.apache.stratum.jcs.engine.*;
  import org.apache.stratum.jcs.utils.log.*;
  import 
org.apache.stratum.jcs.auxiliary.lateral.http.server.LateralCacheServletReciever;
  import org.apache.stratum.jcs.engine.memory.*;
  import org.apache.stratum.jcs.engine.behavior.ICacheElement;
  import org.apache.stratum.jcs.engine.behavior.ICache;
  
  
  
  ////////////////////////////////////////////////////////////////////////
  /**
   *  @author Aaron Smuts
   *  @version 1.0
   */
  public class LateralCacheThread extends Thread {
  
    private static final String servlet = "/rcash/ramraf/DistCacheServlet";
    protected String hashtableName;
    protected String key;
    protected Serializable val;
    protected boolean debug = false; //true;
    protected boolean running = true;
    protected Logger log;
    protected String[] servers;
  
    ////////////////////////////////////////////////////////////////////////
    public LateralCacheThread( String hashtableName, String key, Serializable val, 
String[] servers ) {
      this.hashtableName = hashtableName;
      this.key = key;
      this.val = val;
      this.servers = servers;
      LoggerManager loggerMgr = LoggerManager.getInstance();
      log = loggerMgr.getLogger( this );
      log.debug("In DistCacheThread" );
    }
  
    ////////////////////////////////////////////////
    public void run() {
      try {
        long start = System.currentTimeMillis();
        //if ( running ) {
        ICacheElement cb = new CacheElement(hashtableName, key, val);
        log.debug( "key = " + key );
        String result = sendCache( cb );
        sleep(100);
        running = false;
        long end = System.currentTimeMillis();
        log.info( "transfer took " + String.valueOf( end - start ) + " millis" );
        log.flush();
        log = null;
        return;
        //}
      } catch ( InterruptedException e ) {
        running = false;
        return;
      }
  
    }// end run
  
    ////////////////////////////////////////////////////////
    public String sendCache( ICacheElement cb ) {
      String response = "";
      try {
        for ( int i = 0; i < servers.length; i++) {
          if ( debug ) {
              log.logIt( "servers[i] + servlet = " + servers[i] + servlet );
                }
              // create our URL
                URL tmpURL = new URL(servers[i] + servlet) ;
                URL url = new URL( tmpURL.toExternalForm());
          if ( debug ) {
            log.logIt( "tmpURL = " + tmpURL );
          }
  
                // Open our URLConnection
                if ( debug ) {
            log.logIt("Opening Connection.");
          }
                URLConnection con = url.openConnection();
                if ( debug ) {
            log.logIt("con = " + con );
          }
                writeObj( con, cb );
                response = read(con);
          if ( debug ) {
             log.logIt( "response = " + response );
          }
        } // end for
      } catch (MalformedURLException mue) {
         log.error( mue );
      } catch (Exception e) {
        log.error( e );
      }// end catch
  
      // suggest gc
      cb = null;
      running = false;
      return response;
   } // end send cache
  
  
    /////////////////////////////////////////////////////////
    // Write the Answer to the Connection
    public void writeObj( URLConnection connection, ICacheElement cb ) {
      try {
          connection.setUseCaches(false);
          connection.setRequestProperty("CONTENT_TYPE", "application/octet-stream");
          connection.setDoOutput(true);
          connection.setDoInput(true);
          ObjectOutputStream os =  new ObjectOutputStream( 
connection.getOutputStream() );
          log.debug("os = " + os );
  
          // Write the ICacheItem to the ObjectOutputStream
          if ( debug ) {
            log.logIt("Writing  ICacheItem.");
          }
          os.writeObject( cb );
          os.flush();
          if ( debug ) {
            log.logIt("closing output stream");
          }
          os.close();
      }catch (IOException e) {
        log.error( e );
      } // end catch
    }
  
    ////////////////////////////////////////////////////////
    public String read( URLConnection connection ) {
      String result = "";
      try {
        ObjectInputStream is = new ObjectInputStream( connection.getInputStream() );
        result = (String)is.readObject();
        is.close();
        if ( debug ) {
        //System.out.println("got result.");
        }
        log.logIt( "got result = " + result );
      } catch (IOException e) {
          log.error( e );
      } catch (ClassNotFoundException ce) {
          log.error( ce );
      }
      return result;
    } // end read
  
  }     // end class
  
  
  
  1.1                  
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/lateral/http/broadcast/LateralCacheUnicaster.java
  
  Index: LateralCacheUnicaster.java
  ===================================================================
  package org.apache.stratum.jcs.auxiliary.lateral.http.broadcast;
  
  import java.io.*;
  import java.net.*;
  import java.util.*;
  import java.sql.*;
  
  import org.apache.stratum.jcs.engine.memory.*;
  import org.apache.stratum.jcs.engine.behavior.*;
  import org.apache.stratum.jcs.engine.*;
  import org.apache.stratum.jcs.utils.reuse.*;
  
  import org.apache.stratum.jcs.utils.log.*;
  
  /**
   * Used to uni-cast a ICacheItem to the named cache on the target server.
   */
  public class LateralCacheUnicaster implements IThreadPoolRunnable {
  
    private final ICacheElement item;
    private final String urlStr;
    private final Logger log;
  
    private URLConnection conn;
    protected boolean debug = false;
    /**
     * Constructs with the given ICacheItem and the url of the target server.
     */
    public LateralCacheUnicaster(ICacheElement item, String urlStr) {
      this.item = item;
      this.urlStr = urlStr;
      log = LoggerManager.getInstance().getLogger( this );
      log.debug("In LateralCacheUnicaster2, " + Thread.currentThread().getName() );
    }
    /**
     * Called when this object is first loaded in the thread pool.
     * Important: all workers in a pool must be of the same type,
     * otherwise the mechanism becomes more complex.
     */
    public Object[] getInitData() { return null; }
    /**
     * Sends a ICacheItem to the target server.
     * This method will be executed in one of the pool's threads. The
     *  thread will be returned to the pool.
     */
     // Todo: error recovery and retry.
    public void runIt(Object thData[]) {
      long start = System.currentTimeMillis();
  
      try {
        if ( debug ) {
            log.logIt( "url = " + urlStr );
        }
            // create our URL
        URL tmpURL = new URL(urlStr) ;
        URL url = new URL( tmpURL.toExternalForm());
        if ( debug ) {
          log.logIt( "tmpURL = " + tmpURL );
        }
  
        // Open our URLConnection
        if ( debug ) {
          log.logIt("Opening Connection.");
        }
        conn = url.openConnection();
        if ( log.logLevel >= log.DEBUG ) {
          log.debug("conn = " + conn );
        }
        String response = sendCacheItem();
        if ( log.logLevel >= log.DEBUG ) {
          log.debug("response = " + response );
        }
        conn = null;
      } catch (MalformedURLException mue) {
         log.warn( "mue - Unicaster couldn't connect to bad url " + urlStr );
      } catch ( ConnectException ce) {
        log.warn( "ce - Unicaster couldn't connect to " + urlStr );
      } catch ( IOException ioe) {
        log.warn( "ioe - Unicaster couldn't connect to " + urlStr );
      }// end catch
  
      if ( log.logLevel >= log.DEBUG ) {
        log.debug( "transfer took " + String.valueOf( System.currentTimeMillis() - 
start ) + " millis\n" );
      }
      log.flush();
      return;
   } // end send cache
  
    /**
     * Sends the ICacheItem to the current URLConnection.
     * @return the response from the current URLConnection.
     */
    private String sendCacheItem() {
      try {
          conn.setUseCaches(false);
          conn.setRequestProperty("CONTENT_TYPE", "application/octet-stream");
          conn.setDoOutput(true);
          conn.setDoInput(true);
          ObjectOutputStream os =  new ObjectOutputStream( conn.getOutputStream() );
          try {
            log.debug("os = " + os );
  
            // Write the ICacheItem to the ObjectOutputStream
            if ( debug ) {
              log.logIt("Writing ICacheItem.");
            }
            os.writeObject( item );
          } finally {
            os.close();
          }
        return getResponse();
      }catch (IOException e) {
        log.warn( "ie - couldn't send item " + urlStr );
      } // end catch
      return "";
    }
  
    ////////////////////////////////////////////////////////
    private String getResponse() throws IOException {
      String result = "";
      try {
        ObjectInputStream is = new ObjectInputStream( conn.getInputStream() );
        try {
          result = (String)is.readObject();
        } finally {
          is.close();
        }
        if ( log.logLevel >= log.DEBUG ) {
          log.debug( "got result = " + result );
        }
      } catch (ClassNotFoundException ce) {
          log.error( ce );
      }
      catch ( Exception e ) {
        log.error( e );
      }
      return result;
    } // end getResponse
  
  
  }     // end class
  
  
  

--
To unsubscribe, e-mail:   <mailto:[EMAIL PROTECTED]>
For additional commands, e-mail: <mailto:[EMAIL PROTECTED]>

Reply via email to