Author: ruwan
Date: Mon Nov 26 03:02:10 2007
New Revision: 598222
URL: http://svn.apache.org/viewvc?rev=598222&view=rev
Log:
Fine tuning the caching to work on clustered env with performance improvements
Modified:
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/CacheMediatorFactory.java
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/CacheMediatorSerializer.java
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/builtin/CacheMediator.java
Modified:
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/CacheMediatorFactory.java
URL:
http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/CacheMediatorFactory.java?rev=598222&r1=598221&r2=598222&view=diff
==============================================================================
---
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/CacheMediatorFactory.java
(original)
+++
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/CacheMediatorFactory.java
Mon Nov 26 03:02:10 2007
@@ -46,6 +46,7 @@
private static final QName ATT_ID = new QName("id");
private static final QName ATT_COLLECTOR = new QName("collector");
private static final QName ATT_HASH_GENERATOR = new QName("hashGenerator");
+ private static final QName ATT_MAX_MSG_SIZE = new QName("maxMessageSize");
private static final QName ATT_TIMEOUT = new QName("timeout");
private static final QName ATT_SCOPE = new QName("scope");
private static final QName ATT_SEQUENCE = new QName("sequence");
@@ -74,7 +75,7 @@
OMAttribute scopeAttr = elem.getAttribute(ATT_SCOPE);
if (scopeAttr != null && scopeAttr.getAttributeValue() != null &&
- isValidScope(scopeAttr.getAttributeValue())) {
+ isValidScope(scopeAttr.getAttributeValue(), cache.getId())) {
cache.setScope(scopeAttr.getAttributeValue());
} else {
cache.setScope(CachingConstants.SCOPE_PER_HOST);
@@ -117,6 +118,11 @@
cache.setTimeout(DEFAULT_TIMEOUT);
}
+ OMAttribute maxMessageSizeAttr =
elem.getAttribute(ATT_MAX_MSG_SIZE);
+ if (maxMessageSizeAttr != null &&
maxMessageSizeAttr.getAttributeValue() != null) {
+
cache.setMaxMessageSize(Integer.parseInt(maxMessageSizeAttr.getAttributeValue()));
+ }
+
OMElement onCacheHitElem =
elem.getFirstChildWithName(ON_CACHE_HIT_Q);
if (onCacheHitElem != null) {
OMAttribute sequenceAttr =
onCacheHitElem.getAttribute(ATT_SEQUENCE);
@@ -154,11 +160,16 @@
return cache;
}
- private boolean isValidScope(String scope) {
+ private boolean isValidScope(String scope, String id) {
if (CachingConstants.SCOPE_PER_HOST.equals(scope)) {
return true;
} else if (CachingConstants.SCOPE_PER_MEDIATOR.equals(scope)) {
- return true;
+ if (id != null) {
+ return true;
+ } else {
+ handleException("Id is required for a cache wirth scope : " +
scope);
+ return false;
+ }
} else if (CachingConstants.SCOPE_DISTRIBUTED.equals(scope)) {
handleException("Scope distributed is not supported yet by the
Cache mediator");
return false;
Modified:
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/CacheMediatorSerializer.java
URL:
http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/CacheMediatorSerializer.java?rev=598222&r1=598221&r2=598222&view=diff
==============================================================================
---
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/CacheMediatorSerializer.java
(original)
+++
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/CacheMediatorSerializer.java
Mon Nov 26 03:02:10 2007
@@ -69,6 +69,11 @@
fac.createOMAttribute("timeout", nullNS,
Long.toString(mediator.getTimeout())));
}
+ if (mediator.getMaxMessageSize() != 0) {
+ cache.addAttribute(
+ fac.createOMAttribute("maxMessageSize", nullNS,
Integer.toString(mediator.getMaxMessageSize())));
+ }
+
if (mediator.getOnCacheHitRef() != null) {
OMElement onCacheHit = fac.createOMElement("onCacheHit",
synNS);
onCacheHit.addAttribute(
Modified:
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/builtin/CacheMediator.java
URL:
http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/builtin/CacheMediator.java?rev=598222&r1=598221&r2=598222&view=diff
==============================================================================
---
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/builtin/CacheMediator.java
(original)
+++
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/builtin/CacheMediator.java
Mon Nov 26 03:02:10 2007
@@ -25,10 +25,12 @@
import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.saaj.util.SAAJUtil;
import org.apache.synapse.MessageContext;
+import org.apache.synapse.SynapseException;
import org.apache.synapse.core.axis2.Axis2MessageContext;
import org.apache.synapse.core.axis2.Axis2Sender;
import org.apache.synapse.mediators.AbstractMediator;
import org.apache.synapse.mediators.base.SequenceMediator;
+import org.apache.synapse.util.FixedByteArrayOutputStream;
import org.apache.synapse.util.MessageHelper;
import org.wso2.caching.Cache;
import org.wso2.caching.CachedObject;
@@ -60,6 +62,7 @@
private long timeout = 0L;
private SequenceMediator onCacheHitSequence = null;
private String onCacheHitRef = null;
+ private int maxMessageSize = 0;
private String cacheObjKey = CachingConstants.CACHE_OBJECT; // default
per-host
private static final String CACHE_OBJ_PREFIX = "synapse.cache_obj_";
@@ -77,6 +80,23 @@
}
}
+ // if maxMessageSize is specified check for the message size before
processing
+ FixedByteArrayOutputStream fbaos = null;
+ if (maxMessageSize > 0) {
+ fbaos = new FixedByteArrayOutputStream(maxMessageSize);
+ try {
+
MessageHelper.cloneSOAPEnvelope(synCtx.getEnvelope()).serialize(fbaos);
+ } catch (XMLStreamException e) {
+ handleException("Error in checking the message size", e,
synCtx);
+ } catch (SynapseException syne) {
+ if (traceOrDebugOn) {
+ traceOrDebug(traceOn, "Message size exceeds the upper
bound for caching, " +
+ "request will not be cached");
+ return true;
+ }
+ }
+ }
+
ConfigurationContext cfgCtx =
((Axis2MessageContext)
synCtx).getAxis2MessageContext().getConfigurationContext();
if (cfgCtx == null) {
@@ -91,7 +111,7 @@
}
// look up cache
- Object prop = cfgCtx.getProperty(cacheObjKey);
+ Object prop = cfgCtx.getPropertyNonReplicable(cacheObjKey);
Cache cache;
if (prop != null && prop instanceof Cache) {
cache = (Cache) prop;
@@ -99,7 +119,7 @@
} else {
synchronized (cfgCtx) {
// check again after taking the lock to make sure no one else
did it before us
- prop = cfgCtx.getProperty(cacheObjKey);
+ prop = cfgCtx.getPropertyNonReplicable(cacheObjKey);
if (prop != null && prop instanceof Cache) {
cache = (Cache) prop;
@@ -115,10 +135,10 @@
boolean result = true;
if (synCtx.isResponse()) {
- processResponseMessage(synCtx, traceOrDebugOn, traceOn, cache);
+ processResponseMessage(synCtx, cfgCtx, traceOrDebugOn, traceOn,
cache);
} else {
- result = processRequestMessage(synCtx, traceOrDebugOn, traceOn,
cache);
+ result = processRequestMessage(synCtx, cfgCtx, traceOrDebugOn,
traceOn, cache, fbaos);
}
try {
@@ -142,9 +162,10 @@
* @param traceOrDebugOn is trace or debug logging on?
* @param traceOn is tracing on?
* @param synCtx the current message (response)
+ * @param cfgCtx the abstract context in which the cache will be
kept
* @param cache the cache
*/
- private void processResponseMessage(MessageContext synCtx, boolean
traceOrDebugOn,
+ private void processResponseMessage(MessageContext synCtx,
ConfigurationContext cfgCtx, boolean traceOrDebugOn,
boolean traceOn, Cache cache) {
if (!collector) {
@@ -178,15 +199,17 @@
handleException("Unable to set the response to the Cache",
e, synCtx);
}
- // this is not required yet can commented this for perf
improvements
- // in the future there can be a situation where user sends the
request with the
- // response hash (if client side caching is on) in which case
we can compare that
- // response hash with the given response hash and respond with
not-modified http header
- cachedObj.setResponseHash(cache.getGenerator().getDigest(
- ((Axis2MessageContext) synCtx).getAxis2MessageContext()));
+ /* this is not required yet, can commented this for perf
improvements
+ in the future there can be a situation where user sends the
request with the
+ response hash (if client side caching is on) in which case
we can compare that
+ response hash with the given response hash and respond with
not-modified http header */
+ // cachedObj.setResponseHash(cache.getGenerator().getDigest(
+ // ((Axis2MessageContext)
synCtx).getAxis2MessageContext()));
cachedObj.setExpireTime(System.currentTimeMillis() +
cachedObj.getTimeout());
+ cfgCtx.setProperty(cacheObjKey, cache);
+
} else {
auditWarn("A response message without a valid mapping to the "
+
"request hash found. Unable to store the response in
cache", synCtx);
@@ -204,13 +227,15 @@
* this message as a response and sends back directly to client.
*
* @param synCtx incoming request message
+ * @param cfgCtx the AbstractContext in which the cache will be
kept
* @param traceOrDebugOn is tracing or debug logging on?
* @param traceOn is tracing on?
* @param cache the cache
+ * @param fbaos the serialized request envelope
* @return should this mediator terminate further processing?
*/
- private boolean processRequestMessage(MessageContext synCtx, boolean
traceOrDebugOn,
- boolean traceOn, Cache cache) {
+ private boolean processRequestMessage(MessageContext synCtx,
ConfigurationContext cfgCtx, boolean traceOrDebugOn,
+ boolean traceOn, Cache cache, FixedByteArrayOutputStream fbaos) {
if (collector) {
handleException("Request messages cannot be handled in a collector
cache", synCtx);
@@ -296,6 +321,8 @@
traceOrDebug(traceOn,
"Existing cached response has expired. Reset cache
element");
}
+
+ cfgCtx.setProperty(cacheObjKey, cache);
}
} else {
@@ -308,10 +335,10 @@
traceOrDebug(traceOn, "In-memory cache is full. Unable
to cache");
}
} else {
- storeRequestToCache(synCtx, requestHash, cache);
+ storeRequestToCache(synCtx, cfgCtx, requestHash, cache,
fbaos);
}
} else {
- storeRequestToCache(synCtx, requestHash, cache);
+ storeRequestToCache(synCtx, cfgCtx, requestHash, cache, fbaos);
}
}
return true;
@@ -321,21 +348,33 @@
* Store request message to the cache
*
* @param synCtx the request message
+ * @param cfgCtx the Abstract context in which the cache will be kept
* @param requestHash the request hash that has already been computed
* @param cache the cache
+ * @param fbaos the serialized request envelope
*/
- private void storeRequestToCache(MessageContext synCtx, String
requestHash, Cache cache) {
+ private void storeRequestToCache(MessageContext synCtx,
ConfigurationContext cfgCtx, String requestHash, Cache cache,
+ FixedByteArrayOutputStream fbaos) {
+
CachedObject cachedObj = new CachedObject();
- ByteArrayOutputStream requestStream = new ByteArrayOutputStream();
- try {
-
MessageHelper.cloneSOAPEnvelope(synCtx.getEnvelope()).serialize(requestStream);
- cachedObj.setRequestEnvelope(requestStream.toByteArray());
- } catch (XMLStreamException e) {
- handleException("Unable to store the request in to the cache", e,
synCtx);
+ if (fbaos != null) {
+ cachedObj.setRequestEnvelope(fbaos.toByteArray());
+ } else {
+ // this else block can be commented out for the perf improvements,
because we are not using
+ // this for the moment
+ ByteArrayOutputStream requestStream = new ByteArrayOutputStream();
+ try {
+
MessageHelper.cloneSOAPEnvelope(synCtx.getEnvelope()).serialize(requestStream);
+ cachedObj.setRequestEnvelope(requestStream.toByteArray());
+ } catch (XMLStreamException e) {
+ handleException("Unable to store the request in to the cache",
e, synCtx);
+ }
}
cachedObj.setRequestHash(requestHash);
cachedObj.setTimeout(timeout);
cache.addResponseWithKey(requestHash, cachedObj);
+
+ cfgCtx.setProperty(cacheObjKey, cache);
}
public String getId() {
@@ -344,7 +383,6 @@
public void setId(String id) {
this.id = id;
- this.cacheObjKey = CACHE_OBJ_PREFIX + id;
}
public String getScope() {
@@ -353,6 +391,9 @@
public void setScope(String scope) {
this.scope = scope;
+ if (CachingConstants.SCOPE_PER_MEDIATOR.equals(scope)) {
+ cacheObjKey = CACHE_OBJ_PREFIX + id;
+ }
}
public boolean isCollector() {
@@ -409,5 +450,13 @@
public void setOnCacheHitRef(String onCacheHitRef) {
this.onCacheHitRef = onCacheHitRef;
+ }
+
+ public int getMaxMessageSize() {
+ return maxMessageSize;
+ }
+
+ public void setMaxMessageSize(int maxMessageSize) {
+ this.maxMessageSize = maxMessageSize;
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]