Author: cziegeler Date: Fri Aug 29 01:29:23 2008 New Revision: 690147 URL: http://svn.apache.org/viewvc?rev=690147&view=rev Log: Fix handling of own queues: use provided name and use proper synchronisation.
Modified: incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java Modified: incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java URL: http://svn.apache.org/viewvc/incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java?rev=690147&r1=690146&r2=690147&view=diff ============================================================================== --- incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java (original) +++ incubator/sling/trunk/extensions/event/src/main/java/org/apache/sling/event/impl/JobEventHandler.java Fri Aug 29 01:29:23 2008 @@ -355,7 +355,7 @@ // check if we should put this into a separate queue if ( info != null && info.event.getProperty(EventUtil.PROPERTY_JOB_QUEUE_NAME) != null ) { - final String queueName = EventUtil.PROPERTY_JOB_QUEUE_NAME; + final String queueName = (String)info.event.getProperty(EventUtil.PROPERTY_JOB_QUEUE_NAME); synchronized ( this.jobQueues ) { BlockingQueue<EventInfo> jobQueue = this.jobQueues.get(queueName); if ( jobQueue == null ) { @@ -415,14 +415,15 @@ final EventInfo processInfo = info; info = null; if ( this.executeJob(processInfo, jobQueue) ) { + EventInfo newInfo = null; try { - jobQueue.wait(); + newInfo = jobQueue.waitForFinish(); } catch (InterruptedException e) { this.ignoreException(e); } // if we have an info, this is a reschedule - final EventInfo newInfo = jobQueue.get(); if ( newInfo != null ) { + final EventInfo newEventInfo = newInfo; final Event job = newInfo.event; // is this an ordered queue? @@ -450,7 +451,7 @@ final Runnable t = new Runnable() { public void run() { try { - jobQueue.put(newInfo); + jobQueue.put(newEventInfo); } catch (InterruptedException e) { // this should never happen ignoreException(e); @@ -1049,8 +1050,7 @@ synchronized ( this.jobQueues ) { jobQueue = this.jobQueues.get(job.getProperty(EventUtil.PROPERTY_JOB_QUEUE_NAME)); } - jobQueue.set(info); - jobQueue.notify(); + jobQueue.notifyFinish(info); } else { // delay rescheduling? @@ -1095,8 +1095,7 @@ synchronized ( this.jobQueues ) { jobQueue = this.jobQueues.get(job.getProperty(EventUtil.PROPERTY_JOB_QUEUE_NAME)); } - jobQueue.set(null); - jobQueue.notify(); + jobQueue.notifyFinish(null); } } if ( !shouldReschedule ) { @@ -1274,14 +1273,22 @@ private EventInfo eventInfo; - public void set(EventInfo i) { - this.eventInfo = i; - } + private final Object lock = new Object(); - public EventInfo get() { + public EventInfo waitForFinish() throws InterruptedException { + synchronized ( this.lock ) { + this.lock.wait(); + } final EventInfo object = this.eventInfo; this.eventInfo = null; return object; } + + public void notifyFinish(EventInfo i) { + this.eventInfo = i; + synchronized ( this.lock ) { + this.lock.notify(); + } + } } }