Author: dims
Date: Mon Jul 23 07:51:46 2007
New Revision: 558755
URL: http://svn.apache.org/viewvc?view=rev&rev=558755
Log:
merge part of svn revision 557231 from 1.3 branch to get persistence + securerm
scenario working
Modified:
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Invoker.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/InvokerWorker.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java
Modified:
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Invoker.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Invoker.java?view=diff&rev=558755&r1=558754&r2=558755
==============================================================================
---
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Invoker.java
(original)
+++
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Invoker.java
Mon Jul 23 07:51:46 2007
@@ -238,6 +238,10 @@
processedMessage = false;
if (log.isDebugEnabled()) log.debug("Exit:
Invoker::internalRun, looped over all sequences, sleep " + sleep);
+
+ if(transaction != null &&
transaction.isActive()) transaction.commit();
+ transaction = null;
+
return sleep;
}
@@ -255,6 +259,10 @@
sleep = true;
if (log.isDebugEnabled()) log.debug("Exit:
Invoker::internalRun, sleep " + sleep);
+
+ if(transaction != null &&
transaction.isActive()) transaction.commit();
+ transaction = null;
+
return sleep;
}
@@ -282,6 +290,10 @@
// If there aren't any beans to process then move on to
the next sequence
if (invokerBeans.size() == 0) {
if (log.isDebugEnabled()) log.debug("Exit:
Invoker::internalRun, no beans to invoke on sequence " + sequenceId + ", sleep
" + sleep);
+
+ if(transaction != null &&
transaction.isActive()) transaction.commit();
+ transaction = null;
+
return sleep;
}
@@ -306,6 +318,12 @@
sleep = true;
String message =
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.workAlreadyAssigned,
workId);
if (log.isDebugEnabled())
log.debug("Exit: Invoker::internalRun, " + message + ", sleep " + sleep);
+
+ if(transaction != null) {
+ transaction.commit();
+ transaction = null;
+ }
+
return sleep;
}
Modified:
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/InvokerWorker.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/InvokerWorker.java?view=diff&rev=558755&r1=558754&r2=558755
==============================================================================
---
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/InvokerWorker.java
(original)
+++
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/InvokerWorker.java
Mon Jul 23 07:51:46 2007
@@ -54,18 +54,10 @@
transaction = storageManager.getTransaction();
InvokerBean invokerBean =
invokerBeanMgr.retrieve(messageContextKey);
-
+
msgToInvoke =
storageManager.retrieveMessageContext(messageContextKey, configurationContext);
RMMsgContext rmMsg =
MsgInitializer.initializeMessage(msgToInvoke);
- // ending the transaction before invocation.
- if(transaction != null) {
- transaction.commit();
- transaction = null;
- }
-
- //starting a transaction for the invocation work.
- transaction = storageManager.getTransaction();
// Lock the RMD Bean just to avoid deadlocks
SandeshaUtil.getRMDBeanFromSequenceId(storageManager,
invokerBean.getSequenceID());
// Depending on the transaction support, the service
will be invoked only once.
@@ -74,6 +66,12 @@
// removing the corresponding message context as well.
storageManager.removeMessageContext(messageContextKey);
+ // ending the transaction before invocation.
+ if(transaction != null) {
+ transaction.commit();
+ transaction = null;
+ }
+
try {
boolean postFailureInvocation = false;
@@ -100,11 +98,7 @@
msgToInvoke.setPaused(false);
AxisEngine.resumeReceive(msgToInvoke);
}
-
- if(transaction!=null){
- transaction.commit();
- transaction =
storageManager.getTransaction();
- }
+
} catch (Exception e) {
if (log.isDebugEnabled())
log.debug("Exception :", e);
@@ -113,7 +107,8 @@
}
-
+ transaction = storageManager.getTransaction();
+
if (rmMsg.getMessageType() ==
Sandesha2Constants.MessageTypes.APPLICATION) {
Sequence sequence = (Sequence) rmMsg
.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
@@ -137,7 +132,8 @@
TerminateManager.cleanReceivingSideAfterInvocation(invokerBean.getSequenceID(),
storageManager);
// exit from current iteration. (since
an entry
// was removed)
- if(log.isDebugEnabled())
log.debug("Exit: InvokerWorker::run Last message return");
+ if(log.isDebugEnabled())
log.debug("Exit: InvokerWorker::run Last message return");
+ if(transaction != null &&
transaction.isActive()) transaction.commit();
return;
}
}
Modified:
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java?view=diff&rev=558755&r1=558754&r2=558755
==============================================================================
---
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java
(original)
+++
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java
Mon Jul 23 07:51:46 2007
@@ -77,6 +77,7 @@
// Pick a sequence using a round-robin approach
ArrayList allSequencesList = getSequences();
int size = allSequencesList.size();
+
if (log.isDebugEnabled())
log.debug("Choosing one from " + size + "
sequences");
if(nextIndex >= size) {
@@ -103,14 +104,14 @@
if (log.isDebugEnabled()) log.debug("Exit:
Sender::internalRun, looped over all sequences, sleep " + sleep);
return sleep;
}
+
+ transaction = storageManager.getTransaction();
SequenceEntry entry = (SequenceEntry)
allSequencesList.get(nextIndex++);
String sequenceId = entry.getSequenceId();
if (log.isDebugEnabled())
log.debug("Chose sequence " + sequenceId);
- transaction = storageManager.getTransaction();
-
String rmVersion = null;
// Check that the sequence is still valid
boolean found = false;
@@ -141,6 +142,12 @@
if (!found) {
stopThreadForSequence(sequenceId,
entry.isRmSource());
if (log.isDebugEnabled()) log.debug("Exit:
Sender::internalRun, sequence has ended");
+
+ if(transaction != null &&
transaction.isActive()) {
+ transaction.commit();
+ transaction = null;
+ }
+
return false;
}
@@ -149,6 +156,12 @@
if (senderBean == null) {
if (log.isDebugEnabled()) log.debug("Exit:
Sender::internalRun, no message for this sequence");
+
+ if(transaction != null &&
transaction.isActive()) {
+ transaction.commit();
+ transaction = null;
+ }
+
return false; // Move on to the next sequence
in the list
}
@@ -170,13 +183,18 @@
workId);
log.debug("Exit: Sender::internalRun, "
+ message + ", sleeping");
}
+
+ if(transaction != null &&
transaction.isActive()) {
+ transaction.commit();
+ transaction = null;
+ }
+
return true;
}
- if(transaction != null) {
- transaction.commit();
- transaction = null;
- }
+ //commiting the transaction here to release resources
early.
+ if(transaction != null && transaction.isActive())
transaction.commit();
+ transaction = null;
// start a worker which will work on this messages.
SenderWorker worker = new SenderWorker(context,
senderBean, rmVersion);
@@ -193,9 +211,6 @@
// remember not to sleep at the end of the list of
sequences.
processedMessage = true;
- if(transaction != null && transaction.isActive())
transaction.commit();
- transaction = null;
-
} catch (Exception e) {
// TODO : when this is the client side throw the
exception to
@@ -312,7 +327,6 @@
}
if(transaction != null && transaction.isActive())
transaction.commit();
- transaction = null;
} catch (SandeshaException e) {
if (log.isErrorEnabled())
@@ -352,6 +366,7 @@
if (log.isDebugEnabled())
log.debug("Removing RMSBean " + rmsBean);
storageManager.getRMSBeanMgr().delete(rmsBean.getCreateSeqMsgID());
+ storageManager.removeMessageContext(
rmsBean.getReferenceMessageStoreKey() );
}
}
Modified:
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java?view=diff&rev=558755&r1=558754&r2=558755
==============================================================================
---
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java
(original)
+++
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java
Mon Jul 23 07:51:46 2007
@@ -96,6 +96,10 @@
if (msgCtx == null) {
// This sender bean has already been
processed
+
+ if(transaction != null &&
transaction.isActive()) transaction.commit();
+ transaction = null;
+
return;
}
@@ -113,12 +117,24 @@
if (qualifiedForSending != null &&
!qualifiedForSending.equals(Sandesha2Constants.VALUE_TRUE)) {
if (log.isDebugEnabled())
log.debug("Exit: SenderWorker::run,
!qualified for sending");
+
+ if(transaction != null &&
transaction.isActive()) {
+ transaction.commit();
+ transaction = null;
+ }
+
return;
}
if (msgCtx == null) {
if (log.isDebugEnabled())
log.debug(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.sendHasUnavailableMsgEntry));
+
+ if(transaction != null &&
transaction.isActive()) {
+ transaction.commit();
+ transaction = null;
+ }
+
return;
}
@@ -128,6 +144,12 @@
if (msgsNotToSend != null && msgsNotToSend.contains(new
Integer(rmMsgCtx.getMessageType()))) {
if (log.isDebugEnabled())
log.debug("Exit: SenderWorker::run,
message type to be dropped " + rmMsgCtx.getMessageType());
+
+ if(transaction != null &&
transaction.isActive()) {
+ transaction.commit();
+ transaction = null;
+ }
+
return;
}
@@ -177,6 +199,12 @@
if (!continueSending) {
if (log.isDebugEnabled())
log.debug("Exit: SenderWorker::run,
!continueSending");
+
+ if(transaction != null &&
transaction.isActive()) {
+ transaction.commit();
+ transaction = null;
+ }
+
return;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]