Author: gatfora
Date: Tue Nov 6 01:05:28 2007
New Revision: 592342
URL: http://svn.apache.org/viewvc?rev=592342&view=rev
Log:
As described in SANDESHA2-108, this change avoids the in-order thread switch
if the message being received is the next message to be invoked. This change
requires Axis2 revision 592132.
Modified:
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/client/SandeshaClient.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/i18n/SandeshaMessageKeys.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryInvokerBeanMgr.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SandeshaUtil.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Invoker.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/InvokerWorker.java
webservices/sandesha/trunk/java/modules/core/src/main/resources/org/apache/sandesha2/i18n/resource.properties
Modified:
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/client/SandeshaClient.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/client/SandeshaClient.java?rev=592342&r1=592341&r2=592342&view=diff
==============================================================================
---
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/client/SandeshaClient.java
(original)
+++
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/client/SandeshaClient.java
Tue Nov 6 01:05:28 2007
@@ -942,16 +942,8 @@
StorageManager storageManager =
SandeshaUtil.getSandeshaStorageManager(configContext,
configContext.getAxisConfiguration());
reportTransaction = storageManager.getTransaction();
- //only do this if we are running inOrder
-
if(SandeshaUtil.getPropertyBean(configContext.getAxisConfiguration()).isInOrder()){
- Invoker invoker =
(Invoker)SandeshaUtil.getSandeshaStorageManager(configContext,
configContext.getAxisConfiguration()).getInvoker();
- if (invoker==null){
- throw new
SandeshaException(SandeshaMessageHelper.getMessage(
-
SandeshaMessageKeys.invokerNotFound, sequenceID));
- }
-
-
invoker.forceInvokeOfAllMessagesCurrentlyOnSequence(configContext, sequenceID,
allowLaterDeliveryOfMissingMessages);
- }
+ // There will only be messages waiting if we are
running in-order
+
Invoker.forceInvokeOfAllMessagesCurrentlyOnSequence(configContext, sequenceID,
allowLaterDeliveryOfMissingMessages);
if(reportTransaction != null &&
reportTransaction.isActive()) reportTransaction.commit();
reportTransaction = null;
Modified:
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java?rev=592342&r1=592341&r2=592342&view=diff
==============================================================================
---
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java
(original)
+++
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/handlers/SandeshaGlobalInHandler.java
Tue Nov 6 01:05:28 2007
@@ -220,9 +220,7 @@
boolean isDuplicate = true;
//still allow this msg if we have no corresponding invoker bean
for it and we are inOrder
- boolean isInOrder =
-
SandeshaUtil.getDefaultPropertyBean(rmMsgCtx.getConfigurationContext().getAxisConfiguration()).isInOrder();
- if(isInOrder)
+ if(SandeshaUtil.isInOrder(rmMsgCtx.getMessageContext()))
{
InvokerBean finderBean = new InvokerBean();
finderBean.setMsgNo(msgNo);
Modified:
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/i18n/SandeshaMessageKeys.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/i18n/SandeshaMessageKeys.java?rev=592342&r1=592341&r2=592342&view=diff
==============================================================================
---
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/i18n/SandeshaMessageKeys.java
(original)
+++
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/i18n/SandeshaMessageKeys.java
Tue Nov 6 01:05:28 2007
@@ -210,7 +210,6 @@
public final static String elementMustForSpec = "elementMustForSpec";
public final static String couldNotSendCreateSeqResponse =
"couldNotSendCreateSeqResponse";
public final static String invalidElementFoundWithinElement =
"invalidElementFoundWithinElement";
- public final static String invokerNotFound="invokerNotFound";
public final static String
couldNotSendCloseResponse="couldNotSendCloseResponse";
Modified:
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/SequenceProcessor.java?rev=592342&r1=592341&r2=592342&view=diff
==============================================================================
---
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
(original)
+++
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
Tue Nov 6 01:05:28 2007
@@ -46,7 +46,6 @@
import org.apache.sandesha2.security.SecurityToken;
import org.apache.sandesha2.storage.StorageManager;
import org.apache.sandesha2.storage.Transaction;
-import org.apache.sandesha2.storage.beanmanagers.InvokerBeanMgr;
import org.apache.sandesha2.storage.beanmanagers.RMDBeanMgr;
import org.apache.sandesha2.storage.beanmanagers.RMSBeanMgr;
import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
@@ -59,7 +58,7 @@
import org.apache.sandesha2.util.SandeshaUtil;
import org.apache.sandesha2.util.SpecSpecificConstants;
import org.apache.sandesha2.util.TerminateManager;
-import org.apache.sandesha2.workers.SandeshaThread;
+import org.apache.sandesha2.workers.InvokerWorker;
import org.apache.sandesha2.wsrm.Sequence;
/**
@@ -135,7 +134,7 @@
}
// setting acked msg no range
- ConfigurationContext configCtx =
rmMsgCtx.getMessageContext().getConfigurationContext();
+ ConfigurationContext configCtx =
msgCtx.getConfigurationContext();
if (configCtx == null) {
String message =
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.configContextNotSet);
log.debug(message);
@@ -202,9 +201,9 @@
}
String specVersion = rmMsgCtx.getRMSpecVersion();
- if
((SandeshaUtil.isDuplicateInOnlyMessage(rmMsgCtx.getMessageContext())
+ if ((SandeshaUtil.isDuplicateInOnlyMessage(msgCtx)
||
-
SandeshaUtil.isDuplicateInOutMessage(rmMsgCtx.getMessageContext()))
+
SandeshaUtil.isDuplicateInOutMessage(msgCtx))
&&
(Sandesha2Constants.QOS.InvocationType.DEFAULT_INVOCATION_TYPE ==
Sandesha2Constants.QOS.InvocationType.EXACTLY_ONCE)) {
// this is a duplicate message and the invocation type
is EXACTLY_ONCE. We try to return
@@ -353,11 +352,11 @@
// If the MEP doesn't need the backchannel, and nor do
we, we should signal it so that it
// can close off as soon as possible.
if (backchannelFree) {
+ TransportUtils.setResponseWritten(msgCtx,
false);
+
RequestResponseTransport t = null;
t = (RequestResponseTransport)
rmMsgCtx.getProperty(RequestResponseTransport.TRANSPORT_CONTROL);
-
if(t != null &&
RequestResponseTransportStatus.WAITING.equals(t.getStatus())) {
-
TransportUtils.setResponseWritten(msgCtx, false);
t.acknowledgeMessage(msgCtx);
}
}
@@ -377,33 +376,40 @@
}
}
- // If the storage manager has an invoker, then they may be
implementing inOrder, or
- // transactional delivery. Either way, if they have one we
should use it.
- SandeshaThread invoker = storageManager.getInvoker();
- if (invoker != null) {
- // Whatever the MEP, we stop processing here and the
invoker will do the real work. We only
- // SUSPEND if we need to keep the backchannel open for
the response... we may as well ABORT
- // to let other cases end more quickly.
- if(backchannelFree && ackBackChannel) {
- result = InvocationResponse.ABORT;
- } else {
- result = InvocationResponse.SUSPEND;
- }
+ // If the storage manager is implementing inOrder, or using
transactional delivery
+ // then we should hand the message over to the invoker thread.
If not, we can invoke
+ // it directly ourselves.
+ InvokerWorker worker = null;
+ if (SandeshaUtil.isInOrder(msgCtx) ||
storageManager.hasUserTransaction(msgCtx)) {
- InvokerBeanMgr storageMapMgr =
storageManager.getInvokerBeanMgr();
-
InvokerBean invokerBean = new InvokerBean(key, msgNo,
sequenceId);
-
ContextManager contextMgr =
SandeshaUtil.getContextManager(configCtx);
+
if(contextMgr != null)
invokerBean.setContext(contextMgr.storeContext());
- boolean wasAdded = storageMapMgr.insert(invokerBean);
+ boolean wasAdded =
storageManager.getInvokerBeanMgr().insert(invokerBean);
// This will avoid performing application processing
more than once.
rmMsgCtx.setProperty(Sandesha2Constants.APPLICATION_PROCESSING_DONE, "true");
+ // Whatever the MEP, we stop processing here and the
invoker will do the real work. As we
+ // are taking responsibility for the message we need to
return SUSPEND
+ result = InvocationResponse.SUSPEND;
+
if (wasAdded) {
- storageManager.storeMessageContext(key,
rmMsgCtx.getMessageContext());
+ storageManager.storeMessageContext(key, msgCtx);
+ // We can invoke the message immediately, if
this is the next message to invoke,
+ // and we don't have a user transaction in play.
+ if(bean.getNextMsgNoToProcess() == msgNo &&
!storageManager.hasUserTransaction(msgCtx)) {
+ String workId = sequenceId;
+ ConfigurationContext context =
msgCtx.getConfigurationContext();
+
+ worker = new InvokerWorker(context,
invokerBean);
+ worker.setWorkId(workId);
+
+ // Actually take the lock
+ worker.getLock().addWork(workId,
worker);
+ }
} else {
// Abort this message immediately as this
message has already been added
sendAck = false;
@@ -422,6 +428,14 @@
if (transaction != null && transaction.isActive())
transaction.commit();
+ if(worker != null) {
+ try {
+ worker.run();
+ } catch(Exception e) {
+ log.error("Caught exception running
InvokerWorker", e);
+ }
+ }
+
if (sendAck) {
try {
transaction = storageManager.getTransaction();
@@ -429,6 +443,15 @@
RMMsgContext ackRMMsgContext =
AcknowledgementManager.generateAckMessage(rmMsgCtx, bean, sequenceId,
storageManager,true);
AcknowledgementManager.sendAckNow(ackRMMsgContext);
TransportUtils.setResponseWritten(msgCtx, true);
+ RequestResponseTransport t =
+ (RequestResponseTransport)
rmMsgCtx.getProperty(RequestResponseTransport.TRANSPORT_CONTROL);
+
+ // Tell the transport that we have finished
with the message as the response should have been
+ // written
+ if(t != null &&
RequestResponseTransportStatus.WAITING.equals(t.getStatus())) {
+ t.signalResponseReady();
+ }
+
if (transaction != null &&
transaction.isActive()) transaction.commit();
transaction = null;
Modified:
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryInvokerBeanMgr.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryInvokerBeanMgr.java?rev=592342&r1=592341&r2=592342&view=diff
==============================================================================
---
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryInvokerBeanMgr.java
(original)
+++
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryInvokerBeanMgr.java
Tue Nov 6 01:05:28 2007
@@ -33,9 +33,7 @@
}
public boolean insert(InvokerBean bean) throws SandeshaStorageException
{
- boolean result = super.insert(bean.getMessageContextRefKey(),
bean);
- mgr.getInMemoryTransaction().setReceivedMessages(true);
- return result;
+ return super.insert(bean.getMessageContextRefKey(), bean);
}
public boolean delete(String key) throws SandeshaStorageException {
@@ -51,9 +49,7 @@
}
public boolean update(InvokerBean bean) throws SandeshaStorageException
{
- boolean result = super.update(bean.getMessageContextRefKey(),
bean);
- mgr.getInMemoryTransaction().setReceivedMessages(true);
- return result;
+ return super.update(bean.getMessageContextRefKey(), bean);
}
public InvokerBean findUnique(InvokerBean bean) throws
SandeshaException {
Modified:
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java?rev=592342&r1=592341&r2=592342&view=diff
==============================================================================
---
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java
(original)
+++
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java
Tue Nov 6 01:05:28 2007
@@ -48,7 +48,6 @@
import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
import org.apache.sandesha2.storage.beans.RMBean;
import org.apache.sandesha2.util.SandeshaUtil;
-import org.apache.sandesha2.workers.Invoker;
import org.apache.sandesha2.workers.SandeshaThread;
import org.apache.sandesha2.workers.Sender;
@@ -62,7 +61,6 @@
private SenderBeanMgr senderBeanMgr = null;
private InvokerBeanMgr invokerBeanMgr = null;
private Sender sender = null;
- private Invoker invoker = null;
private PollingManager pollingManager = null;
private HashMap transactions = new HashMap();
private boolean useSerialization = false;
@@ -76,10 +74,6 @@
SandeshaPolicyBean policy =
SandeshaUtil.getPropertyBean(context.getAxisConfiguration());
useSerialization = policy.isUseMessageSerialization();
- // Note that while inOrder is a global property we can decide
if we need the
- // invoker thread at this point. If we change this to be a
sequence-level
- // property then we'll need to revisit this.
- boolean inOrder = policy.isInOrder();
boolean polling = policy.isEnableMakeConnection();
this.rMSBeanMgr = new InMemoryRMSBeanMgr (this, context);
@@ -87,7 +81,6 @@
this.senderBeanMgr = new InMemorySenderBeanMgr (this, context);
this.invokerBeanMgr = new InMemoryInvokerBeanMgr (this,
context);
this.sender = new Sender();
- if(inOrder) this.invoker = new Invoker();
if(polling) this.pollingManager = new PollingManager();
}
@@ -135,7 +128,7 @@
* Gets the Invoker for this Storage manager
*/
public SandeshaThread getInvoker() {
- return invoker;
+ return null;
}
/**
@@ -364,6 +357,7 @@
SOAPEnvelope envelope;
}
}
+
Modified:
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java?rev=592342&r1=592341&r2=592342&view=diff
==============================================================================
---
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java
(original)
+++
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryTransaction.java
Tue Nov 6 01:05:28 2007
@@ -28,7 +28,6 @@
import org.apache.sandesha2.storage.SandeshaStorageException;
import org.apache.sandesha2.storage.Transaction;
import org.apache.sandesha2.storage.beans.RMBean;
-import org.apache.sandesha2.workers.SandeshaThread;
/**
* This class does not really implement transactions, but it is a good
@@ -45,7 +44,6 @@
private ArrayList enlistedBeans = new ArrayList();
private InMemoryTransaction waitingForTran = null;
private boolean sentMessages = false;
- private boolean receivedMessages = false;
private boolean active = true;
InMemoryTransaction(InMemoryStorageManager manager, String threadName,
int id) {
@@ -59,10 +57,6 @@
public void commit() {
releaseLocks();
if(sentMessages) manager.getSender().wakeThread();
- if(receivedMessages) {
- SandeshaThread invoker = manager.getInvoker();
- if(invoker != null) invoker.wakeThread();
- }
active = false;
}
@@ -164,14 +158,11 @@
return result.toString();
}
- public void setReceivedMessages(boolean receivedMessages) {
- this.receivedMessages = receivedMessages;
- }
-
public void setSentMessages(boolean sentMessages) {
this.sentMessages = sentMessages;
}
}
+
Modified:
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SandeshaUtil.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SandeshaUtil.java?rev=592342&r1=592341&r2=592342&view=diff
==============================================================================
---
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SandeshaUtil.java
(original)
+++
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/SandeshaUtil.java
Tue Nov 6 01:05:28 2007
@@ -1169,5 +1169,14 @@
return epr;
}
+ public static boolean isInOrder(MessageContext context) throws
SandeshaException {
+ if (log.isDebugEnabled()) log.debug("Enter:
SandeshaUtil::isInOrder");
+
+ SandeshaPolicyBean policy =
getPropertyBean(context.getAxisOperation());
+ boolean result = policy.isInOrder();
+
+ if (log.isDebugEnabled()) log.debug("Enter:
SandeshaUtil::isInOrder, " + result);
+ return result;
+ }
}
Modified:
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Invoker.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Invoker.java?rev=592342&r1=592341&r2=592342&view=diff
==============================================================================
---
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Invoker.java
(original)
+++
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Invoker.java
Tue Nov 6 01:05:28 2007
@@ -68,12 +68,13 @@
* Otherwise messages skipped over will be ignored
* @throws SandeshaException
*/
- public synchronized void
forceInvokeOfAllMessagesCurrentlyOnSequence(ConfigurationContext ctx,
+ public static synchronized void
forceInvokeOfAllMessagesCurrentlyOnSequence(ConfigurationContext ctx,
String sequenceID,
boolean allowLaterDeliveryOfMissingMessages)throws
SandeshaException{
- //first we block while we wait for the invoking thread to pause
- blockForPause();
+// //first we block while we wait for the invoking thread to pause
+// blockForPause();
try{
+ StorageManager storageManager =
SandeshaUtil.getSandeshaStorageManager(ctx, ctx.getAxisConfiguration());
//get all invoker beans for the sequence
InvokerBeanMgr storageMapMgr = storageManager
.getInvokerBeanMgr();
@@ -102,33 +103,23 @@
InvokerBean invoker =
(InvokerBean)stMapIt.next();
// start a new worker thread
and let it do the invocation.
- String workId = sequenceID +
"::" + invoker.getMsgNo(); //creating a workId to uniquely identify the
- //piece of work that will be
assigned to the Worker.
+ String workId = sequenceID;
- String messageContextKey =
invoker.getMessageContextRefKey();
- InvokerWorker worker = new
InvokerWorker(context,
-
messageContextKey,
- true); //want
to ignore the enxt msg number
-
- worker.setLock(getWorkerLock());
+ InvokerWorker worker = new
InvokerWorker(ctx, invoker);
+ worker.forceOutOfOrder();
+ worker.setPooled();
worker.setWorkId(workId);
// Wrap the invoker worker with
the correct context, if needed.
Runnable work = worker;
- ContextManager contextMgr =
SandeshaUtil.getContextManager(context);
+ ContextManager contextMgr =
SandeshaUtil.getContextManager(ctx);
if(contextMgr != null) {
work =
contextMgr.wrapWithContext(work, invoker.getContext());
}
- try {
- // Try and set the lock
up before we start the thread, but roll it back
- // if we hit any
problems
-
if(worker.getLock().addWork(workId, worker)){
-
threadPool.execute(work);
- }
- } catch(Exception e) {
-
worker.getLock().removeWork(workId);
- }
+ // Setup the lock for the new
worker
+
worker.getLock().addWork(workId, worker);
+
ctx.getThreadPool().execute(work);
long msgNumber =
invoker.getMsgNo();
//if necessary, update the
"next message number" bean under this transaction
@@ -176,8 +167,8 @@
}
}
finally{
- //restart the invoker
- finishPause();
+// //restart the invoker
+// finishPause();
}
}
@@ -311,9 +302,7 @@
//see if this is an out of order msg
boolean beanIsOutOfOrderMsg =
bean.getMsgNo()!=nextMsgno;
- String workId = sequenceId + "::" +
bean.getMsgNo();
-
//creating a workId to
uniquely identify the
-
//piece of work that will be
assigned to the Worker.
+ String workId = sequenceId;
//check whether the bean is already assigned to
a worker.
if (getWorkerLock().isWorkPresent(workId)) {
@@ -331,20 +320,15 @@
return sleep;
}
- String messageContextKey =
bean.getMessageContextRefKey();
-
if(transaction != null) {
transaction.commit();
transaction = null;
}
// start a new worker thread and let it do the
invocation.
- InvokerWorker worker = new
InvokerWorker(context,
- messageContextKey,
- beanIsOutOfOrderMsg); //only
ignore nextMsgNumber if the bean is an
-
//out of order message
-
- worker.setLock(getWorkerLock());
+ InvokerWorker worker = new
InvokerWorker(context, bean);
+ if(beanIsOutOfOrderMsg)
worker.forceOutOfOrder();
+ worker.setPooled();
worker.setWorkId(workId);
// Wrap the invoker worker with the correct
context, if needed.
Modified:
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/InvokerWorker.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/InvokerWorker.java?rev=592342&r1=592341&r2=592342&view=diff
==============================================================================
---
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/InvokerWorker.java
(original)
+++
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/InvokerWorker.java
Tue Nov 6 01:05:28 2007
@@ -17,6 +17,7 @@
import org.apache.sandesha2.RMMsgContext;
import org.apache.sandesha2.Sandesha2Constants;
import org.apache.sandesha2.SandeshaException;
+import org.apache.sandesha2.context.ContextManager;
import org.apache.sandesha2.storage.SandeshaStorageException;
import org.apache.sandesha2.storage.StorageManager;
import org.apache.sandesha2.storage.Transaction;
@@ -30,21 +31,161 @@
public class InvokerWorker extends SandeshaWorker implements Runnable {
- ConfigurationContext configurationContext = null;
- String messageContextKey;
- boolean ignoreNextMsg = false;
+ static final Log log = LogFactory.getLog(InvokerWorker.class);
+ static final WorkerLock lock = new WorkerLock();
- Log log = LogFactory.getLog(InvokerWorker.class);
+ private ConfigurationContext configurationContext;
+ private String sequence;
+ private long messageNumber;
+ private String messageContextKey;
+ private boolean ignoreNextMsg;
+ private boolean pooledThread;
- public InvokerWorker (ConfigurationContext configurationContext, String
messageContextKey, boolean ignoreNextMsg) {
+ public InvokerWorker (ConfigurationContext configurationContext,
InvokerBean bean) {
+ // All invoker workers need to use the same lock, so we point
to the static one here.
+ this.setLock(lock);
+
this.configurationContext = configurationContext;
- this.messageContextKey = messageContextKey;
- this.ignoreNextMsg = ignoreNextMsg;
+ initializeFromBean(bean);
}
+ public void forceOutOfOrder() {
+ if(log.isDebugEnabled()) log.debug("Enter:
InvokerWorker::forceOutOfOrder");
+ ignoreNextMsg = true;
+ if(log.isDebugEnabled()) log.debug("Exit:
InvokerWorker::forceOutOfOrder");
+ }
+
+ public void setPooled() {
+ if(log.isDebugEnabled()) log.debug("Enter:
InvokerWorker::setPooled");
+ pooledThread = true;
+ if(log.isDebugEnabled()) log.debug("Exit:
InvokerWorker::setPooled");
+ }
+
+ private void initializeFromBean(InvokerBean bean) {
+ if(log.isDebugEnabled()) log.debug("Enter:
InvokerWorker::initializeFromBean " + bean);
+
+ this.sequence = bean.getSequenceID();
+ this.messageNumber = bean.getMsgNo();
+ this.messageContextKey = bean.getMessageContextRefKey();
+
+ if(log.isDebugEnabled()) log.debug("Exit:
InvokerWorker::initializeFromBean");
+ }
+
+ /**
+ * The run method invokes the message that this invoker has been primed
with, but will
+ * also attempt to invoke subsequent messages. If the invoker worker is
running on the
+ * application thread then we move on to a thread pool for the second
message, but if
+ * we are already on a pooled thread then we just continue.
+ */
public void run() {
- if(log.isDebugEnabled()) log.debug("Enter: InvokerWorker::run");
+ if(log.isDebugEnabled()) log.debug("Enter: InvokerWorker::run,
message " + messageNumber + ", sequence " + sequence);
+
+ // If we are not the holder of the correct lock, then we have
to stop
+ if(lock != null && !lock.ownsLock(workId, this)) {
+ if (log.isDebugEnabled()) log.debug("Exit:
InvokerWorker::run, another worker holds the lock");
+ return;
+ }
+
+ Transaction tran = null;
+ try {
+ InvokerWorker nextWorker = null;
+ Runnable nextRunnable = null;
+
+ // Invoke the first message
+ invokeMessage(null);
+
+ // Look for the next message, so long as we are still
processing normally
+ while(!ignoreNextMsg) {
+ InvokerBean finder = new InvokerBean();
+ finder.setSequenceID(sequence);
+ finder.setMsgNo(messageNumber + 1);
+
+ StorageManager storageManager =
SandeshaUtil.getSandeshaStorageManager(configurationContext,configurationContext.getAxisConfiguration());
+ tran = storageManager.getTransaction();
+
+ InvokerBeanMgr mgr =
storageManager.getInvokerBeanMgr();
+ InvokerBean nextBean = mgr.findUnique(finder);
+
+ if(nextBean != null) {
+ if(pooledThread) {
+ initializeFromBean(nextBean);
+ final Transaction theTran =
tran;
+ Runnable work = new Runnable() {
+ public void run() {
+
invokeMessage(theTran);
+ }
+ };
+
+ // Wrap the work with the
correct context, if needed.
+ ContextManager contextMgr =
SandeshaUtil.getContextManager(configurationContext);
+ if(contextMgr != null) {
+ work =
contextMgr.wrapWithContext(work, nextBean.getContext());
+ }
+
+ // Finally do the work
+ work.run();
+
+ tran = null;
+ } else {
+ nextWorker = new
InvokerWorker(configurationContext, nextBean);
+ nextWorker.setPooled();
+ nextWorker.setWorkId(workId);
+
+ // Wrap the invoker worker with
the correct context, if needed.
+ ContextManager contextMgr =
SandeshaUtil.getContextManager(configurationContext);
+ if(contextMgr != null) {
+ nextRunnable =
contextMgr.wrapWithContext(nextWorker, nextBean.getContext());
+ } else {
+ nextRunnable =
nextWorker;
+ }
+ }
+ }
+ // Clean up the tran, in case we didn't pass it
into the invoke method
+ if(tran != null) tran.commit();
+ tran = null;
+
+ if(nextBean == null || nextWorker != null) {
+ // We have run out of work, or the new
worker has taken it on, so we can
+ // break out of the loop
+ break;
+ }
+ }
+
+ if (workId !=null && lock!=null) {
+ lock.removeWork(workId);
+ }
+
+ // If we created another worker, set it running now
that we have released the lock
+ if(nextWorker != null) {
+ lock.addWork(workId, nextWorker);
+
configurationContext.getThreadPool().execute(nextRunnable);
+ }
+
+ } catch(SandeshaException e) {
+ log.debug("Exception within InvokerWorker", e);
+
+ // Clean up the tran, if there is one left
+ if(tran != null) {
+ try {
+ tran.rollback();
+ } catch(SandeshaException e2) {
+ log.debug("Exception rolling back
tran", e2);
+ }
+ }
+ } finally {
+ // Release the lock
+ if (workId !=null && lock!=null &&
lock.ownsLock(workId, this)) {
+ lock.removeWork(workId);
+ }
+ }
+
+ if(log.isDebugEnabled()) log.debug("Exit: InvokerWorker::run");
+ }
+
+ private void invokeMessage(Transaction tran) {
+ if(log.isDebugEnabled()) log.debug("Enter:
InvokerWorker::invokeMessage");
+
Transaction transaction = null;
MessageContext msgToInvoke = null;
@@ -54,7 +195,11 @@
InvokerBeanMgr invokerBeanMgr =
storageManager.getInvokerBeanMgr();
//starting a transaction
- transaction = storageManager.getTransaction();
+ if(tran == null) {
+ transaction = storageManager.getTransaction();
+ } else {
+ transaction = tran;
+ }
InvokerBean invokerBean =
invokerBeanMgr.retrieve(messageContextKey);
@@ -153,7 +298,7 @@
TerminateManager.cleanReceivingSideAfterInvocation(invokerBean.getSequenceID(),
storageManager);
// exit from current iteration. (since
an entry
// was removed)
- if(log.isDebugEnabled())
log.debug("Exit: InvokerWorker::run Last message return");
+ if(log.isDebugEnabled())
log.debug("Exit: InvokerWorker::invokeMessage Last message return");
if(transaction != null &&
transaction.isActive()) transaction.commit();
return;
}
@@ -181,10 +326,6 @@
if (log.isErrorEnabled())
log.error(e.toString(), e);
} finally {
- if (workId !=null && lock!=null) {
- lock.removeWork(workId);
- }
-
if (transaction!=null && transaction.isActive()) {
try {
transaction.rollback();
@@ -195,7 +336,7 @@
}
}
- if(log.isDebugEnabled()) log.debug("Exit: InvokerWorker::run");
+ if(log.isDebugEnabled()) log.debug("Exit:
InvokerWorker::invokeMessage");
}
private void makeMessageReadyForReinjection(MessageContext
messageContext) {
Modified:
webservices/sandesha/trunk/java/modules/core/src/main/resources/org/apache/sandesha2/i18n/resource.properties
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/resources/org/apache/sandesha2/i18n/resource.properties?rev=592342&r1=592341&r2=592342&view=diff
==============================================================================
---
webservices/sandesha/trunk/java/modules/core/src/main/resources/org/apache/sandesha2/i18n/resource.properties
(original)
+++
webservices/sandesha/trunk/java/modules/core/src/main/resources/org/apache/sandesha2/i18n/resource.properties
Tue Nov 6 01:05:28 2007
@@ -222,7 +222,6 @@
toBeanNotSet=The 'To' Sequence Property Bean has not been set for the sequence.
cannotFindTransportInDesc=Cannot find the transport in description {0} in the
ConfigurationContext.
invalidElementFoundWithinElement=Found invalid ''{0}'' element within ''{1}''
element.
-invokerNotFound=An invoker thread was not found to dispatch messages on the
inbound sequence {0}.
cannotSetPolicyBeanServiceNull=Cannot set the given SandeshaPolicyBean since
the AxisService is not present
#------------------
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]