Author: chamikara
Date: Wed Jul 18 04:37:48 2007
New Revision: 557231
URL: http://svn.apache.org/viewvc?view=rev&rev=557231
Log:
Some updates to the transaction handling logic.
A couple of bug fixes
Modified:
webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/client/SandeshaClient.java
webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java
webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/util/SandeshaUtil.java
webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/workers/Invoker.java
webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/workers/InvokerWorker.java
webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java
webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java
Modified:
webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/client/SandeshaClient.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/client/SandeshaClient.java?view=diff&rev=557231&r1=557230&r2=557231
==============================================================================
---
webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/client/SandeshaClient.java
(original)
+++
webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/client/SandeshaClient.java
Wed Jul 18 04:37:48 2007
@@ -1428,6 +1428,7 @@
parameter = new Parameter ();
parameter.setName(Sandesha2Constants.SANDESHA_PROPERTY_BEAN);
} else {
+ parameter.setEditable(true); //if we
don't do it here, Axis2 will not allow us to override the parameter value.
parent = (SandeshaPolicyBean)
parameter.getValue();
policyBean.setParent(parent);
}
Modified:
webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java?view=diff&rev=557231&r1=557230&r2=557231
==============================================================================
---
webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java
(original)
+++
webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java
Wed Jul 18 04:37:48 2007
@@ -294,8 +294,13 @@
//Storing a cloned version will caus
HeaderBlocks to loose their setProcessed information.
StorageEntry entry = new StorageEntry();
entry.msgContext = msgContext;
- entry.envelope = msgContext.getEnvelope();
+
+ //building the full enveloper before storing.
+ SOAPEnvelope envelope =
msgContext.getEnvelope();
+ envelope.buildWithAttachments();
+ entry.envelope = envelope;
storageMap.put(key,entry);
+
}
} catch(Exception e) {
String message = SandeshaMessageHelper.getMessage(
@@ -323,7 +328,17 @@
public void removeMessageContext(String key) {
if(log.isDebugEnabled()) log.debug("Enter:
InMemoryStorageManager::removeMessageContext, key: " + key);
- storageMap.remove(key);
+ StorageEntry entry = (StorageEntry) storageMap.remove(key);
+ if( entry != null && entry.msgContext != null ) {
+ String messageId = entry.msgContext.getMessageID();
+ if( getContext().getOperationContext( messageId ) != null ) {
+ getContext().unregisterOperationContext( messageId );
+ } else {
+ log.debug("No OperationContext registered for MessageId : " +
messageId );
+ }
+ } else {
+ if(log.isDebugEnabled()) log.debug("No MessageContext entry for
key : " + key );
+ }
if(log.isDebugEnabled()) log.debug("Exit:
InMemoryStorageManager::removeMessageContext, key: " + key);
}
Modified:
webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/util/SandeshaUtil.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/util/SandeshaUtil.java?view=diff&rev=557231&r1=557230&r2=557231
==============================================================================
---
webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/util/SandeshaUtil.java
(original)
+++
webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/util/SandeshaUtil.java
Wed Jul 18 04:37:48 2007
@@ -144,6 +144,7 @@
public static void startWorkersForSequence(ConfigurationContext
context, RMSequenceBean sequence)
throws SandeshaException {
+
if (log.isDebugEnabled())
log.debug("Enter:
SandeshaUtil::startWorkersForSequence, sequence " + sequence);
Modified:
webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/workers/Invoker.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/workers/Invoker.java?view=diff&rev=557231&r1=557230&r2=557231
==============================================================================
---
webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/workers/Invoker.java
(original)
+++
webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/workers/Invoker.java
Wed Jul 18 04:37:48 2007
@@ -238,6 +238,10 @@
processedMessage = false;
if (log.isDebugEnabled()) log.debug("Exit:
Invoker::internalRun, looped over all sequences, sleep " + sleep);
+
+ if(transaction != null &&
transaction.isActive()) transaction.commit();
+ transaction = null;
+
return sleep;
}
@@ -255,6 +259,10 @@
sleep = true;
if (log.isDebugEnabled()) log.debug("Exit:
Invoker::internalRun, sleep " + sleep);
+
+ if(transaction != null &&
transaction.isActive()) transaction.commit();
+ transaction = null;
+
return sleep;
}
@@ -282,6 +290,10 @@
// If there aren't any beans to process then move on to
the next sequence
if (invokerBeans.size() == 0) {
if (log.isDebugEnabled()) log.debug("Exit:
Invoker::internalRun, no beans to invoke on sequence " + sequenceId + ", sleep
" + sleep);
+
+ if(transaction != null &&
transaction.isActive()) transaction.commit();
+ transaction = null;
+
return sleep;
}
@@ -306,6 +318,12 @@
sleep = true;
String message =
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.workAlreadyAssigned,
workId);
if (log.isDebugEnabled())
log.debug("Exit: Invoker::internalRun, " + message + ", sleep " + sleep);
+
+ if(transaction != null) {
+ transaction.commit();
+ transaction = null;
+ }
+
return sleep;
}
Modified:
webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/workers/InvokerWorker.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/workers/InvokerWorker.java?view=diff&rev=557231&r1=557230&r2=557231
==============================================================================
---
webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/workers/InvokerWorker.java
(original)
+++
webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/workers/InvokerWorker.java
Wed Jul 18 04:37:48 2007
@@ -54,18 +54,10 @@
transaction = storageManager.getTransaction();
InvokerBean invokerBean =
invokerBeanMgr.retrieve(messageContextKey);
-
+
msgToInvoke =
storageManager.retrieveMessageContext(messageContextKey, configurationContext);
RMMsgContext rmMsg =
MsgInitializer.initializeMessage(msgToInvoke);
- // ending the transaction before invocation.
- if(transaction != null) {
- transaction.commit();
- transaction = null;
- }
-
- //starting a transaction for the invocation work.
- transaction = storageManager.getTransaction();
// Lock the RMD Bean just to avoid deadlocks
SandeshaUtil.getRMDBeanFromSequenceId(storageManager,
invokerBean.getSequenceID());
// Depending on the transaction support, the service
will be invoked only once.
@@ -74,6 +66,12 @@
// removing the corresponding message context as well.
storageManager.removeMessageContext(messageContextKey);
+ // ending the transaction before invocation.
+ if(transaction != null) {
+ transaction.commit();
+ transaction = null;
+ }
+
try {
boolean postFailureInvocation = false;
@@ -100,11 +98,7 @@
msgToInvoke.setPaused(false);
AxisEngine.resumeReceive(msgToInvoke);
}
-
- if(transaction!=null){
- transaction.commit();
- transaction =
storageManager.getTransaction();
- }
+
} catch (Exception e) {
if (log.isDebugEnabled())
log.debug("Exception :", e);
@@ -113,7 +107,8 @@
}
-
+ transaction = storageManager.getTransaction();
+
if (rmMsg.getMessageType() ==
Sandesha2Constants.MessageTypes.APPLICATION) {
Sequence sequence = (Sequence) rmMsg
.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
@@ -137,7 +132,8 @@
TerminateManager.cleanReceivingSideAfterInvocation(invokerBean.getSequenceID(),
storageManager);
// exit from current iteration. (since
an entry
// was removed)
- if(log.isDebugEnabled())
log.debug("Exit: InvokerWorker::run Last message return");
+ if(log.isDebugEnabled())
log.debug("Exit: InvokerWorker::run Last message return");
+ if(transaction != null &&
transaction.isActive()) transaction.commit();
return;
}
}
Modified:
webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java?view=diff&rev=557231&r1=557230&r2=557231
==============================================================================
---
webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java
(original)
+++
webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java
Wed Jul 18 04:37:48 2007
@@ -77,6 +77,7 @@
// Pick a sequence using a round-robin approach
ArrayList allSequencesList = getSequences();
int size = allSequencesList.size();
+
if (log.isDebugEnabled())
log.debug("Choosing one from " + size + "
sequences");
if(nextIndex >= size) {
@@ -103,14 +104,14 @@
if (log.isDebugEnabled()) log.debug("Exit:
Sender::internalRun, looped over all sequences, sleep " + sleep);
return sleep;
}
+
+ transaction = storageManager.getTransaction();
SequenceEntry entry = (SequenceEntry)
allSequencesList.get(nextIndex++);
String sequenceId = entry.getSequenceId();
if (log.isDebugEnabled())
log.debug("Chose sequence " + sequenceId);
- transaction = storageManager.getTransaction();
-
String rmVersion = null;
// Check that the sequence is still valid
boolean found = false;
@@ -141,6 +142,12 @@
if (!found) {
stopThreadForSequence(sequenceId,
entry.isRmSource());
if (log.isDebugEnabled()) log.debug("Exit:
Sender::internalRun, sequence has ended");
+
+ if(transaction != null &&
transaction.isActive()) {
+ transaction.commit();
+ transaction = null;
+ }
+
return false;
}
@@ -149,6 +156,12 @@
if (senderBean == null) {
if (log.isDebugEnabled()) log.debug("Exit:
Sender::internalRun, no message for this sequence");
+
+ if(transaction != null &&
transaction.isActive()) {
+ transaction.commit();
+ transaction = null;
+ }
+
return false; // Move on to the next sequence
in the list
}
@@ -170,13 +183,18 @@
workId);
log.debug("Exit: Sender::internalRun, "
+ message + ", sleeping");
}
+
+ if(transaction != null &&
transaction.isActive()) {
+ transaction.commit();
+ transaction = null;
+ }
+
return true;
}
- if(transaction != null) {
- transaction.commit();
- transaction = null;
- }
+ //commiting the transaction here to release resources
early.
+ if(transaction != null && transaction.isActive())
transaction.commit();
+ transaction = null;
// start a worker which will work on this messages.
SenderWorker worker = new SenderWorker(context,
senderBean, rmVersion);
@@ -193,9 +211,6 @@
// remember not to sleep at the end of the list of
sequences.
processedMessage = true;
- if(transaction != null && transaction.isActive())
transaction.commit();
- transaction = null;
-
} catch (Exception e) {
// TODO : when this is the client side throw the
exception to
@@ -312,7 +327,6 @@
}
if(transaction != null && transaction.isActive())
transaction.commit();
- transaction = null;
} catch (SandeshaException e) {
if (log.isErrorEnabled())
@@ -352,6 +366,7 @@
if (log.isDebugEnabled())
log.debug("Removing RMSBean " + rmsBean);
storageManager.getRMSBeanMgr().delete(rmsBean.getCreateSeqMsgID());
+ storageManager.removeMessageContext(
rmsBean.getReferenceMessageStoreKey() );
}
}
Modified:
webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java
URL:
http://svn.apache.org/viewvc/webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java?view=diff&rev=557231&r1=557230&r2=557231
==============================================================================
---
webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java
(original)
+++
webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java
Wed Jul 18 04:37:48 2007
@@ -96,6 +96,10 @@
if (msgCtx == null) {
// This sender bean has already been
processed
+
+ if(transaction != null &&
transaction.isActive()) transaction.commit();
+ transaction = null;
+
return;
}
@@ -113,12 +117,24 @@
if (qualifiedForSending != null &&
!qualifiedForSending.equals(Sandesha2Constants.VALUE_TRUE)) {
if (log.isDebugEnabled())
log.debug("Exit: SenderWorker::run,
!qualified for sending");
+
+ if(transaction != null &&
transaction.isActive()) {
+ transaction.commit();
+ transaction = null;
+ }
+
return;
}
if (msgCtx == null) {
if (log.isDebugEnabled())
log.debug(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.sendHasUnavailableMsgEntry));
+
+ if(transaction != null &&
transaction.isActive()) {
+ transaction.commit();
+ transaction = null;
+ }
+
return;
}
@@ -128,6 +144,12 @@
if (msgsNotToSend != null && msgsNotToSend.contains(new
Integer(rmMsgCtx.getMessageType()))) {
if (log.isDebugEnabled())
log.debug("Exit: SenderWorker::run,
message type to be dropped " + rmMsgCtx.getMessageType());
+
+ if(transaction != null &&
transaction.isActive()) {
+ transaction.commit();
+ transaction = null;
+ }
+
return;
}
@@ -177,6 +199,12 @@
if (!continueSending) {
if (log.isDebugEnabled())
log.debug("Exit: SenderWorker::run,
!continueSending");
+
+ if(transaction != null &&
transaction.isActive()) {
+ transaction.commit();
+ transaction = null;
+ }
+
return;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]