Author: ruwan
Date: Thu Jan 10 22:52:15 2008
New Revision: 611068

URL: http://svn.apache.org/viewvc?rev=611068&view=rev
Log:
Fixing the timeout-based completion of the aggregate mediator

Modified:
    webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/Aggregate.java
    webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/AggregateMediator.java

Modified: webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/Aggregate.java
URL: http://svn.apache.org/viewvc/webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/Aggregate.java?rev=611068&r1=611067&r2=611068&view=diff
==============================================================================
--- webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/Aggregate.java (original)
+++ webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/Aggregate.java Thu Jan 10 22:52:15 2008
@@ -27,11 +27,12 @@
 
 import java.util.List;
 import java.util.ArrayList;
+import java.util.TimerTask;
 
 /**
  * This holds the Aggregate properties and the list of messages which participate in the aggregation
  */
-public class Aggregate {
+public class Aggregate extends TimerTask {
 
     /**
      *
@@ -68,10 +69,12 @@
      */
     private String corelation = null;
 
+    private AggregateMediator mediator = null;
+
     /**
      *
      */
-    private List messages = new ArrayList();
+    private List<MessageContext> messages = new ArrayList<MessageContext>();
 
     /**
      * This is the constructor of the Aggregate which will set the timeout depending on the
@@ -81,8 +84,9 @@
      * @param timeout -
      * @param min -
      * @param max -
+     * @param mediator -
      */
-    public Aggregate(String corelation, long timeout, int min, int max) {
+    public Aggregate(String corelation, long timeout, int min, int max, AggregateMediator mediator) {
         this.corelation = corelation;
         if (timeout > 0) {
             this.timeout = System.currentTimeMillis() + expireTime;
@@ -93,6 +97,7 @@
         if (max > 0) {
             this.maxCount = max;
         }
+        this.mediator = mediator;
     }
 
     /**
@@ -175,7 +180,7 @@
         return messages;
     }
 
-    public void setMessages(List messages) {
+    public void setMessages(List<MessageContext> messages) {
         this.messages = messages;
     }
 
@@ -187,4 +192,7 @@
         this.expireTime = expireTime;
     }
 
+    public void run() {
+        mediator.completeAggregate(this);
+    }
 }

Modified: webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/AggregateMediator.java
URL: http://svn.apache.org/viewvc/webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/AggregateMediator.java?rev=611068&r1=611067&r2=611068&view=diff
==============================================================================
--- webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/AggregateMediator.java (original)
+++ webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/AggregateMediator.java Thu Jan 10 22:52:15 2008
@@ -116,7 +116,7 @@
     /**
      * This will hold the map of active aggregates at any given time
      */
-    private Map activeAggregates = new HashMap();
+    private Map<String, Aggregate> activeAggregates = new HashMap<String, Aggregate>();
 
     /**
      * This will hold the expired aggregates at any given time, these will be cleaned by a timer
@@ -162,7 +162,7 @@
             }
         }
 
-//        todo: revisit this         
+//        todo: revisit this
 //        if (!isTimerSet) {
 //            synCtx.getConfiguration().getSynapseTimer()
 //                    .schedule(new AggregateCollector(this), 5000);
@@ -186,7 +186,8 @@
                 } else {
                     aggregate = new Aggregate(this.corelateExpression.toString(),
                             this.completeTimeout, this.minMessagesToComplete,
-                            this.maxMessagesToComplete);
+                            this.maxMessagesToComplete, this);
+                    synCtx.getConfiguration().getSynapseTimer().schedule(aggregate, completeTimeout);
                     activeAggregates.put(this.corelateExpression.toString(), aggregate);
                 }
 
@@ -224,7 +225,8 @@
 
                     } else {
                         aggregate = new Aggregate(corelation, this.completeTimeout,
-                                this.minMessagesToComplete, this.maxMessagesToComplete);
+                                this.minMessagesToComplete, this.maxMessagesToComplete, this);
+                        synCtx.getConfiguration().getSynapseTimer().schedule(aggregate, completeTimeout);
                         activeAggregates.put(corelation, aggregate);
                     }
 
@@ -311,12 +313,12 @@
 
             if ((this.corelateExpression != null && !this.corelateExpression
                     .toString().equals(aggregate.getCorelation())) ||
-                    this.corelateExpression == null) {
+                this.corelateExpression == null) {
 
-//                            aggregate.setExpireTime(
-//                                    System.currentTimeMillis() + this.invlidateToDestroyTime);
+//                aggregate.setExpireTime(
+//                    System.currentTimeMillis() + this.invlidateToDestroyTime);
                 expiredAggregates.put(aggregate.getCorelation(),
-                        new Long(System.currentTimeMillis() + this.invlidateToDestroyTime));
+                    new Long(System.currentTimeMillis() + this.invlidateToDestroyTime));
 
                 if (this.onCompleteSequence != null) {
                     this.onCompleteSequence.mediate(newSynCtx);



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to