Author: ruwan
Date: Mon Dec 3 22:21:41 2007
New Revision: 600800
URL: http://svn.apache.org/viewvc?rev=600800&view=rev
Log:
Improved the caching code and the mediator
Modified:
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/builtin/CacheMediator.java
Modified:
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/builtin/CacheMediator.java
URL:
http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/builtin/CacheMediator.java?rev=600800&r1=600799&r2=600800&view=diff
==============================================================================
---
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/builtin/CacheMediator.java
(original)
+++
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/builtin/CacheMediator.java
Mon Dec 3 22:21:41 2007
@@ -32,9 +32,10 @@
import org.apache.synapse.mediators.base.SequenceMediator;
import org.apache.synapse.util.FixedByteArrayOutputStream;
import org.apache.synapse.util.MessageHelper;
-import org.wso2.caching.Cache;
+import org.wso2.caching.CacheManager;
import org.wso2.caching.CachedObject;
import org.wso2.caching.CachingConstants;
+import org.wso2.caching.CachingException;
import org.wso2.caching.digest.DigestGenerator;
import javax.xml.soap.MessageFactory;
@@ -47,12 +48,17 @@
import java.io.IOException;
/**
+ * Cache Mediator will cache the response messages indexed using the hash
value of the
+ * request message, and subsequent messages with the same request (request
hash will be
+ * generated and checked for the equality) within the cache expiration period
will be served
+ * from the stored responses in the cache
*
+ * @see org.apache.synapse.Mediator
*/
public class CacheMediator extends AbstractMediator {
private String id = null;
- private String scope = CachingConstants.SCOPE_PER_HOST;
+ private String scope = CachingConstants.SCOPE_PER_HOST;// global
private boolean collector = false;
private DigestGenerator digestGenerator =
CachingConstants.DEFAULT_XML_IDENTIFIER;
private int inMemoryCacheSize = CachingConstants.DEFAULT_CACHE_SIZE;
@@ -63,8 +69,8 @@
private SequenceMediator onCacheHitSequence = null;
private String onCacheHitRef = null;
private int maxMessageSize = 0;
- private String cacheObjKey = CachingConstants.CACHE_OBJECT; // default
per-host
- private static final String CACHE_OBJ_PREFIX = "synapse.cache_obj_";
+ private String cacheManagerKey = CachingConstants.CACHE_MANAGER; //
default per-host
+ private static final String CACHE_MANAGER_PREFIX =
"synapse.cache_manager_";
public boolean mediate(MessageContext synCtx) {
@@ -107,42 +113,42 @@
if (traceOrDebugOn) {
traceOrDebug(traceOn,
- "Looking up cache at scope : " + scope + " with ID : " +
cacheObjKey);
+ "Looking up cache at scope : " + scope + " with ID : " +
cacheManagerKey);
}
// look up cache
- Object prop = cfgCtx.getPropertyNonReplicable(cacheObjKey);
- Cache cache;
- if (prop != null && prop instanceof Cache) {
- cache = (Cache) prop;
-
+ Object prop = cfgCtx.getPropertyNonReplicable(cacheManagerKey);
+ CacheManager cacheManager;
+ if (prop != null && prop instanceof CacheManager) {
+ cacheManager = (CacheManager) prop;
} else {
synchronized (cfgCtx) {
// check again after taking the lock to make sure no one else
did it before us
- prop = cfgCtx.getPropertyNonReplicable(cacheObjKey);
- if (prop != null && prop instanceof Cache) {
- cache = (Cache) prop;
+ prop = cfgCtx.getPropertyNonReplicable(cacheManagerKey);
+ if (prop != null && prop instanceof CacheManager) {
+ cacheManager = (CacheManager) prop;
} else {
if (traceOrDebugOn) {
traceOrDebug(traceOn, "Creating/recreating the cache
object");
}
- cache = new Cache();
- cfgCtx.setProperty(cacheObjKey, cache);
+ cacheManager = new CacheManager();
+ cfgCtx.setProperty(cacheManagerKey, cacheManager);
}
}
}
boolean result = true;
- if (synCtx.isResponse()) {
- processResponseMessage(synCtx, cfgCtx, traceOrDebugOn, traceOn,
cache);
-
- } else {
- result = processRequestMessage(synCtx, cfgCtx, traceOrDebugOn,
traceOn, cache, fbaos);
- }
-
try {
- Replicator.replicate(cfgCtx);
+
+ if (synCtx.isResponse()) {
+ processResponseMessage(synCtx, cfgCtx, traceOrDebugOn,
traceOn, cacheManager);
+
+ } else {
+ result = processRequestMessage(
+ synCtx, cfgCtx, traceOrDebugOn, traceOn, cacheManager);
+ }
+
} catch (ClusteringFault clusteringFault) {
if (traceOrDebugOn) {
traceOrDebug(traceOn, "Unable to replicate Cache mediator
state among the cluster");
@@ -152,6 +158,7 @@
if (traceOrDebugOn) {
traceOrDebug(traceOn, "End : Cache mediator");
}
+
return result;
}
@@ -163,10 +170,11 @@
* @param traceOn is tracing on?
* @param synCtx the current message (response)
* @param cfgCtx the abstract context in which the cache will be
kept
- * @param cache the cache
+ * @param cacheManager the cache manager
+ * @throws ClusteringFault if there is an error in replicating the cfgCtx
*/
private void processResponseMessage(MessageContext synCtx,
ConfigurationContext cfgCtx,
- boolean traceOrDebugOn, boolean traceOn, Cache cache) {
+ boolean traceOrDebugOn, boolean traceOn, CacheManager cacheManager)
throws ClusteringFault {
if (!collector) {
handleException("Response messages cannot be handled in a non
collector cache", synCtx);
@@ -177,18 +185,16 @@
if (requestHash != null) {
if (traceOrDebugOn) {
traceOrDebug(traceOn, "Storing the response message into the
cache at scope : " +
- scope + " with ID : " + cacheObjKey + " for request hash :
" + requestHash);
+ scope + " with ID : " + cacheManagerKey + " for request
hash : " + requestHash);
}
- Object obj = cache.getResponseForKey(requestHash, cfgCtx);
+ CachedObject cachedObj =
cacheManager.getResponseForKey(requestHash, cfgCtx);
+ if (cachedObj != null) {
- if (obj != null && obj instanceof CachedObject) {
-
- CachedObject cachedObj = (CachedObject) obj;
if (traceOrDebugOn) {
traceOrDebug(traceOn, "Storing the response for the
message with ID : " +
synCtx.getMessageID() + " with request hash ID : " +
- cachedObj.getRequestHash() + " in the cache : " +
cacheObjKey);
+ cachedObj.getRequestHash() + " in the cache : " +
cacheManagerKey);
}
ByteArrayOutputStream outStream = new ByteArrayOutputStream();
@@ -207,9 +213,10 @@
// cachedObj.setResponseHash(cache.getGenerator().getDigest(
// ((Axis2MessageContext)
synCtx).getAxis2MessageContext()));
- cachedObj.setExpireTime(System.currentTimeMillis() +
cachedObj.getTimeout());
+ cachedObj.setExpireTimeMillis(System.currentTimeMillis() +
cachedObj.getTimeout());
- cfgCtx.setProperty(cacheObjKey, cache);
+ cfgCtx.setProperty(cacheManagerKey, cacheManager);
+ Replicator.replicate(cfgCtx);
} else {
auditWarn("A response message without a valid mapping to the "
+
@@ -231,31 +238,35 @@
* @param cfgCtx the AbstractContext in which the cache will be
kept
* @param traceOrDebugOn is tracing or debug logging on?
* @param traceOn is tracing on?
- * @param cache the cache
- * @param fbaos the serialized request envelope
+ * @param cacheManager the cache manager
* @return should this mediator terminate further processing?
+ * @throws ClusteringFault if there is an error in replicating the cfgCtx
*/
private boolean processRequestMessage(MessageContext synCtx,
ConfigurationContext cfgCtx,
- boolean traceOrDebugOn, boolean traceOn, Cache cache,
FixedByteArrayOutputStream fbaos) {
+ boolean traceOrDebugOn, boolean traceOn, CacheManager cacheManager)
throws ClusteringFault {
if (collector) {
handleException("Request messages cannot be handled in a collector
cache", synCtx);
}
- String requestHash = digestGenerator
- .getDigest(((Axis2MessageContext)
synCtx).getAxis2MessageContext());
- synCtx.setProperty(CachingConstants.REQUEST_HASH_KEY, requestHash);
+ String requestHash = null;
+ try {
+ requestHash = digestGenerator.getDigest(((Axis2MessageContext)
synCtx).getAxis2MessageContext());
+ synCtx.setProperty(CachingConstants.REQUEST_HASH_KEY, requestHash);
+ } catch (CachingException e) {
+ handleException("Error in calculating the hash value of the
request", e, synCtx);
+ }
if (traceOrDebugOn) {
traceOrDebug(traceOn, "Generated request hash : " + requestHash);
}
- if (cache.containsKey(requestHash) &&
- cache.getResponseForKey(requestHash, cfgCtx) instanceof
CachedObject) {
+ if (cacheManager.containsKey(requestHash) &&
+ cacheManager.getResponseForKey(requestHash, cfgCtx) != null) {
// get the response from the cache and attach to the context and
change the
// direction of the message
- CachedObject cachedObj = (CachedObject)
cache.getResponseForKey(requestHash, cfgCtx);
+ CachedObject cachedObj =
cacheManager.getResponseForKey(requestHash, cfgCtx);
if (!cachedObj.isExpired() && cachedObj.getResponseEnvelope() !=
null) {
@@ -273,15 +284,17 @@
org.apache.axiom.soap.SOAPEnvelope omSOAPEnv =
SAAJUtil.toOMSOAPEnvelope(smsg.getSOAPPart().getDocumentElement());
+ // todo: if there is a WSA messageID in the response, does
it need to be unique on each and every response?
+
synCtx.setEnvelope(omSOAPEnv);
} catch (AxisFault axisFault) {
- handleException("Error setting response envelope from
cache : " + cacheObjKey,
+ handleException("Error setting response envelope from
cache : " + cacheManagerKey,
synCtx);
} catch (IOException ioe) {
- handleException("Error setting response envelope from
cache : " + cacheObjKey,
+ handleException("Error setting response envelope from
cache : " + cacheManagerKey,
ioe, synCtx);
} catch (SOAPException soape) {
- handleException("Error setting response envelope from
cache : " + cacheObjKey,
+ handleException("Error setting response envelope from
cache : " + cacheManagerKey,
soape, synCtx);
}
@@ -306,7 +319,7 @@
if (traceOrDebugOn) {
traceOrDebug(traceOn, "Request message " +
synCtx.getMessageID() +
- " has served from the cache : " + cacheObjKey);
+ " was served from the cache : " + cacheManagerKey);
}
// send the response back if there is not onCacheHit is
specified
synCtx.setTo(null);
@@ -317,29 +330,31 @@
} else {
// cache exists, but has expired...
- cachedObj.clearCache();
+ cachedObj.expire();
if (traceOrDebugOn) {
traceOrDebug(traceOn,
"Existing cached response has expired. Reset cache
element");
}
- cfgCtx.setProperty(cacheObjKey, cache);
+ cfgCtx.setProperty(cacheManagerKey, cacheManager);
+ Replicator.replicate(cfgCtx);
}
} else {
+ // todo: find a proper way of achieving the cache size check
// if not found in cache, check if we can cache this request
- if (cache.getCacheKeys().size() == inMemoryCacheSize) {
- cache.removeExpiredResponses(cfgCtx);
- if (cache.getCacheKeys().size() == inMemoryCacheSize) {
+ if (cacheManager.getCacheKeys().size() == inMemoryCacheSize) {
+ cacheManager.removeExpiredResponses(cfgCtx);
+ if (cacheManager.getCacheKeys().size() == inMemoryCacheSize) {
if (traceOrDebugOn) {
traceOrDebug(traceOn, "In-memory cache is full. Unable
to cache");
}
} else {
- storeRequestToCache(synCtx, cfgCtx, requestHash, cache,
fbaos);
+ storeRequestToCache(cfgCtx, requestHash, cacheManager);
}
} else {
- storeRequestToCache(synCtx, cfgCtx, requestHash, cache, fbaos);
+ storeRequestToCache(cfgCtx, requestHash, cacheManager);
}
}
return true;
@@ -348,34 +363,23 @@
/**
* Store request message to the cache
*
- * @param synCtx the request message
- * @param cfgCtx the Abstract context in which the cache will be kept
- * @param requestHash the request hash that has already been computed
- * @param cache the cache
- * @param fbaos the serialized request envelope
+ * @param cfgCtx - the Abstract context in which the cache will be
kept
+ * @param requestHash - the request hash that has already been computed
+ * @param cacheManager - the cache
+ * @throws ClusteringFault if there is an error in replicating the cfgCtx
*/
- private void storeRequestToCache(MessageContext synCtx,
ConfigurationContext cfgCtx,
- String requestHash, Cache cache, FixedByteArrayOutputStream fbaos) {
+ private void storeRequestToCache(ConfigurationContext cfgCtx,
+ String requestHash, CacheManager cacheManager) throws ClusteringFault {
CachedObject cachedObj = new CachedObject();
- if (fbaos != null) {
- cachedObj.setRequestEnvelope(fbaos.toByteArray());
- } else {
- // this else block can be commented out for the perf improvements,
- // because we are not using this for the moment
- ByteArrayOutputStream requestStream = new ByteArrayOutputStream();
- try {
-
MessageHelper.cloneSOAPEnvelope(synCtx.getEnvelope()).serialize(requestStream);
- cachedObj.setRequestEnvelope(requestStream.toByteArray());
- } catch (XMLStreamException e) {
- handleException("Unable to store the request in to the cache",
e, synCtx);
- }
- }
cachedObj.setRequestHash(requestHash);
+ // this does not set the expiretime but just sets the timeout and the
expiretime will
+ // be set when the response is available
cachedObj.setTimeout(timeout);
- cache.addResponseWithKey(requestHash, cachedObj, cfgCtx);
+ cacheManager.addResponseWithKey(requestHash, cachedObj, cfgCtx);
- cfgCtx.setProperty(cacheObjKey, cache);
+ cfgCtx.setProperty(cacheManagerKey, cacheManager);
+ Replicator.replicate(cfgCtx);
}
public String getId() {
@@ -393,7 +397,7 @@
public void setScope(String scope) {
this.scope = scope;
if (CachingConstants.SCOPE_PER_MEDIATOR.equals(scope)) {
- cacheObjKey = CACHE_OBJ_PREFIX + id;
+ cacheManagerKey = CACHE_MANAGER_PREFIX + id;
}
}
@@ -429,6 +433,7 @@
this.diskCacheSize = diskCacheSize;
}
+ // change the variable to Timeout milliseconds
public long getTimeout() {
return timeout / 1000;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]