Author: cwiklik Date: Fri Jan 20 16:13:41 2012 New Revision: 1233979 URL: http://svn.apache.org/viewvc?rev=1233979&view=rev Log: UIMA-2354 blocks receiving thread when invokeProcess() returns, to prevent it from getting another CAS while the previous CAS is still in-play
Modified: uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessRequestHandler_impl.java Modified: uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessRequestHandler_impl.java URL: http://svn.apache.org/viewvc/uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessRequestHandler_impl.java?rev=1233979&r1=1233978&r2=1233979&view=diff ============================================================================== --- uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessRequestHandler_impl.java (original) +++ uima/uima-as/trunk/uimaj-as-core/src/main/java/org/apache/uima/aae/handler/input/ProcessRequestHandler_impl.java Fri Jan 20 16:13:41 2012 @@ -20,6 +20,7 @@ package org.apache.uima.aae.handler.input; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Semaphore; import org.apache.uima.UIMAFramework; import org.apache.uima.aae.SerializerCache; @@ -52,6 +53,13 @@ import org.apache.uima.util.Level; public class ProcessRequestHandler_impl extends HandlerBase { private static final Class CLASS_NAME = ProcessRequestHandler_impl.class; + /* + * Declare a semaphore which is used to block UIMA AS aggregate receiving thread until + * a CAS is fully processed. This semaphore prevents the receiving thread from grabbing + * another CAS from an input queue while a CAS it received previously is still + * in-play. Fixes load balancing across multiple UIMA AS aggregate processes. 
+ */ + final ThreadLocal<Semaphore> threadCompletionMonitor = new ThreadLocal<Semaphore>(); private Object mux = new Object(); @@ -225,7 +233,26 @@ public class ProcessRequestHandler_impl // deserSharedData, casReferenceId); entry = getController().getInProcessCache().register(cas, aMessageContext, deserSharedData, casReferenceId, marker, acceptsDeltaCas); - + + /* + * In UIMA AS Aggregate the receiving thread must be blocked until a CAS is fully + * processed. This is to prevent the receiving thread from grabbing another CAS, + * which would break prefetch throttling. The receiving thread takes a CAS from service queue, + * deserializes CAS, asks the FC for the next step and enqueues the CAS + * onto delegate's queue. Once the enqueue completes, the thread is done + * and ready to get more CASes from the service queue. The receiving thread must + * therefore be blocked right after it enqueues the CAS on the delegate's queue. + * To that end, while handling a new CAS, create a shared semaphore and + * associate it with a current thread as ThreadLocal variable. Also, associate the + * same semaphore with a CAS so that when the CAS is sent back to the client + * the receiving thread is unblocked. + */ + if ( !getController().isPrimitive() ) { + Semaphore semaphore = new Semaphore(0); + // threadCompletionMonitor is a ThreadLocal var + threadCompletionMonitor.set(semaphore); + entry.setThreadCompletionSemaphore(semaphore); + } long timeToDeserializeCAS = getController().getCpuTime() - t1; getController().incrementDeserializationTime(timeToDeserializeCAS); LongNumericStatistic statistic; @@ -482,6 +509,26 @@ public class ProcessRequestHandler_impl // ***************************************************************** invokeProcess(entry.getCas(), inputCasReferenceId, casReferenceId, aMessageContext, newCASProducedBy); + + /** + * Below comments apply to UIMA AS aggregate only. + * CAS has been handed off to a delegate. 
Now block the receiving thread until + * the CAS is processed or there is a timeout or error. Fetch this thread's ThreadLocal + * semaphore to block the thread. It will be unblocked when the aggregate is done with + * the CAS. + */ + if (!getController().isPrimitive() ) { + Semaphore completionSemaphore = threadCompletionMonitor.get(); + try { + // Block until the CAS is fully processed or there is an error + completionSemaphore.acquire(); + } catch( InterruptedException ex) { + } finally { + // remove ThreadLocal semaphore + threadCompletionMonitor.remove(); + } + } + } else { if (UIMAFramework.getLogger(CLASS_NAME).isLoggable(Level.INFO)) { UIMAFramework.getLogger(CLASS_NAME).logrb(Level.INFO, CLASS_NAME.getName(),