Author: mlovett
Date: Mon May 21 09:14:08 2007
New Revision: 540189

URL: http://svn.apache.org/viewvc?view=rev&rev=540189
Log:
Release the transport if it is blocked too long. This frees up transport 
resources when doing sync-2-way messaging with RM.

Modified:
    
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/Sandesha2Constants.java
    
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/i18n/SandeshaMessageKeys.java
    
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/AcknowledgementManager.java
    
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java
    
webservices/sandesha/trunk/java/modules/core/src/main/resources/org/apache/sandesha2/i18n/resource.properties

Modified: 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/Sandesha2Constants.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/Sandesha2Constants.java?view=diff&rev=540189&r1=540188&r2=540189
==============================================================================
--- 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/Sandesha2Constants.java
 (original)
+++ 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/Sandesha2Constants.java
 Mon May 21 09:14:08 2007
@@ -490,6 +490,8 @@
        int CLIENT_SLEEP_TIME = 10000;
 
        int TERMINATE_DELAY = 100;
+       
+       static final int TRANSPORT_WAIT_TIME = 60000;
 
        static final String TEMP_SEQUENCE_ID = "uuid:tempID";
 

Modified: 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/i18n/SandeshaMessageKeys.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/i18n/SandeshaMessageKeys.java?view=diff&rev=540189&r1=540188&r2=540189
==============================================================================
--- 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/i18n/SandeshaMessageKeys.java
 (original)
+++ 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/i18n/SandeshaMessageKeys.java
 Mon May 21 09:14:08 2007
@@ -230,4 +230,7 @@
        public final static String referenceMessageNotSetForSequence = 
"referenceMessageNotSetForSequence";
        public final static String moduleNotSet = "moduleNotSet";
        public final static String cannotSetPolicyBeanServiceNull = 
"cannotSetPolicyBeanServiceNull";
+       public final static String noPolling="noPolling";
+       public final static String freeingTransport="freeingTransport";
+
 }

Modified: 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/AcknowledgementManager.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/AcknowledgementManager.java?view=diff&rev=540189&r1=540188&r2=540189
==============================================================================
--- 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/AcknowledgementManager.java
 (original)
+++ 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/util/AcknowledgementManager.java
 Mon May 21 09:14:08 2007
@@ -198,7 +198,7 @@
 
                AxisOperation ackOperation = 
SpecSpecificConstants.getWSRMOperation(
                                Sandesha2Constants.MessageTypes.ACK,
-                               referenceRMMessage.getRMSpecVersion(),
+                               rmdBean.getRMVersion(),
                                referenceMsg.getAxisService());
 
                MessageContext ackMsgCtx = 
SandeshaUtil.createNewRelatedMessageContext(referenceRMMessage, ackOperation);

Modified: 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java?view=diff&rev=540189&r1=540188&r2=540189
==============================================================================
--- 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java
 (original)
+++ 
webservices/sandesha/trunk/java/modules/core/src/main/java/org/apache/sandesha2/workers/Sender.java
 Mon May 21 09:14:08 2007
@@ -21,8 +21,15 @@
 import java.util.Iterator;
 import java.util.List;
 
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.context.OperationContext;
+import org.apache.axis2.transport.RequestResponseTransport;
+import 
org.apache.axis2.transport.RequestResponseTransport.RequestResponseTransportStatus;
+import org.apache.axis2.wsdl.WSDLConstants;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.sandesha2.RMMsgContext;
 import org.apache.sandesha2.Sandesha2Constants;
 import org.apache.sandesha2.SandeshaException;
 import org.apache.sandesha2.i18n.SandeshaMessageHelper;
@@ -35,6 +42,8 @@
 import org.apache.sandesha2.storage.beans.RMDBean;
 import org.apache.sandesha2.storage.beans.RMSBean;
 import org.apache.sandesha2.storage.beans.SenderBean;
+import org.apache.sandesha2.util.AcknowledgementManager;
+import org.apache.sandesha2.util.MsgInitializer;
 import org.apache.sandesha2.util.SandeshaUtil;
 import org.apache.sandesha2.util.SequenceManager;
 
@@ -81,7 +90,15 @@
                                
                                // At this point - delete any sequences that 
have timed out, or been terminated.
                                deleteTerminatedSequences(storageManager);
+
+                               // Also clean up any sender beans that are not 
yet eligible for sending, but
+                               // are blocking the transport threads.
+                               unblockTransportThreads(storageManager);
                                
+                               // Finally, check for messages that can only be 
serviced by polling, and warn
+                               // the user if they are too old
+                               checkForOrphanMessages(storageManager);
+
                                if (log.isDebugEnabled()) log.debug("Exit: 
Sender::internalRun, looped over all sequences, sleep " + sleep);
                                return sleep;
                        }
@@ -339,5 +356,146 @@
 
                if (log.isDebugEnabled()) 
                        log.debug("Exit: Sender::deleteRMSBeans");
+       }
+
+       /**
+        * Frees request-response transports that unsent messages have been holding
+        * for longer than {@link Sandesha2Constants#TRANSPORT_WAIT_TIME}.
+        *
+        * For every matching sender bean whose inbound message still has a transport
+        * in the WAITING state, the transport is released: either by sending an
+        * acknowledgement back on it (when the inbound sequence's acksTo is absent or
+        * anonymous) or by simply acknowledging the message. The bean is then marked
+        * as having no transport and its send time is reset.
+        *
+        * This is best-effort: any exception raised while processing is logged at
+        * debug level and the transaction is rolled back.
+        *
+        * @param manager the storage manager used for the transaction and bean lookups
+        * @throws SandeshaStorageException declared by the original signature; presumably
+        *         raised by the rollback in the finally block - TODO confirm
+        */
+       private void unblockTransportThreads(StorageManager manager)
+       throws SandeshaStorageException
+       {
+               if (log.isDebugEnabled()) log.debug("Enter: Sender::unblockTransportThreads");
+
+               Transaction transaction = null;
+               try {
+                       transaction = manager.getTransaction();
+
+                       // This finder will look for beans that have been locking the transport for longer than
+                       // the TRANSPORT_WAIT_TIME. The match method for SenderBeans does the time comparison
+                       // for us.
+                       SenderBean finder = new SenderBean();
+                       finder.setSend(false);
+                       finder.setTransportAvailable(true);
+                       finder.setTimeToSend(System.currentTimeMillis() - Sandesha2Constants.TRANSPORT_WAIT_TIME);
+
+                       List beans = manager.getSenderBeanMgr().find(finder);
+                       Iterator beanIter = beans.iterator();
+                       while(beanIter.hasNext()) {
+                               // The beans we have found are assigned to an internal sequence id, but the create
+                               // sequence has not completed yet (and perhaps never will). Server-side, most of the
+                               // info that we can usefully print is associated with the inbound sequence that generated
+                               // this message.
+                               SenderBean bean = (SenderBean) beanIter.next();
+
+                               // Load the message, so that we can free the transport (if there is one there). The
+                               // case we are trying to free up is when there is a request-response transport, and
+                               // it's still there waiting.
+                               MessageContext msgCtx = manager.retrieveMessageContext(bean.getMessageContextRefKey(), context);
+
+                               RequestResponseTransport t = null;
+                               MessageContext inMsg = null;
+                               OperationContext op = msgCtx.getOperationContext();
+                               if (op != null)
+                                       inMsg = op.getMessageContext(WSDLConstants.MESSAGE_LABEL_IN_VALUE);
+                               if (inMsg != null)
+                                       t = (RequestResponseTransport) inMsg.getProperty(RequestResponseTransport.TRANSPORT_CONTROL);
+
+                               // Only act when a transport exists AND it is still waiting. The previous
+                               // "t != null || !..." form dereferenced t precisely when it was null, and
+                               // never looked at the status when it was not. Constant-first equals also
+                               // tolerates a null status.
+                               if(t != null && RequestResponseTransportStatus.WAITING.equals(t.getStatus())) {
+                                       if(log.isWarnEnabled()) {
+                                               String message = SandeshaMessageHelper.getMessage(SandeshaMessageKeys.freeingTransport);
+                                               log.warn(message);
+                                       }
+                                       // If the message is a reply, then the request may need to be acked. Rather
+                                       // than just return a HTTP 202, we should try to send an ack.
+                                       boolean sendAck = false;
+                                       RMDBean inbound = null;
+                                       String inboundSeq = bean.getInboundSequenceId();
+                                       if(inboundSeq != null)
+                                               inbound = SandeshaUtil.getRMDBeanFromSequenceId(manager, inboundSeq);
+
+                                       if(inbound != null) {
+                                               // Test for null before building the EPR, so that a missing acksTo
+                                               // short-circuits instead of being wrapped first.
+                                               String acksTo = inbound.getAcksToEPR();
+                                               if(acksTo == null || new EndpointReference(acksTo).hasAnonymousAddress())
+                                                       sendAck = true;
+                                       }
+
+                                       if(sendAck) {
+                                               // Use the manager we were handed (and hold the transaction on),
+                                               // rather than reaching out to the enclosing storageManager.
+                                               RMMsgContext rmMsgCtx = MsgInitializer.initializeMessage(msgCtx);
+                                               RMMsgContext ackRMMsgCtx = AcknowledgementManager.generateAckMessage(
+                                                               rmMsgCtx, inbound, inbound.getSequenceID(), manager, true);
+                                               AcknowledgementManager.sendAckNow(ackRMMsgCtx);
+                                               msgCtx.getOperationContext().setProperty(org.apache.axis2.Constants.RESPONSE_WRITTEN, "true");
+                                               t.signalResponseReady();
+                                       } else {
+                                               msgCtx.getOperationContext().setProperty(org.apache.axis2.Constants.RESPONSE_WRITTEN, "false");
+                                               t.acknowledgeMessage(msgCtx);
+                                       }
+
+                                       // Mark the bean so that we know the transport is missing, and reset the send time
+                                       bean.setTransportAvailable(false);
+                                       bean.setTimeToSend(System.currentTimeMillis());
+
+                                       // Update the bean
+                                       manager.getSenderBeanMgr().update(bean);
+                               }
+                       }
+
+                       if(transaction != null && transaction.isActive()) transaction.commit();
+                       // Clear the reference so the finally block does not roll back committed work.
+                       transaction = null;
+
+               } catch(Exception e) {
+                       // There isn't much we can do here, so log the exception and continue.
+                       if(log.isDebugEnabled()) log.debug("Exception", e);
+               } finally {
+                       if(transaction != null && transaction.isActive()) transaction.rollback();
+               }
+
+               if (log.isDebugEnabled()) log.debug("Exit: Sender::unblockTransportThreads");
+       }
+               
+       /**
+        * Warns about messages that are flagged for sending but have no transport
+        * available, and whose send time is more than
+        * {@link Sandesha2Constants#TRANSPORT_WAIT_TIME} in the past.
+        *
+        * Each such bean produces one "noPolling" warning (when warn logging is
+        * enabled) and has its send time reset to now, so the warning is not
+        * repeated until another TRANSPORT_WAIT_TIME has gone by. Any exception is
+        * logged at debug level and the transaction is rolled back.
+        *
+        * @param manager the storage manager used for the transaction and bean lookups
+        * @throws SandeshaStorageException declared by the original signature; presumably
+        *         raised by the rollback in the finally block - TODO confirm
+        */
+       private void checkForOrphanMessages(StorageManager manager)
+       throws SandeshaStorageException
+       {
+               if(log.isDebugEnabled()) log.debug("Enter: Sender::checkForOrphanMessages");
+
+               Transaction txn = null;
+               try {
+                       txn = manager.getTransaction();
+
+                       // Template for beans that should already have gone out but cannot, because
+                       // they need an incoming MakeConnection to carry them. The cut-off is pushed
+                       // back by TRANSPORT_WAIT_TIME to give the MakeConnection a chance to arrive.
+                       long cutOff = System.currentTimeMillis() - Sandesha2Constants.TRANSPORT_WAIT_TIME;
+                       SenderBean template = new SenderBean();
+                       template.setSend(true);
+                       template.setTransportAvailable(false);
+                       template.setTimeToSend(cutOff);
+
+                       List stalled = manager.getSenderBeanMgr().find(template);
+                       for(Iterator it = stalled.iterator(); it.hasNext(); ) {
+                               SenderBean orphan = (SenderBean) it.next();
+
+                               // Tell the user that no MakeConnection has turned up to collect this message.
+                               if(log.isWarnEnabled()) {
+                                       String typeText = String.valueOf(orphan.getMessageType());
+                                       log.warn(SandeshaMessageHelper.getMessage(SandeshaMessageKeys.noPolling, typeText));
+                               }
+
+                               // Restart the clock so the next warning is a full TRANSPORT_WAIT_TIME away.
+                               orphan.setTimeToSend(System.currentTimeMillis());
+                               manager.getSenderBeanMgr().update(orphan);
+                       }
+
+                       if(txn != null && txn.isActive()) {
+                               txn.commit();
+                       }
+                       // Drop the reference so the finally block leaves committed work alone.
+                       txn = null;
+
+               } catch(Exception e) {
+                       // Best effort only: record the failure and carry on.
+                       if(log.isDebugEnabled()) log.debug("Exception", e);
+               } finally {
+                       if(txn != null && txn.isActive()) {
+                               txn.rollback();
+                       }
+               }
+
+               if(log.isDebugEnabled()) log.debug("Exit: Sender::checkForOrphanMessages");
        }
 }

Modified: 
webservices/sandesha/trunk/java/modules/core/src/main/resources/org/apache/sandesha2/i18n/resource.properties
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/modules/core/src/main/resources/org/apache/sandesha2/i18n/resource.properties?view=diff&rev=540189&r1=540188&r2=540189
==============================================================================
--- 
webservices/sandesha/trunk/java/modules/core/src/main/resources/org/apache/sandesha2/i18n/resource.properties
 (original)
+++ 
webservices/sandesha/trunk/java/modules/core/src/main/resources/org/apache/sandesha2/i18n/resource.properties
 Mon May 21 09:14:08 2007
@@ -78,6 +78,8 @@
 rmEnforceFailure=The message with MessageID ''{0}'' is not WSRM enabled but 
the service enforces WSRM.
 referenceMessageNotSetForSequence=ReferenceMessage has not been set for the 
sequence ''{0}''
 moduleNotSet=Sandesha Module has not been set at the initiation
+noPolling=A message has been waiting for a MakeConnection call. The message 
will continue to wait, but there may be a problem with the client 
configuration. Sandesha message type {0}.
+freeingTransport=Freeing transport resources. A message has held the transport 
for too long, check the log for other failures.
 
 #-------------------------------------
 #



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to