Author: chamikara
Date: Wed Jul 18 04:37:48 2007
New Revision: 557231

URL: http://svn.apache.org/viewvc?view=rev&rev=557231
Log:
Some updates to the transaction handling logic.
A couple of bug fixes

Modified:
    
webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/client/SandeshaClient.java
    
webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java
    
webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/util/SandeshaUtil.java
    
webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/workers/Invoker.java
    
webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/workers/InvokerWorker.java
    
webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java
    
webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java

Modified: 
webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/client/SandeshaClient.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/client/SandeshaClient.java?view=diff&rev=557231&r1=557230&r2=557231
==============================================================================
--- 
webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/client/SandeshaClient.java
 (original)
+++ 
webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/client/SandeshaClient.java
 Wed Jul 18 04:37:48 2007
@@ -1428,6 +1428,7 @@
                                        parameter = new Parameter ();
                                        
parameter.setName(Sandesha2Constants.SANDESHA_PROPERTY_BEAN);
                                } else {
+                                       parameter.setEditable(true); //if we 
don't do it here, Axis2 will not allow us to override the parameter value.
                                        parent = (SandeshaPolicyBean) 
parameter.getValue();
                                        policyBean.setParent(parent);
                                }

Modified: 
webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java?view=diff&rev=557231&r1=557230&r2=557231
==============================================================================
--- 
webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java
 (original)
+++ 
webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/storage/inmemory/InMemoryStorageManager.java
 Wed Jul 18 04:37:48 2007
@@ -294,8 +294,13 @@
                                //Storing a cloned version will caus 
HeaderBlocks to loose their setProcessed information.
                                StorageEntry entry = new StorageEntry();
                                entry.msgContext = msgContext;
-                               entry.envelope = msgContext.getEnvelope();
+                               
+                               //building the full enveloper before storing.
+                               SOAPEnvelope envelope = 
msgContext.getEnvelope();
+                               envelope.buildWithAttachments();
+                               entry.envelope = envelope;
                                storageMap.put(key,entry);
+                               
                        }
                } catch(Exception e) {
                        String message = SandeshaMessageHelper.getMessage(
@@ -323,7 +328,17 @@
        public void removeMessageContext(String key) { 
                if(log.isDebugEnabled()) log.debug("Enter: 
InMemoryStorageManager::removeMessageContext, key: " + key);
 
-               storageMap.remove(key);
+        StorageEntry entry = (StorageEntry) storageMap.remove(key);
+        if( entry != null && entry.msgContext != null ) {
+            String messageId = entry.msgContext.getMessageID();
+            if( getContext().getOperationContext( messageId ) != null ) {
+                getContext().unregisterOperationContext( messageId );
+            } else {
+                log.debug("No OperationContext registered for MessageId : " + 
messageId );
+            }
+        } else {
+            if(log.isDebugEnabled())  log.debug("No MessageContext entry for 
key : " + key );
+        }
                
                if(log.isDebugEnabled()) log.debug("Exit: 
InMemoryStorageManager::removeMessageContext, key: " + key);
        }

Modified: 
webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/util/SandeshaUtil.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/util/SandeshaUtil.java?view=diff&rev=557231&r1=557230&r2=557231
==============================================================================
--- 
webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/util/SandeshaUtil.java
 (original)
+++ 
webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/util/SandeshaUtil.java
 Wed Jul 18 04:37:48 2007
@@ -144,6 +144,7 @@
 
        public static void startWorkersForSequence(ConfigurationContext 
context, RMSequenceBean sequence)
        throws SandeshaException {
+       
                if (log.isDebugEnabled())
                        log.debug("Enter: 
SandeshaUtil::startWorkersForSequence, sequence " + sequence);
                

Modified: 
webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/workers/Invoker.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/workers/Invoker.java?view=diff&rev=557231&r1=557230&r2=557231
==============================================================================
--- 
webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/workers/Invoker.java
 (original)
+++ 
webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/workers/Invoker.java
 Wed Jul 18 04:37:48 2007
@@ -238,6 +238,10 @@
                                processedMessage = false;
                                
                                if (log.isDebugEnabled()) log.debug("Exit: 
Invoker::internalRun, looped over all sequences, sleep " + sleep);
+                               
+                               if(transaction != null && 
transaction.isActive()) transaction.commit();
+                               transaction = null;
+                               
                                return sleep;
                        }
 
@@ -255,6 +259,10 @@
                                        sleep = true;
 
                                if (log.isDebugEnabled()) log.debug("Exit: 
Invoker::internalRun, sleep " + sleep);
+                               
+                               if(transaction != null && 
transaction.isActive()) transaction.commit();
+                               transaction = null;
+
                                return sleep;
                        }
 
@@ -282,6 +290,10 @@
                        // If there aren't any beans to process then move on to 
the next sequence
                        if (invokerBeans.size() == 0) {
                                if (log.isDebugEnabled()) log.debug("Exit: 
Invoker::internalRun, no beans to invoke on sequence " + sequenceId + ", sleep 
" + sleep);
+                               
+                               if(transaction != null && 
transaction.isActive()) transaction.commit();
+                               transaction = null;
+
                                return sleep;
                        }
                        
@@ -306,6 +318,12 @@
                                        sleep = true;
                                        String message = 
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.workAlreadyAssigned, 
workId);
                                        if (log.isDebugEnabled()) 
log.debug("Exit: Invoker::internalRun, " + message + ", sleep " + sleep);
+                                       
+                                       if(transaction != null) {
+                                               transaction.commit();
+                                               transaction = null;
+                                       }
+                                       
                                        return sleep;
                                }
 

Modified: 
webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/workers/InvokerWorker.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/workers/InvokerWorker.java?view=diff&rev=557231&r1=557230&r2=557231
==============================================================================
--- 
webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/workers/InvokerWorker.java
 (original)
+++ 
webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/workers/InvokerWorker.java
 Wed Jul 18 04:37:48 2007
@@ -54,18 +54,10 @@
                        transaction = storageManager.getTransaction();
                        
                        InvokerBean invokerBean = 
invokerBeanMgr.retrieve(messageContextKey);
-                       
+
                        msgToInvoke = 
storageManager.retrieveMessageContext(messageContextKey, configurationContext);
                        RMMsgContext rmMsg = 
MsgInitializer.initializeMessage(msgToInvoke);
 
-                       // ending the transaction before invocation.
-                       if(transaction != null) {
-                               transaction.commit();
-                               transaction = null;
-                       }
-                                       
-                       //starting a transaction for the invocation work.
-                       transaction = storageManager.getTransaction();
                        // Lock the RMD Bean just to avoid deadlocks
                        SandeshaUtil.getRMDBeanFromSequenceId(storageManager, 
invokerBean.getSequenceID());
                        // Depending on the transaction  support, the service 
will be invoked only once. 
@@ -74,6 +66,12 @@
                        // removing the corresponding message context as well.
                        storageManager.removeMessageContext(messageContextKey);
 
+                       // ending the transaction before invocation.
+                       if(transaction != null) {
+                               transaction.commit();
+                               transaction = null;
+                       }
+                       
                        try {
 
                                boolean postFailureInvocation = false;
@@ -100,11 +98,7 @@
                                        msgToInvoke.setPaused(false);
                                        AxisEngine.resumeReceive(msgToInvoke);
                                }
-                               
-                               if(transaction!=null){
-                                       transaction.commit();
-                                       transaction = 
storageManager.getTransaction();
-                               }
+
                        } catch (Exception e) {
                                if (log.isDebugEnabled())
                                        log.debug("Exception :", e);
@@ -113,7 +107,8 @@
                        }
 
 
-                       
+                       transaction = storageManager.getTransaction();
+                        
                        if (rmMsg.getMessageType() == 
Sandesha2Constants.MessageTypes.APPLICATION) {
                                Sequence sequence = (Sequence) rmMsg
                                                
.getMessagePart(Sandesha2Constants.MessageParts.SEQUENCE);
@@ -137,7 +132,8 @@
                                        
TerminateManager.cleanReceivingSideAfterInvocation(invokerBean.getSequenceID(), 
storageManager);
                                        // exit from current iteration. (since 
an entry
                                        // was removed)
-                                       if(log.isDebugEnabled()) 
log.debug("Exit: InvokerWorker::run Last message return");                      
               
+                                       if(log.isDebugEnabled()) 
log.debug("Exit: InvokerWorker::run Last message return");     
+                                       if(transaction != null && 
transaction.isActive()) transaction.commit();
                                        return;
                                }
                        }

Modified: 
webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java?view=diff&rev=557231&r1=557230&r2=557231
==============================================================================
--- 
webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java
 (original)
+++ 
webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java
 Wed Jul 18 04:37:48 2007
@@ -77,6 +77,7 @@
                        // Pick a sequence using a round-robin approach
                        ArrayList allSequencesList = getSequences();
                        int size = allSequencesList.size();
+
                        if (log.isDebugEnabled())
                                log.debug("Choosing one from " + size + " 
sequences");
                        if(nextIndex >= size) {
@@ -103,14 +104,14 @@
                                if (log.isDebugEnabled()) log.debug("Exit: 
Sender::internalRun, looped over all sequences, sleep " + sleep);
                                return sleep;
                        }
+                       
+                       transaction = storageManager.getTransaction();
 
                        SequenceEntry entry = (SequenceEntry) 
allSequencesList.get(nextIndex++);
                        String sequenceId = entry.getSequenceId();
                        if (log.isDebugEnabled())
                                log.debug("Chose sequence " + sequenceId);
 
-                       transaction = storageManager.getTransaction();
-
                        String rmVersion = null;
                        // Check that the sequence is still valid
                        boolean found = false;
@@ -141,6 +142,12 @@
                        if (!found) {
                                stopThreadForSequence(sequenceId, 
entry.isRmSource());
                                if (log.isDebugEnabled()) log.debug("Exit: 
Sender::internalRun, sequence has ended");
+                               
+                               if(transaction != null && 
transaction.isActive()) {
+                                       transaction.commit();
+                                       transaction = null;
+                               }
+                               
                                return false;
                        }
                        
@@ -149,6 +156,12 @@
                        
                        if (senderBean == null) {
                                if (log.isDebugEnabled()) log.debug("Exit: 
Sender::internalRun, no message for this sequence");
+                               
+                               if(transaction != null && 
transaction.isActive()) {
+                                       transaction.commit();
+                                       transaction = null;
+                               }
+                               
                                return false; // Move on to the next sequence 
in the list
                        }
 
@@ -170,13 +183,18 @@
                                                                        workId);
                                        log.debug("Exit: Sender::internalRun, " 
+ message + ", sleeping");
                                }
+                               
+                               if(transaction != null && 
transaction.isActive()) {
+                                       transaction.commit();
+                                       transaction = null;
+                               }
+                               
                                return true;
                        }
 
-                       if(transaction != null) {
-                               transaction.commit();
-                               transaction = null;
-                       }
+                       //commiting the transaction here to release resources 
early.
+                       if(transaction != null && transaction.isActive()) 
transaction.commit();
+                       transaction = null;
 
                        // start a worker which will work on this messages.
                        SenderWorker worker = new SenderWorker(context, 
senderBean, rmVersion);
@@ -193,9 +211,6 @@
                        // remember not to sleep at the end of the list of 
sequences.
                        processedMessage = true;
                        
-                       if(transaction != null && transaction.isActive()) 
transaction.commit();
-                       transaction = null;
-
                } catch (Exception e) {
 
                        // TODO : when this is the client side throw the 
exception to
@@ -312,7 +327,6 @@
                        }           
            
                        if(transaction != null && transaction.isActive()) 
transaction.commit();
-                       transaction = null;
                        
                } catch (SandeshaException e) {
                        if (log.isErrorEnabled())
@@ -352,6 +366,7 @@
                if (log.isDebugEnabled())
                        log.debug("Removing RMSBean " + rmsBean);
                
storageManager.getRMSBeanMgr().delete(rmsBean.getCreateSeqMsgID());
+               storageManager.removeMessageContext( 
rmsBean.getReferenceMessageStoreKey() );
        }               
     }
 

Modified: 
webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java?view=diff&rev=557231&r1=557230&r2=557231
==============================================================================
--- 
webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java
 (original)
+++ 
webservices/sandesha/branches/sandesha2/java/1_3/modules/core/src/main/java/org/apache/sandesha2/workers/SenderWorker.java
 Wed Jul 18 04:37:48 2007
@@ -96,6 +96,10 @@
       
                                if (msgCtx == null) {
                                        // This sender bean has already been 
processed
+                                       
+                                       if(transaction != null && 
transaction.isActive()) transaction.commit();
+                                       transaction = null;
+
                                        return;
                                }
       
@@ -113,12 +117,24 @@
                        if (qualifiedForSending != null && 
!qualifiedForSending.equals(Sandesha2Constants.VALUE_TRUE)) {
                                if (log.isDebugEnabled())
                                        log.debug("Exit: SenderWorker::run, 
!qualified for sending");
+                               
+                               if(transaction != null && 
transaction.isActive()) {
+                                       transaction.commit();
+                                       transaction = null;
+                               }
+                               
                                return;
                        }
 
                        if (msgCtx == null) {
                                if (log.isDebugEnabled())
                                        
log.debug(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.sendHasUnavailableMsgEntry));
+                               
+                               if(transaction != null && 
transaction.isActive()) {
+                                       transaction.commit();
+                                       transaction = null;
+                               }
+                               
                                return;                 
                        }
 
@@ -128,6 +144,12 @@
                        if (msgsNotToSend != null && msgsNotToSend.contains(new 
Integer(rmMsgCtx.getMessageType()))) {
                                if (log.isDebugEnabled())
                                        log.debug("Exit: SenderWorker::run, 
message type to be dropped " + rmMsgCtx.getMessageType());
+                               
+                               if(transaction != null && 
transaction.isActive()) {
+                                       transaction.commit();
+                                       transaction = null;
+                               }
+                               
                                return; 
                        }
 
@@ -177,6 +199,12 @@
                        if (!continueSending) { 
                                if (log.isDebugEnabled())
                                        log.debug("Exit: SenderWorker::run, 
!continueSending");
+                               
+                               if(transaction != null && 
transaction.isActive()) {
+                                       transaction.commit();
+                                       transaction = null;
+                               }
+                               
                                return;
                        }
 



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to