Author: chamikara
Date: Tue Aug 22 23:28:48 2006
New Revision: 433934

URL: http://svn.apache.org/viewvc?rev=433934&view=rev
Log:
Changed the SequenceManager updateClientListnerIfNeeded method to pick the
incomingTransport from the scheme of the To URL. The scheme is used only
when transportInProtocol is not set.
Refactored the Invoker.


Modified:
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/SandeshaMessageKeys.java
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/resource.properties
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SequenceManager.java
    
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Invoker.java
    webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/SandeshaMessageKeys.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/SandeshaMessageKeys.java?rev=433934&r1=433933&r2=433934&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/SandeshaMessageKeys.java
 (original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/SandeshaMessageKeys.java
 Tue Aug 22 23:28:48 2006
@@ -221,5 +221,7 @@
        public static final String secureDummyNoSTR  ="secureDummyNoSTR";
        
        public static final String cannotFindTransportInDesc = 
"cannotFindTransportInDesc";
+       public static final String toEPRNotSet = "toEPRNotSet";
+       
        
 }

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/resource.properties
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/resource.properties?rev=433934&r1=433933&r2=433934&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/resource.properties
 (original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/i18n/resource.properties
 Tue Aug 22 23:28:48 2006
@@ -80,7 +80,6 @@
 cannotInnitMessage=Sandesha2 Internal error: cannot initialize the message.
 propertyInvalidValue=Sandesha2 Internal error: property {0} contains an 
invalid value.
 couldNotCopyParameters=Could not copy parameters when creating the new RM 
Message.
-cannotFindTransportInDesc=Cannot find the transport in description {0} in the 
ConfigurationContext.
 
 #-------------------------------------
 #
@@ -244,6 +243,8 @@
 terminateOpperationIsNull=Terminate Operation was null
 invalidMsgNumberList=Invalid msg number list
 cannotFindReqMsgFromOpContext=Cannot find the request message from the 
operation context
+toEPRNotSet=To EPR has not been set in the given message
+cannotFindTransportInDesc=Cannot find the transport in description {0} in the 
ConfigurationContext
 
 #------------------
 # Security messages

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java?rev=433934&r1=433933&r2=433934&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java
 (original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/storage/inmemory/InMemorySenderBeanMgr.java
 Tue Aug 22 23:28:48 2006
@@ -130,8 +130,13 @@
        }
 
        public synchronized SenderBean getNextMsgToSend() {
+               
                Iterator iterator = table.keySet().iterator();
 
+               //TODO
+               //pick a random sequence out of the sequences to be sent.
+               
+               
                long lowestAppMsgNo = 0;
                while (iterator.hasNext()) {
                        Object key = iterator.next();

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SequenceManager.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SequenceManager.java?rev=433934&r1=433933&r2=433934&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SequenceManager.java
 (original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/util/SequenceManager.java
 Tue Aug 22 23:28:48 2006
@@ -6,6 +6,8 @@
  */
 package org.apache.sandesha2.util;
 
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.Collection;
 
 import javax.xml.namespace.QName;
@@ -349,9 +351,27 @@
                try {
                        if ((startListnerForAsyncAcks || 
startListnerForAsyncControlMsgs) ) {
                                
-                               if (transportInProtocol == null)
-                                       throw new 
SandeshaException(SandeshaMessageHelper
-                                               
.getMessage(SandeshaMessageKeys.cannotStartListenerForIncommingMsgs));
+                               if (transportInProtocol == null){
+                                       EndpointReference toEPR = 
messageContext.getOptions().getTo();
+                                       if (toEPR==null) {
+                                               String message = 
SandeshaMessageHelper.getMessage(
+                                                               
SandeshaMessageKeys.toEPRNotSet);
+                                               throw new AxisFault (message);
+                                       }
+                                       
+                                       try {
+                                               URI uri = new URI 
(toEPR.getAddress());
+                                               String scheme = uri.getScheme();
+                                               
+                                               //this is a convention is 
Axis2. The name of the TransportInDescription has to be the
+                                               //scheme of a URI of that 
transport.
+                                               //Here we also assume that the 
Incoming transport will be same as the outgoing one.
+                                               transportInProtocol = scheme;
+                                       } catch (URISyntaxException e) {
+                                               throw new SandeshaException (e);
+                                       }
+                                       
+                               }
                        
                                //TODO following code was taken from 
ServiceContext.gegMyEPR method.
                                //         When a listner-starting method 
becomes available from Axis2, use that.

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Invoker.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Invoker.java?rev=433934&r1=433933&r2=433934&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Invoker.java 
(original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Invoker.java 
Tue Aug 22 23:28:48 2006
@@ -55,21 +55,28 @@
 public class Invoker extends Thread {
 
        private boolean runInvoker = false;
+
        private ArrayList workingSequences = new ArrayList();
+
        private ConfigurationContext context = null;
+
        private static final Log log = LogFactory.getLog(Invoker.class);
+
        private boolean hasStopped = false;
-       
-    private transient ThreadFactory threadPool;
-    public int INVOKER_THREADPOOL_SIZE =5;
-    
-    public Invoker () {
-       threadPool = new ThreadPool 
(INVOKER_THREADPOOL_SIZE,INVOKER_THREADPOOL_SIZE);
-    }
+
+       private transient ThreadFactory threadPool;
+
+       public int INVOKER_THREADPOOL_SIZE = 5;
+
+       public Invoker() {
+               threadPool = new ThreadPool(INVOKER_THREADPOOL_SIZE,
+                               INVOKER_THREADPOOL_SIZE);
+       }
 
        public synchronized void stopInvokerForTheSequence(String sequenceID) {
                if (log.isDebugEnabled())
-                       log.debug("Enter: 
InOrderInvoker::stopInvokerForTheSequence, " + sequenceID);
+                       log.debug("Enter: 
InOrderInvoker::stopInvokerForTheSequence, "
+                                       + sequenceID);
 
                workingSequences.remove(sequenceID);
                if (workingSequences.size() == 0) {
@@ -109,7 +116,8 @@
                return runInvoker;
        }
 
-       public synchronized void runInvokerForTheSequence(ConfigurationContext 
context, String sequenceID) {
+       public synchronized void runInvokerForTheSequence(
+                       ConfigurationContext context, String sequenceID) {
                if (log.isDebugEnabled())
                        log.debug("Enter: 
InOrderInvoker::runInvokerForTheSequence");
 
@@ -128,7 +136,9 @@
        private synchronized boolean hasStoppedInvoking() {
                if (log.isDebugEnabled()) {
                        log.debug("Enter: InOrderInvoker::hasStoppedInvoking");
-                       log.debug("Exit: InOrderInvoker::hasStoppedInvoking, " 
+ hasStopped);
+                       log
+                                       .debug("Exit: 
InOrderInvoker::hasStoppedInvoking, "
+                                                       + hasStopped);
                }
                return hasStopped;
        }
@@ -169,70 +179,75 @@
                        boolean rolebacked = false;
 
                        try {
-                               StorageManager storageManager = 
SandeshaUtil.getSandeshaStorageManager(context, context
-                                               .getAxisConfiguration());
+                               StorageManager storageManager = SandeshaUtil
+                                               
.getSandeshaStorageManager(context, context
+                                                               
.getAxisConfiguration());
                                NextMsgBeanMgr nextMsgMgr = 
storageManager.getNextMsgBeanMgr();
 
-                               InvokerBeanMgr storageMapMgr = 
storageManager.getStorageMapBeanMgr();
+                               InvokerBeanMgr storageMapMgr = storageManager
+                                               .getStorageMapBeanMgr();
 
-                               SequencePropertyBeanMgr sequencePropMgr = 
storageManager.getSequencePropertyBeanMgr();
+                               SequencePropertyBeanMgr sequencePropMgr = 
storageManager
+                                               .getSequencePropertyBeanMgr();
 
                                transaction = storageManager.getTransaction();
 
                                // Getting the incomingSequenceIdList
-                               SequencePropertyBean allSequencesBean = 
sequencePropMgr.retrieve(
-                                               
Sandesha2Constants.SequenceProperties.ALL_SEQUENCES,
-                                               
Sandesha2Constants.SequenceProperties.INCOMING_SEQUENCE_LIST);
+                               SequencePropertyBean allSequencesBean = 
sequencePropMgr
+                                               .retrieve(
+                                                               
Sandesha2Constants.SequenceProperties.ALL_SEQUENCES,
+                                                               
Sandesha2Constants.SequenceProperties.INCOMING_SEQUENCE_LIST);
 
                                if (allSequencesBean == null) {
                                        if (log.isDebugEnabled())
                                                log.debug("AllSequencesBean not 
found");
                                        continue;
                                }
-                               ArrayList allSequencesList = 
SandeshaUtil.getArrayListFromString(allSequencesBean.getValue());
-
+                               
+                               //TODO pick the sequence randomly.
+                               ArrayList allSequencesList = SandeshaUtil
+                                               
.getArrayListFromString(allSequencesBean.getValue());
                                Iterator allSequencesItr = 
allSequencesList.iterator();
+                               String sequenceId = (String) 
allSequencesItr.next();
 
-//                             currentIteration: while 
(allSequencesItr.hasNext()) {
-                                       String sequenceId = (String) 
allSequencesItr.next();
-
-                                       NextMsgBean nextMsgBean = 
nextMsgMgr.retrieve(sequenceId);
-                                       if (nextMsgBean == null) {
-                                               String message = "Next message 
not set correctly. Removing invalid entry.";
-                                               log.debug(message);
-                                               allSequencesItr.remove();
-
-                                               // cleaning the invalid data of 
the all sequences.
-                                               
allSequencesBean.setValue(allSequencesList.toString());
-                                               
sequencePropMgr.update(allSequencesBean);
-                                               continue;
-                                       }
+                               NextMsgBean nextMsgBean = 
nextMsgMgr.retrieve(sequenceId);
+                               if (nextMsgBean == null) {
+                                       String message = "Next message not set 
correctly. Removing invalid entry.";
+                                       log.debug(message);
+                                       allSequencesItr.remove();
+
+                                       // cleaning the invalid data of the all 
sequences.
+                                       
allSequencesBean.setValue(allSequencesList.toString());
+                                       
sequencePropMgr.update(allSequencesBean);
+                                       continue;
+                               }
 
-                                       long nextMsgno = 
nextMsgBean.getNextMsgNoToProcess();
-                                       if (nextMsgno <= 0) {
-                                               if (log.isDebugEnabled())
-                                                       log.debug("Invalid Next 
Message Number " + nextMsgno);
-                                               String message = 
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.invalidMsgNumber, Long
-                                                               
.toString(nextMsgno));
-                                               throw new 
SandeshaException(message);
-                                       }
+                               long nextMsgno = 
nextMsgBean.getNextMsgNoToProcess();
+                               if (nextMsgno <= 0) {
+                                       if (log.isDebugEnabled())
+                                               log.debug("Invalid Next Message 
Number " + nextMsgno);
+                                       String message = 
SandeshaMessageHelper.getMessage(
+                                                       
SandeshaMessageKeys.invalidMsgNumber, Long
+                                                                       
.toString(nextMsgno));
+                                       throw new SandeshaException(message);
+                               }
 
-                                       Iterator stMapIt = 
storageMapMgr.find(new InvokerBean(null, nextMsgno, sequenceId)).iterator();
+                               Iterator stMapIt = storageMapMgr.find(
+                                               new InvokerBean(null, 
nextMsgno, sequenceId))
+                                               .iterator();
 
-                                       if (stMapIt.hasNext()) {
+                               if (stMapIt.hasNext()) {
 
-                                               InvokerBean invokerBean = 
(InvokerBean) stMapIt.next();
+                                       InvokerBean invokerBean = (InvokerBean) 
stMapIt.next();
 
-                                               transaction.commit();
-                                               
-                                               //start a new worker thread and 
let it do the invocation.
-                                               InvokerWorker worker = new 
InvokerWorker (context,invokerBean);
-                                               threadPool.execute(worker);
-                                               
-                                       }
+                                       transaction.commit();
 
+                                       // start a new worker thread and let it 
do the invocation.
+                                       InvokerWorker worker = new 
InvokerWorker(context,
+                                                       invokerBean);
+                                       threadPool.execute(worker);
 
-//                             }
+                               }
 
                        } catch (Exception e) {
                                if (transaction != null) {
@@ -240,20 +255,22 @@
                                                transaction.rollback();
                                                rolebacked = true;
                                        } catch (Exception e1) {
-                                               String message = 
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.rollbackError, e1
-                                                               .toString());
+                                               String message = 
SandeshaMessageHelper.getMessage(
+                                                               
SandeshaMessageKeys.rollbackError, e1
+                                                                               
.toString());
                                                log.debug(message, e1);
                                        }
                                }
-                               String message = 
SandeshaMessageHelper.getMessage(SandeshaMessageKeys.invokeMsgError);
+                               String message = SandeshaMessageHelper
+                                               
.getMessage(SandeshaMessageKeys.invokeMsgError);
                                log.debug(message, e);
                        } finally {
                                if (!rolebacked && transaction != null) {
                                        try {
                                                transaction.commit();
                                        } catch (Exception e) {
-                                               String message = 
SandeshaMessageHelper
-                                                               
.getMessage(SandeshaMessageKeys.commitError, e.toString());
+                                               String message = 
SandeshaMessageHelper.getMessage(
+                                                               
SandeshaMessageKeys.commitError, e.toString());
                                                log.debug(message, e);
                                        }
                                }

Modified: 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java
URL: 
http://svn.apache.org/viewvc/webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java?rev=433934&r1=433933&r2=433934&view=diff
==============================================================================
--- 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java 
(original)
+++ 
webservices/sandesha/trunk/java/src/org/apache/sandesha2/workers/Sender.java 
Tue Aug 22 23:28:48 2006
@@ -171,12 +171,9 @@
                                
                                transaction.commit();
                                
-                               
-                               
                                //start a worker which will work on this 
message.s
                                SenderWorker worker = new SenderWorker 
(context,senderBean);
                                threadPool.execute(worker);
-                               
 
                        } catch (Exception e) {
 



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to