Author: mckierna
Date: Tue Nov 13 04:04:18 2007
New Revision: 594502
URL: http://svn.apache.org/viewvc?rev=594502&view=rev
Log:
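Some refactoring and potential NPE protection: SequenceProcessor no longer hands messages to the invoker just because a user transaction exists; in InvokerWorker the lock-ownership check moves from run() into invokeMessage(), invokeMessage() now returns whether the message was invoked, and a null message context causes an early return.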
Modified:
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/InvokerWorker.java
Modified:
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/SequenceProcessor.java?rev=594502&r1=594501&r2=594502&view=diff
==============================================================================
---
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
(original)
+++
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/msgprocessors/SequenceProcessor.java
Tue Nov 13 04:04:18 2007
@@ -382,7 +382,7 @@
// then we should hand the message over to the invoker thread. If not, we can invoke
// it directly ourselves.
InvokerWorker worker = null;
- if (SandeshaUtil.isInOrder(msgCtx) ||
storageManager.hasUserTransaction(msgCtx)) {
+ if (SandeshaUtil.isInOrder(msgCtx)) {
InvokerBean invokerBean = new InvokerBean(key, msgNo,
sequenceId);
ContextManager contextMgr =
SandeshaUtil.getContextManager(configCtx);
Modified:
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/InvokerWorker.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/InvokerWorker.java?rev=594502&r1=594501&r2=594502&view=diff
==============================================================================
---
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/InvokerWorker.java
(original)
+++
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/InvokerWorker.java
Tue Nov 13 04:04:18 2007
@@ -59,6 +59,7 @@
private String messageContextKey;
private boolean ignoreNextMsg;
private boolean pooledThread;
+ boolean lastMessageInvoked;
public InvokerWorker (ConfigurationContext configurationContext,
InvokerBean bean) {
// All invoker workers need to use the same lock, so we point to the static one here.
@@ -99,11 +100,6 @@
public void run() {
if(log.isDebugEnabled()) log.debug("Enter: InvokerWorker::run,
message " + messageNumber + ", sequence " + sequence);
- // If we are not the holder of the correct lock, then we have to stop
- if(lock != null && !lock.ownsLock(workId, this)) {
- if (log.isDebugEnabled()) log.debug("Exit:
InvokerWorker::run, another worker holds the lock");
- return;
- }
Transaction tran = null;
try {
@@ -111,10 +107,11 @@
Runnable nextRunnable = null;
// Invoke the first message
- invokeMessage(null);
+ lastMessageInvoked = invokeMessage(null);
// Look for the next message, so long as we are still
processing normally
- while(!ignoreNextMsg) {
+ while(!ignoreNextMsg && lastMessageInvoked) {
+ if(log.isDebugEnabled())
log.debug("InvokerWorker:: looking for next msg to invoke");
InvokerBean finder = new InvokerBean();
finder.setSequenceID(sequence);
finder.setMsgNo(messageNumber + 1);
@@ -127,11 +124,12 @@
if(nextBean != null) {
if(pooledThread) {
+ if(log.isDebugEnabled())
log.debug("InvokerWorker:: pooledThread");
initializeFromBean(nextBean);
final Transaction theTran =
tran;
Runnable work = new Runnable() {
public void run() {
-
invokeMessage(theTran);
+
lastMessageInvoked = invokeMessage(theTran);
}
};
@@ -146,6 +144,7 @@
tran = null;
} else {
+ if(log.isDebugEnabled())
log.debug("InvokerWorker:: not pooled thread");
nextWorker = new
InvokerWorker(configurationContext, nextBean);
nextWorker.setPooled();
nextWorker.setWorkId(workId);
@@ -169,7 +168,7 @@
// break out of the loop
break;
}
- }
+ }//end while
if (workId !=null && lock!=null) {
lock.removeWork(workId);
@@ -202,11 +201,18 @@
if(log.isDebugEnabled()) log.debug("Exit: InvokerWorker::run");
}
- private void invokeMessage(Transaction tran) {
+ private boolean invokeMessage(Transaction tran) {
if(log.isDebugEnabled()) log.debug("Enter:
InvokerWorker::invokeMessage");
Transaction transaction = null;
MessageContext msgToInvoke = null;
+ boolean messageInvoked = true;
+
+ // If we are not the holder of the correct lock, then we have to stop
+ if(lock != null && (!lock.ownsLock(workId, this))) {
+ if (log.isDebugEnabled()) log.debug("Exit: InvokerWorker::run,
another worker holds the lock");
+ return false;
+ }
try {
@@ -223,6 +229,11 @@
InvokerBean invokerBean =
invokerBeanMgr.retrieve(messageContextKey);
msgToInvoke =
storageManager.retrieveMessageContext(messageContextKey, configurationContext);
+ if(msgToInvoke==null){
+ //return since there is nothing to do
+ if(log.isDebugEnabled()) log.debug("null msg");
+ return false;
+ }
// ending the transaction before invocation.
if(transaction != null) {
@@ -288,6 +299,7 @@
if (transaction != null &&
transaction.isActive())
transaction.rollback();
+ messageInvoked = false;
handleFault(rmMsg, e);
}
@@ -317,9 +329,9 @@
TerminateManager.cleanReceivingSideAfterInvocation(invokerBean.getSequenceID(),
storageManager);
// exit from current iteration. (since an entry
// was removed)
- if(log.isDebugEnabled())
log.debug("Exit: InvokerWorker::invokeMessage Last message return");
if(transaction != null &&
transaction.isActive()) transaction.commit();
- return;
+ if(log.isDebugEnabled())
log.debug("Exit: InvokerWorker::invokeMessage Last message return " +
messageInvoked);
+ return messageInvoked;
}
}
@@ -344,6 +356,7 @@
} catch (Exception e) {
if (log.isErrorEnabled())
log.error(e.toString(), e);
+ messageInvoked = false;
} finally {
if (transaction!=null && transaction.isActive()) {
try {
@@ -355,7 +368,8 @@
}
}
- if(log.isDebugEnabled()) log.debug("Exit:
InvokerWorker::invokeMessage");
+ if(log.isDebugEnabled()) log.debug("Exit:
InvokerWorker::invokeMessage " + messageInvoked);
+ return messageInvoked;
}
private void makeMessageReadyForReinjection(MessageContext
messageContext) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]