asmuts 02/01/14 22:23:41
Added: src/java/org/apache/stratum/jcs/auxiliary/lateral/http/broadcast
LateralCacheMulticaster.java
LateralCacheTester.java LateralCacheThread.java
LateralCacheUnicaster.java
Log:
the start of a lateral cache system
needs some work
Revision Changes Path
1.1
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/lateral/http/broadcast/LateralCacheMulticaster.java
Index: LateralCacheMulticaster.java
===================================================================
package org.apache.stratum.jcs.auxiliary.lateral.http.broadcast;
import java.io.*;
import java.net.*;
import java.util.*;
import java.sql.*;
import org.apache.stratum.jcs.engine.memory.*;
import org.apache.stratum.jcs.engine.behavior.ICacheElement;
import org.apache.stratum.jcs.engine.behavior.ICache;
import org.apache.stratum.jcs.utils.reuse.*;
import org.apache.stratum.jcs.engine.*;
import org.apache.stratum.jcs.utils.log.*;
/**
 * Sends one cache element to the lateral cache receiver servlet on every
 * server in a list.
 * <p>
 * For each server entry a {@link LateralCacheUnicaster} is created and passed
 * to <code>ThreadPoolManager.runIt</code>; this class opens no connections
 * itself.
 */
public class LateralCacheMulticaster {

    /** Path appended to each server entry to form the receiver URL. */
    private static final String servlet = "/cache/cache/LateralCacheServletReceiver";

    /** The element handed to every unicaster. */
    private final ICacheElement ice;

    /** Server entries; each one is cast to String and prefixed to the servlet path. */
    private final ArrayList servers;

    /** Logger obtained from the LoggerManager for this instance. */
    protected Logger log;

    /**
     * Stores the element and the server list and looks up the logger.
     *
     * @param ice     the cache element to send
     * @param servers list of server entries (Strings) to send the element to
     */
    public LateralCacheMulticaster(ICacheElement ice, ArrayList servers) {
        this.ice = ice;
        this.servers = servers;
        log = LoggerManager.getInstance().getLogger(this);
        if (log.DEBUG <= log.logLevel) {
            log.debug("In DistCacheMulticaster");
        }
    }

    /**
     * Creates one unicaster per server entry and hands each to the thread
     * pool manager.
     */
    public void multicast() {
        ThreadPoolManager pool = ThreadPoolManager.getInstance();
        for (Iterator hosts = servers.iterator(); hosts.hasNext();) {
            String target = (String) hosts.next() + servlet;
            pool.runIt(new LateralCacheUnicaster(ice, target));
        }
    }
}
1.1
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/lateral/http/broadcast/LateralCacheTester.java
Index: LateralCacheTester.java
===================================================================
package org.apache.stratum.jcs.auxiliary.lateral.http.broadcast;
import org.apache.stratum.jcs.auxiliary.lateral.http.broadcast.*;
import org.apache.stratum.jcs.auxiliary.lateral.http.remove.*;
import org.apache.stratum.jcs.auxiliary.lateral.http.server.*;
import org.apache.stratum.jcs.engine.memory.*;
////////////////////////////////////////////////////////////////////////
/**
 * Manual smoke test: starts two {@link LateralCacheThread}s that each send a
 * test value under the same table and key to a fixed pair of servers.
 *
 * @author Aaron Smuts
 * @version 1.0
 */
public class LateralCacheTester {

    /**
     * Starts the two sender threads at slightly below normal priority.
     *
     * @param args ignored
     */
    public static void main(String args[]) {
        // NOTE(review): these entries carry no URL scheme, and
        // LateralCacheThread builds its target with new URL(server + servlet),
        // which rejects a spec without a protocol. Presumably they should be
        // "http://..." (possibly with a port) -- confirm before relying on
        // this tester.
        String[] servers = { "10.1.17.109", "10.1.17.108" };
        try {
            String val = "test object value";
            LateralCacheThread dct = new LateralCacheThread(
                "testTable", "testkey", val, servers);
            dct.setPriority(Thread.NORM_PRIORITY - 1);
            dct.start();

            // The second thread must send the second value; the original
            // passed val again, leaving val2 unused.
            String val2 = "test object value2";
            LateralCacheThread dct2 = new LateralCacheThread(
                "testTable", "testkey", val2, servers);
            dct2.setPriority(Thread.NORM_PRIORITY - 1);
            dct2.start();
        } catch (Exception e) {
            System.out.println(e.toString());
        }
    }
}
1.1
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/lateral/http/broadcast/LateralCacheThread.java
Index: LateralCacheThread.java
===================================================================
package org.apache.stratum.jcs.auxiliary.lateral.http.broadcast;
import java.io.*;
import java.net.*;
import java.util.*;
import java.sql.*;
import org.apache.stratum.jcs.engine.*;
import org.apache.stratum.jcs.utils.log.*;
import
org.apache.stratum.jcs.auxiliary.lateral.http.server.LateralCacheServletReciever;
import org.apache.stratum.jcs.engine.memory.*;
import org.apache.stratum.jcs.engine.behavior.ICacheElement;
import org.apache.stratum.jcs.engine.behavior.ICache;
////////////////////////////////////////////////////////////////////////
/**
 * Thread that wraps a key/value pair in a cache element and posts it, as a
 * serialized object over a URL connection, to the cache servlet on each
 * configured server.
 *
 * @author Aaron Smuts
 * @version 1.0
 */
public class LateralCacheThread extends Thread {

    /** Path appended to each server entry to form the target URL. */
    private static final String servlet = "/rcash/ramraf/DistCacheServlet";

    /** Name passed as the first argument when building the cache element. */
    protected String hashtableName;

    /** Key of the element to send. */
    protected String key;

    /** Value of the element to send. */
    protected Serializable val;

    /** When true, extra progress messages are written with log.logIt. */
    protected boolean debug = false;

    /** Set to false once a send has finished or the thread was interrupted. */
    protected boolean running = true;

    /** Logger obtained from the LoggerManager for this instance. */
    protected Logger log;

    /** Server entries; each is prefixed to the servlet path to form a URL. */
    protected String[] servers;

    /**
     * Stores the data to send and looks up the logger.
     *
     * @param hashtableName name passed on to the cache element
     * @param key           key of the element
     * @param val           value of the element
     * @param servers       server entries to send the element to
     */
    public LateralCacheThread(String hashtableName, String key, Serializable val,
                              String[] servers) {
        this.hashtableName = hashtableName;
        this.key = key;
        this.val = val;
        this.servers = servers;
        LoggerManager loggerMgr = LoggerManager.getInstance();
        log = loggerMgr.getLogger(this);
        log.debug("In DistCacheThread");
    }

    /**
     * Builds the cache element, sends it to every server, pauses briefly and
     * logs the elapsed time.
     */
    public void run() {
        long start = System.currentTimeMillis();
        try {
            ICacheElement cb = new CacheElement(hashtableName, key, val);
            log.debug("key = " + key);
            sendCache(cb);
            // NOTE(review): the reported time includes this 100 ms pause.
            sleep(100);
            running = false;
            long end = System.currentTimeMillis();
            log.info("transfer took " + String.valueOf(end - start) + " millis");
            log.flush();
            // The logger reference is deliberately kept: sendCache, writeObj
            // and read are public and would fail if it were cleared here.
        } catch (InterruptedException e) {
            running = false;
            // Restore the interrupt status so code further up can see it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Posts the element to every configured server. Each server is attempted
     * independently, so a bad URL or failed connection for one entry does not
     * stop delivery to the remaining ones.
     *
     * @param cb the element to send
     * @return the response read from the last server attempted that got as
     *         far as the read step; "" if none did
     */
    public String sendCache(ICacheElement cb) {
        String response = "";
        if (servers == null) {
            log.warn("no servers configured, nothing sent");
        } else {
            for (int i = 0; i < servers.length; i++) {
                try {
                    if (debug) {
                        log.logIt("servers[i] + servlet = " + servers[i] + servlet);
                    }
                    URL url = new URL(servers[i] + servlet);
                    if (debug) {
                        log.logIt("tmpURL = " + url);
                        log.logIt("Opening Connection.");
                    }
                    URLConnection con = url.openConnection();
                    if (debug) {
                        log.logIt("con = " + con);
                    }
                    writeObj(con, cb);
                    response = read(con);
                    if (debug) {
                        log.logIt("response = " + response);
                    }
                } catch (MalformedURLException mue) {
                    log.error(mue);
                } catch (Exception e) {
                    log.error(e);
                }
            }
        }
        running = false;
        return response;
    }

    /**
     * Serializes the element onto the connection's output stream. I/O
     * failures are logged, not thrown.
     *
     * @param connection connection to write to
     * @param cb         element to serialize
     */
    public void writeObj(URLConnection connection, ICacheElement cb) {
        try {
            connection.setUseCaches(false);
            // NOTE(review): the usual HTTP header name is "Content-Type";
            // confirm the receiving servlet really expects this literal.
            connection.setRequestProperty("CONTENT_TYPE", "application/octet-stream");
            connection.setDoOutput(true);
            connection.setDoInput(true);
            ObjectOutputStream os = new ObjectOutputStream(
                connection.getOutputStream());
            try {
                log.debug("os = " + os);
                if (debug) {
                    log.logIt("Writing ICacheItem.");
                }
                os.writeObject(cb);
                os.flush();
            } finally {
                // Close even when the write fails so the stream is not leaked.
                if (debug) {
                    log.logIt("closing output stream");
                }
                os.close();
            }
        } catch (IOException e) {
            log.error(e);
        }
    }

    /**
     * Reads a single serialized String from the connection's input stream.
     * I/O and class-loading failures are logged, not thrown.
     *
     * @param connection connection to read from
     * @return the String read, or "" if reading failed
     */
    public String read(URLConnection connection) {
        String result = "";
        try {
            // NOTE(review): this deserializes whatever the remote end sends;
            // only point this class at trusted servers.
            ObjectInputStream is = new ObjectInputStream(connection.getInputStream());
            try {
                result = (String) is.readObject();
            } finally {
                // Close even when readObject fails so the stream is not leaked.
                is.close();
            }
            log.logIt("got result = " + result);
        } catch (IOException e) {
            log.error(e);
        } catch (ClassNotFoundException ce) {
            log.error(ce);
        }
        return result;
    }
}
1.1
jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/lateral/http/broadcast/LateralCacheUnicaster.java
Index: LateralCacheUnicaster.java
===================================================================
package org.apache.stratum.jcs.auxiliary.lateral.http.broadcast;
import java.io.*;
import java.net.*;
import java.util.*;
import java.sql.*;
import org.apache.stratum.jcs.engine.memory.*;
import org.apache.stratum.jcs.engine.behavior.*;
import org.apache.stratum.jcs.engine.*;
import org.apache.stratum.jcs.utils.reuse.*;
import org.apache.stratum.jcs.utils.log.*;
/**
 * Sends a single cache element to the receiver at one URL and reads back a
 * String response. Instances are handed to a thread pool through the
 * {@link IThreadPoolRunnable} contract.
 */
public class LateralCacheUnicaster implements IThreadPoolRunnable {

    /** Element to serialize to the target. */
    private final ICacheElement item;

    /** Full URL of the target, as text. */
    private final String urlStr;

    /** Logger obtained from the LoggerManager for this instance. */
    private final Logger log;

    /** Connection of the send in progress; cleared after a completed send. */
    private URLConnection conn;

    /** When true, extra progress messages are written with log.logIt. */
    protected boolean debug = false;

    /**
     * Stores the element and target URL and looks up the logger.
     *
     * @param item   element to send
     * @param urlStr URL of the target server
     */
    public LateralCacheUnicaster(ICacheElement item, String urlStr) {
        this.item = item;
        this.urlStr = urlStr;
        log = LoggerManager.getInstance().getLogger(this);
        log.debug("In LateralCacheUnicaster2, " + Thread.currentThread().getName());
    }

    /**
     * Supplies per-thread initialisation data for the pool; this runnable
     * has none.
     *
     * @return always null
     */
    public Object[] getInitData() {
        return null;
    }

    /**
     * Opens a connection to the target URL, sends the element and logs the
     * response. URL and connection failures are logged as warnings and not
     * rethrown; there is no retry.
     *
     * @param thData per-thread data from the pool; not used here
     */
    public void runIt(Object thData[]) {
        final long startedAt = System.currentTimeMillis();
        try {
            if (debug) {
                log.logIt("url = " + urlStr);
            }
            // The URL is parsed and then rebuilt from its external form.
            URL parsed = new URL(urlStr);
            URL target = new URL(parsed.toExternalForm());
            if (debug) {
                log.logIt("tmpURL = " + parsed);
            }
            if (debug) {
                log.logIt("Opening Connection.");
            }
            conn = target.openConnection();
            if (isDebugLevel()) {
                log.debug("conn = " + conn);
            }
            String reply = sendCacheItem();
            if (isDebugLevel()) {
                log.debug("response = " + reply);
            }
            conn = null;
        } catch (MalformedURLException badUrl) {
            log.warn("mue - Unicaster couldn't connect to bad url " + urlStr);
        } catch (ConnectException refused) {
            log.warn("ce - Unicaster couldn't connect to " + urlStr);
        } catch (IOException failed) {
            log.warn("ioe - Unicaster couldn't connect to " + urlStr);
        }
        if (isDebugLevel()) {
            long elapsed = System.currentTimeMillis() - startedAt;
            log.debug("transfer took " + elapsed + " millis\n");
        }
        log.flush();
    }

    /**
     * Serializes the element onto the current connection and then reads the
     * reply.
     *
     * @return the reply from the current connection, or "" if an
     *         IOException occurred
     */
    private String sendCacheItem() {
        try {
            conn.setUseCaches(false);
            conn.setRequestProperty("CONTENT_TYPE", "application/octet-stream");
            conn.setDoOutput(true);
            conn.setDoInput(true);
            ObjectOutputStream out = new ObjectOutputStream(conn.getOutputStream());
            try {
                log.debug("os = " + out);
                if (debug) {
                    log.logIt("Writing ICacheItem.");
                }
                out.writeObject(item);
            } finally {
                out.close();
            }
            return getResponse();
        } catch (IOException sendFailure) {
            log.warn("ie - couldn't send item " + urlStr);
            return "";
        }
    }

    /**
     * Reads one serialized String from the current connection. Although the
     * signature declares IOException, every exception raised while reading is
     * caught and logged here.
     *
     * @return the String read, or "" if nothing could be read
     */
    private String getResponse() throws IOException {
        String reply = "";
        try {
            ObjectInputStream in = new ObjectInputStream(conn.getInputStream());
            try {
                reply = (String) in.readObject();
            } finally {
                in.close();
            }
            if (isDebugLevel()) {
                log.debug("got result = " + reply);
            }
        } catch (ClassNotFoundException missingClass) {
            log.error(missingClass);
        } catch (Exception other) {
            log.error(other);
        }
        return reply;
    }

    /** Tells whether the logger's level is at or above its DEBUG constant. */
    private boolean isDebugLevel() {
        return log.logLevel >= log.DEBUG;
    }
}
--
To unsubscribe, e-mail: <mailto:[EMAIL PROTECTED]>
For additional commands, e-mail: <mailto:[EMAIL PROTECTED]>