Author: fhanik Date: Thu Dec 14 17:35:06 2006 New Revision: 487420 URL: http://svn.apache.org/viewvc?view=rev&rev=487420 Log: Implemented the use of an executor, tasks that aren't able to run quite yet, will be added to a pool of events, this pool of events is a zero GC (RxTaskPool) and is bounded.
Modified: tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/AbstractRxTask.java tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/ReceiverBase.java tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/RxTaskPool.java tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/bio/BioReceiver.java tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/bio/BioReplicationTask.java tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java Modified: tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/AbstractRxTask.java URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/AbstractRxTask.java?view=diff&rev=487420&r1=487419&r2=487420 ============================================================================== --- tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/AbstractRxTask.java (original) +++ tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/AbstractRxTask.java Thu Dec 14 17:35:06 2006 @@ -26,7 +26,7 @@ * @author Filip Hanik * @version $Revision$ $Date$ */ -public abstract class AbstractRxTask extends Thread +public abstract class AbstractRxTask implements Runnable { public static final int OPTION_DIRECT_BUFFER = ReceiverBase.OPTION_DIRECT_BUFFER; @@ -41,7 +41,7 @@ this.callback = callback; } - public void setPool(RxTaskPool pool) { + public void setTaskPool(RxTaskPool pool) { this.pool = pool; } @@ -57,7 +57,7 @@ this.doRun = doRun; } - public RxTaskPool getPool() { + public RxTaskPool getTaskPool() { return pool; } Modified: tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/ReceiverBase.java URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/ReceiverBase.java?view=diff&rev=487420&r1=487419&r2=487420 ============================================================================== --- tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/ReceiverBase.java (original) +++ tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/ReceiverBase.java Thu Dec 14 17:35:06 2006 @@ -19,6 +19,11 @@ import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.ServerSocket; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import org.apache.catalina.tribes.ChannelMessage; import org.apache.catalina.tribes.ChannelReceiver; @@ -58,8 +63,10 @@ private long tcpSelectorTimeout = 5000; //how many times to search for an available socket private int autoBind = 100; - private int maxThreads = 6; + private int maxThreads = 15; private int minThreads = 6; + private int maxTasks = 100; + private int minTasks = 10; private boolean tcpNoDelay = true; private boolean soKeepAlive = false; private boolean ooBInline = true; @@ -69,11 +76,23 @@ private int soTrafficClass = 0x04 | 0x08 | 0x010; private int timeout = 3000; //3 seconds private boolean useBufferPool = true; + + private Executor executor; public ReceiverBase() { } + public void start() throws IOException { + if ( executor == null ) { + executor = new ThreadPoolExecutor(minThreads,maxThreads,60,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>()); + } + } + + public void stop() { + if ( executor instanceof ExecutorService ) ((ExecutorService)executor).shutdown(); + } + /** * getMessageListener * @@ -270,7 +289,7 @@ return listener; } - public RxTaskPool getPool() { + public RxTaskPool getTaskPool() { return pool; } @@ -335,6 +354,22 @@ return securePort; } + public int getMinTasks() { + return minTasks; + } + + public int getMaxTasks() { + return maxTasks; + } + + public Executor getExecutor() { + return executor; + } + + public boolean isListening() { + return listen; + } + /** * @deprecated use setSelectorTimeout * @param selTimeout long @@ -429,7 +464,20 @@ this.securePort = securePort; } + public void setMinTasks(int minTasks) { + this.minTasks = minTasks; + } + + public void setMaxTasks(int maxTasks) { + this.maxTasks = maxTasks; + } + + public void setExecutor(Executor executor) { + this.executor = executor; + } + public void heartbeat() { //empty operation } + } Modified: tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/RxTaskPool.java URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/RxTaskPool.java?view=diff&rev=487420&r1=487419&r2=487420 ============================================================================== --- tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/RxTaskPool.java (original) +++ tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/RxTaskPool.java Thu Dec 14 17:35:06 2006 @@ -19,6 +19,7 @@ import java.util.Iterator; import java.util.LinkedList; import java.util.List; +import java.util.concurrent.ThreadFactory; /** * @author not attributable @@ -40,8 +41,8 @@ boolean running = true; private static int counter = 1; - private int maxThreads; - private int minThreads; + private int maxTasks; + private int minTasks; private TaskCreator creator = null; @@ -50,34 +51,27 @@ } - public RxTaskPool (int maxThreads, int minThreads, TaskCreator creator) throws Exception { + public RxTaskPool (int maxTasks, int minTasks, TaskCreator creator) throws Exception { // fill up the pool with worker threads - this.maxThreads = maxThreads; - this.minThreads = minThreads; + this.maxTasks = maxTasks; + this.minTasks = minTasks; this.creator = creator; - //for (int i = 0; i < minThreads; i++) { - for (int i = 0; i < maxThreads; i++) { //temporary fix for thread hand off problem - AbstractRxTask thread = creator.getWorkerThread(); - setupThread(thread); - idle.add (thread); - } } - protected void setupThread(AbstractRxTask thread) { - synchronized (thread) { - thread.setPool(this); - thread.setName(thread.getClass().getName() + "[" + inc() + "]"); - thread.setDaemon(true); - thread.setPriority(Thread.MAX_PRIORITY); - thread.start(); - try {thread.wait(500); }catch ( InterruptedException x ) {} + protected void configureTask(AbstractRxTask task) { + synchronized (task) { + task.setTaskPool(this); +// task.setName(task.getClass().getName() + "[" + inc() + "]"); +// task.setDaemon(true); +// task.setPriority(Thread.MAX_PRIORITY); +// task.start(); } } /** * Find an idle worker thread, if any. Could return null. */ - public AbstractRxTask getWorker() + public AbstractRxTask getRxTask() { AbstractRxTask worker = null; synchronized (mutex) { @@ -89,9 +83,9 @@ //this means that there are no available workers worker = null; } - } else if ( used.size() < this.maxThreads && creator != null) { - worker = creator.getWorkerThread(); - setupThread(worker); + } else if ( used.size() < this.maxTasks && creator != null) { + worker = creator.createRxTask(); + configureTask(worker); } else { try { mutex.wait(); } catch ( java.lang.InterruptedException x ) {Thread.currentThread().interrupted();} } @@ -114,7 +108,7 @@ synchronized (mutex) { used.remove(worker); //if ( idle.size() < minThreads && !idle.contains(worker)) idle.add(worker); - if ( idle.size() < maxThreads && !idle.contains(worker)) idle.add(worker); //let max be the upper limit + if ( idle.size() < maxTasks && !idle.contains(worker)) idle.add(worker); //let max be the upper limit else { worker.setDoRun(false); synchronized (worker){worker.notify();} @@ -128,11 +122,11 @@ } public int getMaxThreads() { - return maxThreads; + return maxTasks; } public int getMinThreads() { - return minThreads; + return minTasks; } public void stop() { @@ -147,19 +141,19 @@ } } - public void setMaxThreads(int maxThreads) { - this.maxThreads = maxThreads; + public void setMaxTasks(int maxThreads) { + this.maxTasks = maxThreads; } - public void setMinThreads(int minThreads) { - this.minThreads = minThreads; + public void setMinTasks(int minThreads) { + this.minTasks = minThreads; } - public TaskCreator getThreadCreator() { + public TaskCreator getTaskCreator() { return this.creator; } - public static interface TaskCreator { - public AbstractRxTask getWorkerThread(); + public static interface TaskCreator { + public AbstractRxTask createRxTask(); } } Modified: tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/bio/BioReceiver.java URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/bio/BioReceiver.java?view=diff&rev=487420&r1=487419&r2=487420 ============================================================================== --- tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/bio/BioReceiver.java (original) +++ tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/bio/BioReceiver.java Thu Dec 14 17:35:06 2006 @@ -53,6 +53,7 @@ * @todo Implement this org.apache.catalina.tribes.ChannelReceiver method */ public void start() throws IOException { + super.start(); try { setPool(new RxTaskPool(getMaxThreads(),getMinThreads(),this)); } catch (Exception x) { @@ -73,7 +74,7 @@ } } - public AbstractRxTask getWorkerThread() { + public AbstractRxTask createRxTask() { return getReplicationThread(); } @@ -93,6 +94,7 @@ try { this.serverSocket.close(); }catch ( Exception x ) {} + super.stop(); } @@ -125,20 +127,21 @@ while ( doListen() ) { Socket socket = null; - if ( getPool().available() < 1 ) { + if ( getTaskPool().available() < 1 ) { if ( log.isWarnEnabled() ) log.warn("All BIO server replication threads are busy, unable to handle more requests until a thread is freed up."); } - BioReplicationTask thread = (BioReplicationTask)getPool().getWorker(); - if ( thread == null ) continue; //should never happen + BioReplicationTask task = (BioReplicationTask)getTaskPool().getRxTask(); + if ( task == null ) continue; //should never happen try { socket = serverSocket.accept(); }catch ( Exception x ) { if ( doListen() ) throw x; } if ( !doListen() ) { - thread.setDoRun(false); - thread.serviceSocket(null,null); + task.setDoRun(false); + task.serviceSocket(null,null); + getExecutor().execute(task); break; //regular shutdown } if ( socket == null ) continue; @@ -152,7 +155,7 @@ socket.setTrafficClass(getSoTrafficClass()); socket.setSoTimeout(getTimeout()); ObjectReader reader = new ObjectReader(socket); - thread.serviceSocket(socket,reader); + task.serviceSocket(socket,reader); }//while } Modified: tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/bio/BioReplicationTask.java URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/bio/BioReplicationTask.java?view=diff&rev=487420&r1=487419&r2=487420 ============================================================================== --- tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/bio/BioReplicationTask.java (original) +++ tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/bio/BioReplicationTask.java Thu Dec 14 17:35:06 2006 @@ -59,32 +59,19 @@ // loop forever waiting for work to do public synchronized void run() { - this.notify(); - while (isDoRun()) { - try { - // sleep and release object lock - this.wait(); - } catch (InterruptedException e) { - if(log.isInfoEnabled()) - log.info("TCP worker thread interrupted in cluster",e); - // clear interrupt status - Thread.interrupted(); - } - if ( socket == null ) continue; - try { - drainSocket(); - } catch ( Exception x ) { - log.error("Unable to service bio socket"); - }finally { - try {socket.close();}catch ( Exception ignore){} - try {reader.close();}catch ( Exception ignore){} - reader = null; - socket = null; - } - // done, ready for more, return to pool - if ( getPool() != null ) getPool().returnWorker (this); - else setDoRun(false); + if ( socket == null ) return; + try { + drainSocket(); + } catch ( Exception x ) { + log.error("Unable to service bio socket"); + }finally { + try {socket.close();}catch ( Exception ignore){} + try {reader.close();}catch ( Exception ignore){} + reader = null; + socket = null; } + // done, ready for more, return to pool + if ( getTaskPool() != null ) getTaskPool().returnWorker (this); } Modified: tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java?view=diff&rev=487420&r1=487419&r2=487420 ============================================================================== --- tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java (original) +++ tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/nio/NioReceiver.java Thu Dec 14 17:35:06 2006 @@ -80,6 +80,7 @@ public void stop() { this.stopListening(); + super.stop(); } /** @@ -88,8 +89,8 @@ * @see org.apache.catalina.tribes.ClusterReceiver#start() */ public void start() throws IOException { + super.start(); try { -// setPool(new ThreadPool(interestOpsMutex, getMaxThreads(),getMinThreads(),this)); setPool(new RxTaskPool(getMaxThreads(),getMinThreads(),this)); } catch (Exception x) { log.fatal("ThreadPool can initilzed. Listener not started", x); @@ -109,7 +110,7 @@ } } - public AbstractRxTask getWorkerThread() { + public AbstractRxTask createRxTask() { NioReplicationTask thread = new NioReplicationTask(this,this); thread.setUseBufferPool(this.getUseBufferPool()); thread.setRxBufSize(getRxBufSize()); @@ -142,7 +143,7 @@ events.add(event); } if ( log.isTraceEnabled() ) log.trace("Adding event to selector:"+event); - selector.wakeup(); + if ( isListening() && selector!=null ) selector.wakeup(); } } @@ -177,7 +178,9 @@ protected void socketTimeouts() { //timeout - Set keys = selector.keys(); + Selector tmpsel = selector; + Set keys = (isListening()&&tmpsel!=null)?tmpsel.keys():null; + if ( keys == null ) return; long now = System.currentTimeMillis(); for (Iterator iter = keys.iterator(); iter.hasNext(); ) { SelectionKey key = (SelectionKey) iter.next(); @@ -365,17 +368,18 @@ * will then de-register the channel on the next select call. */ protected void readDataFromSocket(SelectionKey key) throws Exception { - NioReplicationTask worker = (NioReplicationTask) getPool().getWorker(); - if (worker == null) { - // No threads available, do nothing, the selection + NioReplicationTask task = (NioReplicationTask) getTaskPool().getRxTask(); + if (task == null) { + // No threads/tasks available, do nothing, the selection // loop will keep calling this method until a // thread becomes available, the thread pool itself has a waiting mechanism // so we will not wait here. - if (log.isDebugEnabled()) - log.debug("No TcpReplicationThread available"); + if (log.isDebugEnabled()) log.debug("No TcpReplicationThread available"); } else { // invoking this wakes up the worker thread then returns - worker.serviceChannel(key); + //add task to thread pool + task.serviceChannel(key); + getExecutor().execute(task); } } Modified: tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java?view=diff&rev=487420&r1=487419&r2=487420 ============================================================================== --- tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java (original) +++ tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/nio/NioReplicationTask.java Thu Dec 14 17:35:06 2006 @@ -63,60 +63,50 @@ // loop forever waiting for work to do public synchronized void run() { - this.notify(); if ( (getOptions() & OPTION_DIRECT_BUFFER) == OPTION_DIRECT_BUFFER ) { buffer = ByteBuffer.allocateDirect(getRxBufSize()); }else { buffer = ByteBuffer.allocate (getRxBufSize()); } - while (isDoRun()) { - try { - // sleep and release object lock - this.wait(); - } catch (InterruptedException e) { - if(log.isInfoEnabled()) log.info("TCP worker thread interrupted in cluster",e); - // clear interrupt status - Thread.interrupted(); - } - if (key == null) { - continue; // just in case - } - if ( log.isTraceEnabled() ) - log.trace("Servicing key:"+key); - try { - ObjectReader reader = (ObjectReader)key.attachment(); - if ( reader == null ) { - if ( log.isTraceEnabled() ) - log.trace("No object reader, cancelling:"+key); - cancelKey(key); - } else { - if ( log.isTraceEnabled() ) - log.trace("Draining channel:"+key); + if (key == null) { + return; // just in case + } + if ( log.isTraceEnabled() ) + log.trace("Servicing key:"+key); - drainChannel(key, reader); - } - } catch (Exception e) { - //this is common, since the sockets on the other - //end expire after a certain time. - if ( e instanceof CancelledKeyException ) { - //do nothing - } else if ( e instanceof IOException ) { - //dont spew out stack traces for IO exceptions unless debug is enabled. - if (log.isDebugEnabled()) log.debug ("IOException in replication worker, unable to drain channel. Probable cause: Keep alive socket closed["+e.getMessage()+"].", e); - else log.warn ("IOException in replication worker, unable to drain channel. Probable cause: Keep alive socket closed["+e.getMessage()+"]."); - } else if ( log.isErrorEnabled() ) { - //this is a real error, log it. - log.error("Exception caught in TcpReplicationThread.drainChannel.",e); - } + try { + ObjectReader reader = (ObjectReader)key.attachment(); + if ( reader == null ) { + if ( log.isTraceEnabled() ) + log.trace("No object reader, cancelling:"+key); cancelKey(key); - } finally { - + } else { + if ( log.isTraceEnabled() ) + log.trace("Draining channel:"+key); + + drainChannel(key, reader); } - key = null; - // done, ready for more, return to pool - getPool().returnWorker (this); + } catch (Exception e) { + //this is common, since the sockets on the other + //end expire after a certain time. + if ( e instanceof CancelledKeyException ) { + //do nothing + } else if ( e instanceof IOException ) { + //dont spew out stack traces for IO exceptions unless debug is enabled. + if (log.isDebugEnabled()) log.debug ("IOException in replication worker, unable to drain channel. Probable cause: Keep alive socket closed["+e.getMessage()+"].", e); + else log.warn ("IOException in replication worker, unable to drain channel. Probable cause: Keep alive socket closed["+e.getMessage()+"]."); + } else if ( log.isErrorEnabled() ) { + //this is a real error, log it. + log.error("Exception caught in TcpReplicationThread.drainChannel.",e); + } + cancelKey(key); + } finally { + } + key = null; + // done, ready for more, return to pool + getTaskPool().returnWorker (this); } /** @@ -131,14 +121,12 @@ * worker thread is servicing it. */ public synchronized void serviceChannel (SelectionKey key) { - if ( log.isTraceEnabled() ) - log.trace("About to service key:"+key); + if ( log.isTraceEnabled() ) log.trace("About to service key:"+key); ObjectReader reader = (ObjectReader)key.attachment(); if ( reader != null ) reader.setLastAccess(System.currentTimeMillis()); this.key = key; key.interestOps (key.interestOps() & (~SelectionKey.OP_READ)); key.interestOps (key.interestOps() & (~SelectionKey.OP_WRITE)); - this.notify(); // awaken the thread } /** --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]