Author: cziegeler Date: Mon Jan 5 04:50:12 2009 New Revision: 731544 URL: http://svn.apache.org/viewvc?rev=731544&view=rev Log: SLING-635: Remove a thread after two clean up cycles by marking it in the first cycle to be removed and remove it in the second cycle - if the thread hasn't been used in the meantime.
Modified: incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java Modified: incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java URL: http://svn.apache.org/viewvc/incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java?rev=731544&r1=731543&r2=731544&view=diff ============================================================================== --- incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java (original) +++ incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java Mon Jan 5 04:50:12 2009 @@ -283,6 +283,31 @@ } } } + // check for idle threads + synchronized ( this.jobQueues ) { + final Iterator<Map.Entry<String, JobBlockingQueue>> i = this.jobQueues.entrySet().iterator(); + while ( i.hasNext() ) { + final Map.Entry<String, JobBlockingQueue> current = i.next(); + final JobBlockingQueue jbq = current.getValue(); + if ( jbq.size() == 0 ) { + if ( jbq.isMarkedForCleanUp() ) { + // set to finished + jbq.setFinished(true); + // wake up + try { + jbq.put(new EventInfo()); + } catch (InterruptedException e) { + this.ignoreException(e); + } + // remove + i.remove(); + } else { + // mark to be removed during next cycle + jbq.markForCleanUp(); + } + } + } + } } } @@ -481,7 +506,7 @@ */ private void runJobQueue(final String queueName, final JobBlockingQueue jobQueue) { EventInfo info = null; - while ( this.running ) { + while ( this.running && !jobQueue.isFinished() ) { if ( info == null ) { // so let's wait/get the next job from the queue try { @@ -492,7 +517,7 @@ } } - if ( info != null && this.running ) { + if ( info != null && this.running && !jobQueue.isFinished() ) { synchronized ( jobQueue.getLock()) { final EventInfo processInfo = info; info = null; @@ -1394,8 +1419,13 @@ private boolean isWaiting = false; + private boolean markForCleanUp = false; + + private boolean finished = false; + public EventInfo waitForFinish() throws InterruptedException { this.isWaiting = true; + this.markForCleanUp = false; this.lock.wait(); this.isWaiting = false; final EventInfo object = this.eventInfo; @@ -1403,6 +1433,16 @@ return object; } + public void markForCleanUp() { + if ( !this.isWaiting ) { + this.markForCleanUp = true; + } + } + + public boolean isMarkedForCleanUp() { + return !this.isWaiting && this.markForCleanUp; + } + public void notifyFinish(EventInfo i) { this.eventInfo = i; this.lock.notify(); @@ -1415,6 +1455,14 @@ public boolean isWaiting() { return this.isWaiting; } + + public boolean isFinished() { + return finished; + } + + public void setFinished(boolean flag) { + this.finished = flag; + } } private static final class StartedJobInfo {