Author: ruwan
Date: Mon Oct 29 19:34:10 2007
New Revision: 589942
URL: http://svn.apache.org/viewvc?rev=589942&view=rev
Log:
Adding the Aggregate Mediator - :)
Added:
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/AggregateMediatorFactory.java
(with props)
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/AggregateMediatorSerializer.java
(with props)
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/Aggregate.java
(with props)
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/AggregateMediator.java
(with props)
Modified:
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MediatorFactoryFinder.java
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MediatorSerializerFinder.java
Added:
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/AggregateMediatorFactory.java
URL:
http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/AggregateMediatorFactory.java?rev=589942&view=auto
==============================================================================
---
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/AggregateMediatorFactory.java
(added)
+++
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/AggregateMediatorFactory.java
Mon Oct 29 19:34:10 2007
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.synapse.config.xml;
+
+import org.apache.synapse.Mediator;
+import org.apache.synapse.SynapseException;
+import org.apache.synapse.mediators.eip.aggregator.AggregateMediator;
+import org.apache.synapse.mediators.builtin.DropMediator;
+import org.apache.synapse.mediators.base.SequenceMediator;
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.om.OMAttribute;
+import org.apache.axiom.om.xpath.AXIOMXPath;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jaxen.JaxenException;
+
+import javax.xml.namespace.QName;
+
+/**
+ * <aggregate>
+ * <corelateOn expression="XPATH-expression"/>
+ * <completeCondition timeout="time-in-seconds">
+ * <messageCount min="int-min" max="int-max"/>
+ * </completeCondition>
+ * <onComplete expression="XPATH-expression" sequence="sequence-ref">
+ * (mediator +)?
+ * </onComplete>
+ * <invalidate sequence="sequence-ref" timeout="time-in-seconds">
+ * (mediator +)?
+ * </invalidate>
+ * </aggregate>
+ */
+public class AggregateMediatorFactory extends AbstractMediatorFactory {
+
+ private static final Log log =
LogFactory.getLog(AggregateMediatorFactory.class);
+
+ private static final QName AGGREGATE_Q = new
QName(XMLConfigConstants.SYNAPSE_NAMESPACE, "aggregate");
+ private static final QName CORELATE_ON_Q = new
QName(XMLConfigConstants.SYNAPSE_NAMESPACE, "corelateOn");
+ private static final QName COMPLETE_CONDITION_Q
+ = new QName(XMLConfigConstants.SYNAPSE_NAMESPACE,
"completeCondition");
+ private static final QName MESSAGE_COUNT_Q
+ = new QName(XMLConfigConstants.SYNAPSE_NAMESPACE, "messageCount");
+ private static final QName ON_COMPLETE_Q = new
QName(XMLConfigConstants.SYNAPSE_NAMESPACE, "onComplete");
+ private static final QName INVALIDATE_Q = new
QName(XMLConfigConstants.SYNAPSE_NAMESPACE, "invalidate");
+
+ private static final QName TIME_TO_LIVE_Q = new
QName(XMLConfigConstants.NULL_NAMESPACE, "timeToLive");
+ private static final QName EXPRESSION_Q = new
QName(XMLConfigConstants.NULL_NAMESPACE, "expression");
+ private static final QName TIMEOUT_Q = new
QName(XMLConfigConstants.NULL_NAMESPACE, "timeout");
+ private static final QName MIN_Q = new
QName(XMLConfigConstants.NULL_NAMESPACE, "min");
+ private static final QName MAX_Q = new
QName(XMLConfigConstants.NULL_NAMESPACE, "max");
+ private static final QName TYPE_Q = new
QName(XMLConfigConstants.NULL_NAMESPACE, "type");
+ private static final QName SEQUENCE_Q = new
QName(XMLConfigConstants.NULL_NAMESPACE, "sequence");
+
+ public Mediator createMediator(OMElement elem) {
+
+ AggregateMediator mediator = new AggregateMediator();
+ processTraceState(mediator, elem);
+ // todo: need to fix
+ OMAttribute timeToLive = elem.getAttribute(TIME_TO_LIVE_Q);
+ if (timeToLive != null) {
+
mediator.setTimeToInvalidate(Long.parseLong(timeToLive.getAttributeValue()) *
1000);
+ }
+
+ OMElement corelateOn = elem.getFirstChildWithName(CORELATE_ON_Q);
+ if (corelateOn != null) {
+ OMAttribute corelateExpr = corelateOn.getAttribute(EXPRESSION_Q);
+ if (corelateExpr != null) {
+ try {
+ AXIOMXPath xp = new
AXIOMXPath(corelateExpr.getAttributeValue());
+ OMElementUtils.addNameSpaces(xp, corelateOn, log);
+ mediator.setCorelateExpression(xp);
+ } catch (JaxenException e) {
+ handleException("Unable to load the corelate XPATH
expression", e);
+ }
+ }
+ }
+
+ OMElement completeCond =
elem.getFirstChildWithName(COMPLETE_CONDITION_Q);
+ if (completeCond != null) {
+ OMAttribute completeTimeout = completeCond.getAttribute(TIMEOUT_Q);
+ if (completeTimeout != null) {
+ mediator.setCompleteTimeout(
+ Long.parseLong(completeTimeout.getAttributeValue()) *
1000);
+ }
+
+ OMElement messageCount =
completeCond.getFirstChildWithName(MESSAGE_COUNT_Q);
+ if (messageCount != null) {
+ OMAttribute min = messageCount.getAttribute(MIN_Q);
+ if (min != null) {
+
mediator.setMinMessagesToComplete(Integer.parseInt(min.getAttributeValue()));
+ }
+
+ OMAttribute max = messageCount.getAttribute(MAX_Q);
+ if (max != null) {
+
mediator.setMaxMessagesToComplete(Integer.parseInt(max.getAttributeValue()));
+ }
+ }
+ }
+
+ OMElement invalidate = elem.getFirstChildWithName(INVALIDATE_Q);
+ if (invalidate != null) {
+ OMAttribute sequenceRef = invalidate.getAttribute(SEQUENCE_Q);
+ if (sequenceRef != null) {
+
mediator.setInvalidMsgSequenceRef(sequenceRef.getAttributeValue());
+ } else if (invalidate.getFirstElement() != null) {
+ mediator.setInvalidMsgSequence(
+ (new
SequenceMediatorFactory()).createAnonymousSequence(invalidate));
+ }
+
+ OMAttribute timeout = invalidate.getAttribute(TIMEOUT_Q);
+ if (timeout != null) {
+
mediator.setInvlidateToDestroyTime(Long.parseLong(timeout.getAttributeValue()));
+ } else {
+ mediator.setInvlidateToDestroyTime(300);
+ }
+ }
+
+ OMElement onComplete = elem.getFirstChildWithName(ON_COMPLETE_Q);
+ if (onComplete != null) {
+
+ OMAttribute aggregateExpr = onComplete.getAttribute(EXPRESSION_Q);
+ if (aggregateExpr != null) {
+ try {
+ AXIOMXPath xp = new
AXIOMXPath(aggregateExpr.getAttributeValue());
+ OMElementUtils.addNameSpaces(xp, onComplete, log);
+ mediator.setAggregationExpression(xp);
+ } catch (JaxenException e) {
+ handleException("Unable to load the aggregating XPATH", e);
+ }
+ }
+
+ OMAttribute onCompleteSequence =
onComplete.getAttribute(SEQUENCE_Q);
+ if (onCompleteSequence != null) {
+
mediator.setOnCompleteSequenceRef(onCompleteSequence.getAttributeValue());
+ } else if (onComplete.getFirstElement() != null) {
+ mediator.setOnCompleteSequence(
+ (new
SequenceMediatorFactory()).createAnonymousSequence(onComplete));
+ } else {
+ SequenceMediator sequence = new SequenceMediator();
+ sequence.addChild(new DropMediator());
+ mediator.setOnCompleteSequence(sequence);
+ }
+ }
+ return mediator;
+ }
+
+ public QName getTagQName() {
+ return AGGREGATE_Q;
+ }
+}
Propchange:
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/AggregateMediatorFactory.java
------------------------------------------------------------------------------
svn:executable = *
Added:
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/AggregateMediatorSerializer.java
URL:
http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/AggregateMediatorSerializer.java?rev=589942&view=auto
==============================================================================
---
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/AggregateMediatorSerializer.java
(added)
+++
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/AggregateMediatorSerializer.java
Mon Oct 29 19:34:10 2007
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.synapse.config.xml;
+
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.om.OMAttribute;
+import org.apache.synapse.Mediator;
+import org.apache.synapse.SynapseException;
+import org.apache.synapse.mediators.eip.aggregator.AggregateMediator;
+import org.apache.synapse.mediators.ext.ClassMediator;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * <aggregate>
+ * <corelateOn expression="XPATH-expression"/>
+ * <completeCondition timeout="time-in-seconds">
+ * <messageCount min="int-min" max="int-max"/>
+ * </completeCondition>
+ * <onComplete expression="XPATH-expression" sequence="sequence-ref">
+ * (mediator +)?
+ * </onComplete>
+ * <invalidate sequence="sequence-ref" timeout="time-in-seconds">
+ * (mediator +)?
+ * </invalidate>
+ * </aggregate>
+ */
+public class AggregateMediatorSerializer extends AbstractMediatorSerializer {
+
+ private static final Log log =
LogFactory.getLog(AggregateMediatorSerializer.class);
+
+ public OMElement serializeMediator(OMElement parent, Mediator m) {
+
+ if (!(m instanceof AggregateMediator)) {
+ handleException("Unsupported mediator passed in for serialization
: " + m.getType());
+ }
+ AggregateMediator mediator = (AggregateMediator) m;
+ OMElement aggregator = fac.createOMElement("aggregate", synNS);
+ saveTracingState(aggregator, mediator);
+
+ if (mediator.getCorelateExpression() != null) {
+ OMElement corelateOn = fac.createOMElement("corelateOn", synNS);
+ corelateOn.addAttribute("expression",
mediator.getCorelateExpression().toString(), nullNS);
+ super.serializeNamespaces(corelateOn,
mediator.getCorelateExpression());
+ aggregator.addChild(corelateOn);
+ }
+
+ OMElement completeCond = fac.createOMElement("completeCondition",
synNS);
+ if (mediator.getCompleteTimeout() != 0) {
+ completeCond.addAttribute("timeout", "" +
mediator.getCompleteTimeout(), nullNS);
+ }
+ OMElement messageCount = fac.createOMElement("messageCount", synNS);
+ if (mediator.getMinMessagesToComplete() != 0) {
+ messageCount.addAttribute("min", "" +
mediator.getMinMessagesToComplete(), nullNS);
+ }
+ if (mediator.getMaxMessagesToComplete() != 0) {
+ messageCount.addAttribute("max", "" +
mediator.getMaxMessagesToComplete(), nullNS);
+ }
+ completeCond.addChild(messageCount);
+ aggregator.addChild(completeCond);
+
+ OMElement aggregatorElem = fac.createOMElement("aggregator", synNS);
+// aggregatorElem.addAttribute("type",
mediator.getAggregator().getClass().getName(), nullNS);
+// aggregatorElem.addAttribute("expression", mediator.get)
+
+ return aggregator;
+ }
+
+ public String getMediatorClassName() {
+ return AggregateMediator.class.getName();
+ }
+}
Propchange:
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/AggregateMediatorSerializer.java
------------------------------------------------------------------------------
svn:executable = *
Modified:
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MediatorFactoryFinder.java
URL:
http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MediatorFactoryFinder.java?rev=589942&r1=589941&r2=589942&view=diff
==============================================================================
---
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MediatorFactoryFinder.java
(original)
+++
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MediatorFactoryFinder.java
Mon Oct 29 19:34:10 2007
@@ -65,6 +65,7 @@
POJOCommandMediatorFactory.class,
CloneMediatorFactory.class,
IterateMediatorFactory.class,
+ AggregateMediatorFactory.class,
DBReportMediatorFactory.class,
DBLookupMediatorFactory.class,
CacheMediatorFactory.class
Modified:
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MediatorSerializerFinder.java
URL:
http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MediatorSerializerFinder.java?rev=589942&r1=589941&r2=589942&view=diff
==============================================================================
---
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MediatorSerializerFinder.java
(original)
+++
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MediatorSerializerFinder.java
Mon Oct 29 19:34:10 2007
@@ -53,6 +53,7 @@
POJOCommandMediatorSerializer.class,
CloneMediatorSerializer.class,
IterateMediatorSerializer.class,
+ AggregateMediatorSerializer.class,
DBLookupMediatorSerializer.class,
DBReportMediatorSerializer.class,
CacheMediatorSerializer.class
Added:
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/Aggregate.java
URL:
http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/Aggregate.java?rev=589942&view=auto
==============================================================================
---
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/Aggregate.java
(added)
+++
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/Aggregate.java
Mon Oct 29 19:34:10 2007
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.synapse.mediators.eip.aggregator;
+
+import org.apache.synapse.MessageContext;
+import org.apache.synapse.SynapseConstants;
+import org.apache.synapse.mediators.eip.EIPConstants;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.List;
+import java.util.ArrayList;
+
+/**
+ * This holds the Aggregate properties and the list of messages which
participate in the aggregation
+ */
+public class Aggregate {
+
+ /**
+ *
+ */
+ private static final Log log = LogFactory.getLog(Aggregate.class);
+
+ /**
+ *
+ */
+ private static final Log trace =
LogFactory.getLog(SynapseConstants.TRACE_LOGGER);
+
+ /**
+ *
+ */
+ private long timeout = 0;
+
+ /**
+ *
+ */
+ private long expireTime = 0;
+
+ /**
+ *
+ */
+ private int minCount = -1;
+
+ /**
+ *
+ */
+ private int maxCount = -1;
+
+ /**
+ *
+ */
+ private String corelation = null;
+
+ /**
+ *
+ */
+ private List messages = new ArrayList();
+
+ /**
+ * This is the constructor of the Aggregate which will set the timeout
depending on the
+ * timeout for the aggregate
+ *
+ * @param corelation - String representing the corelation name of the
messages in the aggregate
+ * @param timeout -
+ * @param min -
+ * @param max -
+ */
+ public Aggregate(String corelation, long timeout, int min, int max) {
+ this.corelation = corelation;
+ if (timeout > 0) {
+ this.timeout = System.currentTimeMillis() + expireTime;
+ }
+ if (min > 0) {
+ this.minCount = min;
+ }
+ if (max > 0) {
+ this.maxCount = max;
+ }
+ }
+
+ /**
+ * @param synCtx -
+ * @return true if the message was added and false if not
+ */
+ public boolean addMessage(MessageContext synCtx) {
+ if (this.maxCount > 0 && this.messages.size() < this.maxCount ||
this.maxCount <= 0) {
+ this.messages.add(synCtx);
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ /**
+ * @return boolean stating the completeness of the corelation
+ */
+ public boolean isComplete() {
+
+ boolean completed = false;
+ if (!messages.isEmpty()) {
+
+ Object o = messages.get(0);
+ if (o instanceof MessageContext) {
+
+ Object prop = ((MessageContext)
o).getProperty(EIPConstants.MESSAGE_SEQUENCE);
+ if (prop instanceof String) {
+
+ String[] msgSequence
+ =
prop.toString().split(EIPConstants.MESSAGE_SEQUENCE_DELEMITER);
+ if (messages.size() >= Integer.parseInt(msgSequence[1])) {
+ completed = true;
+ }
+ }
+ }
+ }
+
+ if (!completed && this.minCount > 0) {
+ completed = this.messages.size() >= this.minCount
+ || this.timeout < System.currentTimeMillis();
+ }
+
+ return completed;
+ }
+
+ public long getTimeout() {
+ return timeout;
+ }
+
+ public void setTimeout(long timeout) {
+ this.timeout = timeout;
+ }
+
+ public int getMinCount() {
+ return minCount;
+ }
+
+ public void setMinCount(int minCount) {
+ this.minCount = minCount;
+ }
+
+ public int getMaxCount() {
+ return maxCount;
+ }
+
+ public void setMaxCount(int maxCount) {
+ this.maxCount = maxCount;
+ }
+
+ public String getCorelation() {
+ return corelation;
+ }
+
+ public void setCorelation(String corelation) {
+ this.corelation = corelation;
+ }
+
+ public List getMessages() {
+ return messages;
+ }
+
+ public void setMessages(List messages) {
+ this.messages = messages;
+ }
+
+ public long getExpireTime() {
+ return expireTime;
+ }
+
+ public void setExpireTime(long expireTime) {
+ this.expireTime = expireTime;
+ }
+
+}
Propchange:
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/Aggregate.java
------------------------------------------------------------------------------
svn:executable = *
Added:
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/AggregateMediator.java
URL:
http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/AggregateMediator.java?rev=589942&view=auto
==============================================================================
---
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/AggregateMediator.java
(added)
+++
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/AggregateMediator.java
Mon Oct 29 19:34:10 2007
@@ -0,0 +1,453 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.synapse.mediators.eip.aggregator;
+
+import org.apache.axiom.om.xpath.AXIOMXPath;
+import org.apache.axiom.soap.SOAP11Constants;
+import org.apache.axiom.soap.SOAP12Constants;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.synapse.MessageContext;
+import org.apache.synapse.SynapseException;
+import org.apache.synapse.SynapseConstants;
+import org.apache.synapse.mediators.AbstractMediator;
+import org.apache.synapse.mediators.eip.EIPUtils;
+import org.apache.synapse.mediators.eip.EIPConstants;
+import org.apache.synapse.mediators.base.SequenceMediator;
+import org.jaxen.JaxenException;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * This mediator will aggregate the messages flowing in to this with the
specified message types
+ * and build a one message
+ */
+public class AggregateMediator extends AbstractMediator {
+
+ private static final Log log = LogFactory.getLog(AggregateMediator.class);
+
+ private static final Log trace =
LogFactory.getLog(SynapseConstants.TRACE_LOGGER);
+
+ /**
+ * This will hold the maximum lifetime of an aggregate and if a particular
aggregate does not
+ * completed before its life time it will be invalidated and taken off
from the activeAggregates
+ * map and put in to the expiredAggregates map and the invalidate sequence
will be called to
+ * mediate the messages in the expired aggregate if there are any
+ */
+ private long timeToInvalidate = 0;
+
+ /**
+ * Messages comming to the aggregator will be examined for the existance
of a node described
+ * in this XPATH and if it contains the XPATH pick that, if not try to
find the messageSequence
+ * property for the corelation and if not pass the message through
+ */
+ private AXIOMXPath corelateExpression = null;
+
+ /**
+ * This will be used in the complete condition to complete the aggregation
after waiting a
+ * specified timeout and send the messages gatherd in the aggregate after
aggregation
+ * if there are any messages
+ */
+ private long completeTimeout = 0;
+
+ /**
+ * Minimum number of messages required to evaluate the complete condition
to true unless the
+ * aggregate has timedout with the provided timeout if there is a one
+ */
+ private int minMessagesToComplete = -1;
+
+ /**
+ * Maximum number of messages that can be contained in a particular
aggregation
+ */
+ private int maxMessagesToComplete = -1;
+
+ /**
+ * This will hold the implementation of the aggregation algorithm and upon
validating the
+ * complete condition getAggregatedMessage method of the aggregator will
be called to get
+ * the aggregated message
+ */
+ private AXIOMXPath aggregationExpression = null;
+
+ /**
+ * Holds a String reference to the Named Sequence which will be called to
mediate the invalid
+ * messages coming in to the aggregator
+ */
+ private String invalidMsgSequenceRef = null;
+
+ /**
+ * Sequece which will be called to mediate the invalid messages comming in
to aggregator
+ */
+ private SequenceMediator invalidMsgSequence = null;
+
+ /**
+ * This will be used to destroy the aggreagtes which were kept in the
expiredAggregates map
+ */
+ private long invlidateToDestroyTime = 0;
+
+ /**
+ * This holds the reference sequence name of the
+ */
+ private String onCompleteSequenceRef = null;
+
+ /**
+ *
+ */
+ private SequenceMediator onCompleteSequence = null;
+
+ /**
+ * This will hold the map of active aggragates at any given time
+ */
+ private Map activeAggregates = new HashMap();
+
+ /**
+ * This will hold the expired aggregates at any given time, these will be
cleaned by a timer
+ * task time to time in order to ensure uncontroled growth
+ */
+ private Map expiredAggregates = new HashMap();
+
+ private boolean isTimerSet = false;
+
+ public AggregateMediator() {
+ try {
+ aggregationExpression = new
AXIOMXPath("s11:Body/child::*[position()=1] | " +
+ "s12:Body/child::*[position()=1]");
+ aggregationExpression.addNamespace("s11",
SOAP11Constants.SOAP_ENVELOPE_NAMESPACE_URI);
+ aggregationExpression.addNamespace("s12",
SOAP12Constants.SOAP_ENVELOPE_NAMESPACE_URI);
+ } catch (JaxenException e) {
+ if (log.isDebugEnabled()) {
+ handleException("Unable to set the default " +
+ "aggregationExpression for the aggregation", e, null);
+ }
+ }
+ }
+
+ /**
+ * This is the mediate method implementation of the AggregateMediator. And
this will aggregate
+ * the messages going through this mediator according to the corelation
criteria and the
+ * aggregation algorith specified to it
+ *
+ * @param synCtx - MessageContext to be mediated and aggregated
+ * @return boolean true if the complete condition for the particular
aggregate is validated
+ * false if not
+ */
+ public boolean mediate(MessageContext synCtx) {
+ // tracing and debuggin related mediation initiation
+ boolean traceOn = isTraceOn(synCtx);
+ boolean traceOrDebugOn = isTraceOrDebugOn(traceOn);
+
+ if (traceOrDebugOn) {
+ traceOrDebug(traceOn, "Start : Aggregate mediator");
+
+ if (traceOn && trace.isTraceEnabled()) {
+ trace.trace("Message : " + synCtx.getEnvelope());
+ }
+ }
+
+// todo: revisit this
+// if (!isTimerSet) {
+// synCtx.getConfiguration().getSynapseTimer()
+// .schedule(new AggregateCollector(this), 5000);
+// }
+
+ try {
+ Aggregate aggregate = null;
+
+ // if the corelate aggregationExpression is provided and there is
a coresponding
+ // element in the message corelate the messages on that
+ if (this.corelateExpression != null
+ && this.corelateExpression.evaluate(synCtx.getEnvelope())
!= null) {
+
+ if
(activeAggregates.containsKey(this.corelateExpression.toString())) {
+ Object o =
activeAggregates.get(this.corelateExpression.toString());
+ if (o instanceof Aggregate) {
+ aggregate = (Aggregate) o;
+ } else {
+ handleException("Undefined aggregate type.", synCtx);
+ }
+ } else {
+ aggregate = new
Aggregate(this.corelateExpression.toString(),
+ this.completeTimeout, this.minMessagesToComplete,
+ this.maxMessagesToComplete);
+ activeAggregates.put(this.corelateExpression.toString(),
aggregate);
+ }
+
+ // if the corelattion can not be found using the
aggregationExpression try to find the
+ // corelation on the default criteria which is through the
aggregate corelation
+ // property of the message
+ } else if (synCtx.getProperty(EIPConstants.AGGREGATE_CORELATION)
!= null) {
+
+ String corelation = synCtx.getProperty(
+ EIPConstants.AGGREGATE_CORELATION) instanceof String ?
synCtx.getProperty(
+ EIPConstants.AGGREGATE_CORELATION).toString() : null;
+
+ // check whether the message corelation name is in the expired
aggregates
+ if (expiredAggregates.containsKey(corelation)) {
+
+ if (traceOrDebugOn) {
+ traceOrDebug(traceOn, "Message with the corelation "
+ + corelation + " expired. Invalidating the
message.");
+ }
+
+ invalidate(synCtx, traceOrDebugOn, traceOn);
+ return false;
+ }
+
+ if (corelation != null) {
+
+ if (activeAggregates.containsKey(corelation)) {
+
+ Object o = activeAggregates.get(corelation);
+ if (o instanceof Aggregate) {
+ aggregate = (Aggregate) o;
+ } else {
+ handleException("Undefined aggregate type.",
synCtx);
+ }
+
+ } else {
+ aggregate = new Aggregate(corelation,
this.completeTimeout,
+ this.minMessagesToComplete,
this.maxMessagesToComplete);
+ activeAggregates.put(corelation, aggregate);
+ }
+
+ } else {
+ if (traceOrDebugOn) {
+ traceOrDebug(traceOn,
+ "Error in getting corelation details. Skip the
aggregator.");
+ }
+ return true;
+ }
+ } else {
+ if (traceOrDebugOn) {
+ traceOrDebug(traceOn,
+ "Unable to find the aggregation corelation. Skip the
aggregation");
+ }
+ return true;
+ }
+
+ // if there is an aggregate continue on aggregation
+ if (aggregate != null) {
+
+ // add the message to the aggregate and if the maximum count
of the aggregate is
+ // exceeded invalidate the message
+ if (!aggregate.addMessage(synCtx)) {
+ if (traceOrDebugOn) {
+ traceOrDebug(traceOn, "Can not exceed aggregate " +
+ "max message count. Invalidating message");
+ }
+ invalidate(synCtx, traceOrDebugOn, traceOn);
+ return false;
+ }
+
+ // check the completeness of the aggregate and is completed
aggregate the messages
+ // if not completed return false and block the message
sequence till it completes
+ if (aggregate.isComplete()) {
+ return completeAggregate(aggregate);
+ }
+
+ // if the aggregation corelation can not be found then continue
the message on the
+ // normal path by returning true
+ } else {
+ if (traceOrDebugOn) {
+ traceOrDebug(traceOn, "Unable to find the aggregate. Skip
the aggregation");
+ }
+ return true;
+ }
+
+ } catch (JaxenException e) {
+ handleException("Unable to execute the XPATH over the message", e,
synCtx);
+ }
+
+ // finalize tracing and debugging
+ if (traceOrDebugOn) {
+ traceOrDebug(traceOn, "End : Aggregate mediator");
+ }
+
+ return false;
+ }
+
+ private void invalidate(MessageContext synCtx, boolean traceOrDebugOn,
boolean traceOn) {
+
+ if (this.invalidMsgSequenceRef != null && synCtx.getConfiguration()
+ .getSequence(invalidMsgSequenceRef) != null) {
+
+ // use the sequence reference to get the sequence for mediation
+
synCtx.getConfiguration().getSequence(invalidMsgSequenceRef).mediate(synCtx);
+
+ } else if (this.invalidMsgSequence != null) {
+
+ // use the sequence to mediate the invalidated messages
+ invalidMsgSequence.mediate(synCtx);
+
+ } else {
+ if (traceOrDebugOn) {
+ traceOrDebug(traceOn, "No invalid message sequence defined.
Dropping the message");
+ }
+ }
+ }
+
+ public boolean completeAggregate(Aggregate aggregate) {
+
+ MessageContext newSynCtx = getAggregatedMessage(aggregate);
+ activeAggregates.remove(aggregate.getCorelation());
+
+ if ((this.corelateExpression != null && !this.corelateExpression
+ .toString().equals(aggregate.getCorelation())) ||
+ this.corelateExpression == null) {
+
+// aggregate.setExpireTime(
+// System.currentTimeMillis() +
this.invlidateToDestroyTime);
+ expiredAggregates.put(aggregate.getCorelation(),
+ new Long(System.currentTimeMillis() +
this.invlidateToDestroyTime));
+
+ if (this.onCompleteSequence != null) {
+ this.onCompleteSequence.mediate(newSynCtx);
+ } else if (this.onCompleteSequenceRef != null
+ && newSynCtx.getSequence(this.onCompleteSequenceRef)
!= null) {
+
newSynCtx.getSequence(this.onCompleteSequenceRef).mediate(newSynCtx);
+ } else {
+ handleException("Unable to find the sequence for the
mediation " +
+ "of the aggregated message", newSynCtx);
+ }
+ return false;
+ } else {
+ return true;
+ }
+ }
+
+ public MessageContext getAggregatedMessage(Aggregate aggregate) {
+ MessageContext newCtx = null;
+ Iterator itr = aggregate.getMessages().iterator();
+ while (itr.hasNext()) {
+ Object o = itr.next();
+ if (o instanceof MessageContext) {
+ MessageContext synCtx = (MessageContext) o;
+ if (newCtx == null) {
+ newCtx = synCtx;
+ } else {
+ try {
+ EIPUtils.enrichEnvelope(
+ newCtx.getEnvelope(), synCtx.getEnvelope(),
this.aggregationExpression);
+ } catch (JaxenException e) {
+ handleException("Unable to get the aggreagated
message", e, synCtx);
+ }
+ }
+ }
+ }
+ return newCtx;
+ }
+
+ public AXIOMXPath getCorelateExpression() {
+ return corelateExpression;
+ }
+
+ public void setCorelateExpression(AXIOMXPath corelateExpression) {
+ this.corelateExpression = corelateExpression;
+ }
+
+ public String getInvalidMsgSequenceRef() {
+ return invalidMsgSequenceRef;
+ }
+
+ public void setInvalidMsgSequenceRef(String invalidMsgSequenceRef) {
+ this.invalidMsgSequenceRef = invalidMsgSequenceRef;
+ }
+
+ public SequenceMediator getInvalidMsgSequence() {
+ return invalidMsgSequence;
+ }
+
+ public void setInvalidMsgSequence(SequenceMediator invalidMsgSequence) {
+ this.invalidMsgSequence = invalidMsgSequence;
+ }
+
+ public long getTimeToInvalidate() {
+ return timeToInvalidate;
+ }
+
+ public void setTimeToInvalidate(long timeToInvalidate) {
+ this.timeToInvalidate = timeToInvalidate;
+ }
+
+ public long getCompleteTimeout() {
+ return completeTimeout;
+ }
+
+ public void setCompleteTimeout(long completeTimeout) {
+ this.completeTimeout = completeTimeout;
+ }
+
+ public int getMinMessagesToComplete() {
+ return minMessagesToComplete;
+ }
+
+ public void setMinMessagesToComplete(int minMessagesToComplete) {
+ this.minMessagesToComplete = minMessagesToComplete;
+ }
+
+ public int getMaxMessagesToComplete() {
+ return maxMessagesToComplete;
+ }
+
+ public void setMaxMessagesToComplete(int maxMessagesToComplete) {
+ this.maxMessagesToComplete = maxMessagesToComplete;
+ }
+
+ public AXIOMXPath getAggregationExpression() {
+ return aggregationExpression;
+ }
+
+ public void setAggregationExpression(AXIOMXPath aggregationExpression) {
+ this.aggregationExpression = aggregationExpression;
+ }
+
+ public long getInvlidateToDestroyTime() {
+ return invlidateToDestroyTime;
+ }
+
+ public void setInvlidateToDestroyTime(long invlidateToDestroyTime) {
+ this.invlidateToDestroyTime = invlidateToDestroyTime;
+ }
+
+ public String getOnCompleteSequenceRef() {
+ return onCompleteSequenceRef;
+ }
+
+ public void setOnCompleteSequenceRef(String onCompleteSequenceRef) {
+ this.onCompleteSequenceRef = onCompleteSequenceRef;
+ }
+
+ public SequenceMediator getOnCompleteSequence() {
+ return onCompleteSequence;
+ }
+
+ public void setOnCompleteSequence(SequenceMediator onCompleteSequence) {
+ this.onCompleteSequence = onCompleteSequence;
+ }
+
+ public Map getExpiredAggregates() {
+ return expiredAggregates;
+ }
+
+ public Map getActiveAggregates() {
+ return activeAggregates;
+ }
+}
Propchange:
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/aggregator/AggregateMediator.java
------------------------------------------------------------------------------
svn:executable = *
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]