Author: ruwan Date: Thu Jan 10 22:52:15 2008 New Revision: 611068 URL: http://svn.apache.org/viewvc?rev=611068&view=rev Log: Fixing the timeout based completion of the aggregate mediator
Modified: webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/Aggregate.java webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/AggregateMediator.java Modified: webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/Aggregate.java URL: http://svn.apache.org/viewvc/webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/Aggregate.java?rev=611068&r1=611067&r2=611068&view=diff ============================================================================== --- webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/Aggregate.java (original) +++ webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/Aggregate.java Thu Jan 10 22:52:15 2008 @@ -27,11 +27,12 @@ import java.util.List; import java.util.ArrayList; +import java.util.TimerTask; /** * This holds the Aggregate properties and the list of messages which participate in the aggregation */ -public class Aggregate { +public class Aggregate extends TimerTask { /** * @@ -68,10 +69,12 @@ */ private String corelation = null; + private AggregateMediator mediator = null; + /** * */ - private List messages = new ArrayList(); + private List<MessageContext> messages = new ArrayList<MessageContext>(); /** * This is the constructor of the Aggregate which will set the timeout depending on the @@ -81,8 +84,9 @@ * @param timeout - * @param min - * @param max - + * @param mediator - */ - public Aggregate(String corelation, long timeout, int min, int max) { + public Aggregate(String corelation, long timeout, int min, int max, AggregateMediator mediator) { this.corelation = corelation; if (timeout > 0) { this.timeout = System.currentTimeMillis() + expireTime; @@ -93,6 +97,7 @@ if (max > 0) { this.maxCount = max; } + this.mediator = mediator; } /** @@ -175,7 +180,7 @@ return messages; } - public void setMessages(List messages) { + public void setMessages(List<MessageContext> messages) { this.messages = messages; } @@ -187,4 +192,7 @@ this.expireTime = expireTime; } + public void run() { + mediator.completeAggregate(this); + } } Modified: webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/AggregateMediator.java URL: http://svn.apache.org/viewvc/webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/AggregateMediator.java?rev=611068&r1=611067&r2=611068&view=diff ============================================================================== --- webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/AggregateMediator.java (original) +++ webservices/synapse/branches/1.1.1/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/AggregateMediator.java Thu Jan 10 22:52:15 2008 @@ -116,7 +116,7 @@ /** * This will hold the map of active aggregates at any given time */ - private Map activeAggregates = new HashMap(); + private Map<String, Aggregate> activeAggregates = new HashMap<String, Aggregate>(); /** * This will hold the expired aggregates at any given time, these will be cleaned by a timer @@ -162,7 +162,7 @@ } } -// todo: revisit this +// todo: revisit this // if (!isTimerSet) { // synCtx.getConfiguration().getSynapseTimer() // .schedule(new AggregateCollector(this), 5000); @@ -186,7 +186,8 @@ } else { aggregate = new Aggregate(this.corelateExpression.toString(), this.completeTimeout, this.minMessagesToComplete, - this.maxMessagesToComplete); + this.maxMessagesToComplete, this); + synCtx.getConfiguration().getSynapseTimer().schedule(aggregate, completeTimeout); activeAggregates.put(this.corelateExpression.toString(), aggregate); } @@ -224,7 +225,8 @@ } else { aggregate = new Aggregate(corelation, this.completeTimeout, - this.minMessagesToComplete, this.maxMessagesToComplete); + this.minMessagesToComplete, this.maxMessagesToComplete, this); + synCtx.getConfiguration().getSynapseTimer().schedule(aggregate, completeTimeout); activeAggregates.put(corelation, aggregate); } @@ -311,12 +313,12 @@ if ((this.corelateExpression != null && !this.corelateExpression .toString().equals(aggregate.getCorelation())) || - this.corelateExpression == null) { + this.corelateExpression == null) { -// aggregate.setExpireTime( -// System.currentTimeMillis() + this.invlidateToDestroyTime); +// aggregate.setExpireTime( +// System.currentTimeMillis() + this.invlidateToDestroyTime); expiredAggregates.put(aggregate.getCorelation(), - new Long(System.currentTimeMillis() + this.invlidateToDestroyTime)); + new Long(System.currentTimeMillis() + this.invlidateToDestroyTime)); if (this.onCompleteSequence != null) { this.onCompleteSequence.mediate(newSynCtx); --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]