Author: djencks
Date: Wed Mar 30 23:27:40 2005
New Revision: 159568
URL: http://svn.apache.org/viewcvs?view=rev&rev=159568

Log:
check the same tx came back as we started with and clear thread when done

Modified: geronimo/trunk/modules/connector/src/java/org/apache/geronimo/connector/work/WorkerContext.java Modified: geronimo/trunk/modules/connector/src/java/org/apache/geronimo/connector/work/WorkerContext.java URL: http://svn.apache.org/viewcvs/geronimo/trunk/modules/connector/src/java/org/apache/geronimo/connector/work/WorkerContext.java?view=diff&r1=159567&r2=159568 ============================================================================== --- geronimo/trunk/modules/connector/src/java/org/apache/geronimo/connector/work/WorkerContext.java (original) +++ geronimo/trunk/modules/connector/src/java/org/apache/geronimo/connector/work/WorkerContext.java Wed Mar 30 23:27:40 2005 @@ -34,6 +34,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.geronimo.transaction.context.TransactionContextManager; +import org.apache.geronimo.transaction.context.TransactionContext; import org.apache.geronimo.transaction.ImportedTransactionActiveException; /** @@ -50,8 +51,8 @@ */ private static final WorkListener NULL_WORK_LISTENER = new WorkAdapter() { public void workRejected(WorkEvent event) { - if(event.getException() != null) { - if(event.getException() instanceof WorkCompletedException && event.getException().getCause() != null) { + if (event.getException() != null) { + if (event.getException() instanceof WorkCompletedException && event.getException().getCause() != null) { log.error(event.getWork().toString(), event.getException().getCause()); } else { log.error(event.getWork().toString(), event.getException()); @@ -121,7 +122,7 @@ /** * Create a WorkWrapper. * - * @param work Work to be wrapped. + * @param work Work to be wrapped. * @param transactionContextManager */ public WorkerContext(Work work, TransactionContextManager transactionContextManager) { @@ -133,13 +134,13 @@ /** * Create a WorkWrapper with the specified execution context. * - * @param aWork Work to be wrapped. + * @param aWork Work to be wrapped. 
* @param aStartTimeout a time duration (in milliseconds) within which the - * execution of the Work instance must start. - * @param execContext an object containing the execution context with which - * the submitted Work instance must be executed. - * @param workListener an object which would be notified when the various - * Work processing events (work accepted, work rejected, work started, + * execution of the Work instance must start. + * @param execContext an object containing the execution context with which + * the submitted Work instance must be executed. + * @param workListener an object which would be notified when the various + * Work processing events (work accepted, work rejected, work started, */ public WorkerContext(Work aWork, long aStartTimeout, ExecutionContext execContext, @@ -167,7 +168,7 @@ * given resource adapter. * * @param aPriority Priority of the thread to be used to process the wrapped - * Work instance. + * Work instance. */ public void setThreadPriority(int aPriority) { threadPriority = aPriority; @@ -179,7 +180,7 @@ * given resource adapter. * * @return The priority level of the thread to be dispatched to - * process the wrapped Work instance. + * process the wrapped Work instance. */ public int getThreadPriority() { return threadPriority; @@ -190,7 +191,7 @@ * instance that the wrapped Work instance has been accepted. * * @param anObject Object on which the event initially occurred. It should - * be the work executor. + * be the work executor. */ public synchronized void workAccepted(Object anObject) { isAccepted = true; @@ -236,8 +237,7 @@ boolean isTimeout = acceptedTime + startTimeOut > 0 && System.currentTimeMillis() > acceptedTime + startTimeOut; if (log.isDebugEnabled()) { - log.debug( - this + log.debug(this + " accepted at " + acceptedTime + (isTimeout ? " has timed out." : " has not timed out. 
") @@ -247,12 +247,10 @@ if (isTimeout) { workException = new WorkRejectedException(this + " has timed out.", WorkException.START_TIMED_OUT); - workListener.workRejected( - new WorkEvent( - this, - WorkEvent.WORK_REJECTED, - adaptee, - workException)); + workListener.workRejected(new WorkEvent(this, + WorkEvent.WORK_REJECTED, + adaptee, + workException)); return true; } nbRetry++; @@ -282,26 +280,33 @@ // Implementation note: the work listener is notified prior to release // the start lock. This behavior is intentional and seems to be the // more conservative. - workListener.workStarted( - new WorkEvent(this, WorkEvent.WORK_STARTED, adaptee, null)); + workListener.workStarted(new WorkEvent(this, WorkEvent.WORK_STARTED, adaptee, null)); startLatch.release(); //Implementation note: we assume this is being called without an interesting TransactionContext, //and ignore/replace whatever is associated with the current thread. try { if (executionContext == null || executionContext.getXid() == null) { - transactionContextManager.newUnspecifiedTransactionContext(); - adaptee.run(); + TransactionContext context = transactionContextManager.newUnspecifiedTransactionContext(); + try { + adaptee.run(); + } finally { + TransactionContext returningContext = transactionContextManager.getContext(); + transactionContextManager.setContext(null); + if (context != returningContext) { + throw new WorkCompletedException("Wrong TransactionContext on return from work done"); + } + } //TODO should we commit the txContext to flush any leftover state??? } else { try { long transactionTimeout = executionContext.getTransactionTimeout(); //translate -1 value to 0 to indicate default transaction timeout. - transactionContextManager.begin(executionContext.getXid(), transactionTimeout == -1? 0: transactionTimeout); + transactionContextManager.begin(executionContext.getXid(), transactionTimeout == -1 ? 
0 : transactionTimeout); } catch (XAException e) { throw new WorkCompletedException("Transaction import failed for xid " + executionContext.getXid(), WorkCompletedException.TX_RECREATE_FAILED).initCause(e); - } catch (InvalidTransactionException e) { + } catch (InvalidTransactionException e) { throw new WorkCompletedException("Transaction import failed for xid " + executionContext.getXid(), WorkCompletedException.TX_RECREATE_FAILED).initCause(e); - } catch (SystemException e) { + } catch (SystemException e) { throw new WorkCompletedException("Transaction import failed for xid " + executionContext.getXid(), WorkCompletedException.TX_RECREATE_FAILED).initCause(e); } catch (ImportedTransactionActiveException e) { throw new WorkCompletedException("Transaction already active for xid " + executionContext.getXid(), WorkCompletedException.TX_CONCURRENT_WORK_DISALLOWED); @@ -313,13 +318,11 @@ } } - workListener.workCompleted( - new WorkEvent(this, WorkEvent.WORK_COMPLETED, adaptee, null)); + workListener.workCompleted(new WorkEvent(this, WorkEvent.WORK_COMPLETED, adaptee, null)); } catch (Throwable e) { - workException = (WorkException) (e instanceof WorkCompletedException? e: new WorkCompletedException("Unknown error", WorkCompletedException.UNDEFINED).initCause(e)); - workListener.workCompleted( - new WorkEvent(this, WorkEvent.WORK_REJECTED, adaptee, - workException)); + workException = (WorkException) (e instanceof WorkCompletedException ? e : new WorkCompletedException("Unknown error", WorkCompletedException.UNDEFINED).initCause(e)); + workListener.workCompleted(new WorkEvent(this, WorkEvent.WORK_REJECTED, adaptee, + workException)); } finally { endLatch.release(); } @@ -330,7 +333,7 @@ * execution. * * @return Latch that a caller can acquire to wait for the start of a - * work execution. + * work execution. */ public synchronized Latch provideStartLatch() { return startLatch; @@ -341,7 +344,7 @@ * execution. 
* * @return Latch that a caller can acquire to wait for the end of a - * work execution. + * work execution. */ public synchronized Latch provideEndLatch() { return endLatch;