Author: ruwan
Date: Tue Nov 13 04:53:35 2007
New Revision: 594519
URL: http://svn.apache.org/viewvc?rev=594519&view=rev
Log:
Fixed caching to work in a clustered environment
Modified:
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/builtin/CacheMediator.java
Modified:
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/builtin/CacheMediator.java
URL:
http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/builtin/CacheMediator.java?rev=594519&r1=594518&r2=594519&view=diff
==============================================================================
---
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/builtin/CacheMediator.java
(original)
+++
webservices/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/builtin/CacheMediator.java
Tue Nov 13 04:53:35 2007
@@ -20,8 +20,10 @@
package org.apache.synapse.mediators.builtin;
import org.apache.axis2.AxisFault;
-import org.apache.axis2.engine.AxisConfiguration;
-import org.apache.axis2.description.Parameter;
+import org.apache.axis2.clustering.ClusteringFault;
+import org.apache.axis2.clustering.context.Replicator;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.saaj.util.SAAJUtil;
import org.apache.synapse.MessageContext;
import org.apache.synapse.core.axis2.Axis2MessageContext;
import org.apache.synapse.core.axis2.Axis2Sender;
@@ -33,6 +35,15 @@
import org.wso2.caching.CachingConstants;
import org.wso2.caching.digest.DigestGenerator;
+import javax.xml.soap.MessageFactory;
+import javax.xml.soap.MimeHeaders;
+import javax.xml.soap.SOAPException;
+import javax.xml.soap.SOAPMessage;
+import javax.xml.stream.XMLStreamException;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
/**
*
*/
@@ -65,41 +76,38 @@
}
}
- AxisConfiguration axisCfg =
synCtx.getConfiguration().getAxisConfiguration();
- if (axisCfg == null) {
- handleException("Unable to perform caching, "
- + " AxisConfiguration cannot be found", synCtx);
+ ConfigurationContext cfgCtx =
+ ((Axis2MessageContext)
synCtx).getAxis2MessageContext().getConfigurationContext();
+ if (cfgCtx == null) {
+ handleException("Unable to perform caching, " + "
ConfigurationContext cannot be found",
+ synCtx);
return false; // never executes.. but keeps IDE happy
}
if (traceOrDebugOn) {
- traceOrDebug(traceOn, "Looking up cache at scope : " +
- scope + " with ID : " + cacheObjKey);
+ traceOrDebug(traceOn,
+ "Looking up cache at scope : " + scope + " with ID : " +
cacheObjKey);
}
// look up cache
- Parameter param = axisCfg.getParameter(cacheObjKey);
- Cache cache = null;
- if (param != null && param.getValue() instanceof Cache) {
- cache = (Cache) param.getValue();
+ Object prop = cfgCtx.getProperty(cacheObjKey);
+ Cache cache;
+ if (prop != null && prop instanceof Cache) {
+ cache = (Cache) prop;
} else {
- synchronized (axisCfg) {
+ synchronized (cfgCtx) {
// check again after taking the lock to make sure no one else
did it before us
- param = axisCfg.getParameter(cacheObjKey);
- if (param != null && param.getValue() instanceof Cache) {
- cache = (Cache) param.getValue();
+ prop = cfgCtx.getProperty(cacheObjKey);
+ if (prop != null && prop instanceof Cache) {
+ cache = (Cache) prop;
} else {
if (traceOrDebugOn) {
traceOrDebug(traceOn, "Creating/recreating the cache
object");
}
cache = new Cache();
- try {
- axisCfg.addParameter(cacheObjKey, cache);
- } catch (AxisFault af) {
- auditWarn("Unable to create a cache with ID : " +
cacheObjKey, synCtx);
- }
+ cfgCtx.setProperty(cacheObjKey, cache);
}
}
}
@@ -112,6 +120,14 @@
result = processRequestMessage(synCtx, traceOrDebugOn, traceOn,
cache);
}
+ try {
+ Replicator.replicate(cfgCtx);
+ } catch (ClusteringFault clusteringFault) {
+ if (traceOrDebugOn) {
+ traceOrDebug(traceOn, "Unable to replicate Cache mediator
state among the cluster");
+ }
+ }
+
if (traceOrDebugOn) {
traceOrDebug(traceOn, "End : Cache mediator");
}
@@ -121,20 +137,21 @@
/**
* Process a response message through this cache mediator. This finds the
Cache used, and
* updates it for the corresponding request hash
+ *
* @param traceOrDebugOn is trace or debug logging on?
- * @param traceOn is tracing on?
- * @param synCtx the current message (response)
- * @param cache the cache
+ * @param traceOn is tracing on?
+ * @param synCtx the current message (response)
+ * @param cache the cache
*/
private void processResponseMessage(boolean traceOrDebugOn, boolean
traceOn,
MessageContext synCtx, Cache cache) {
- Object requestHash =
synCtx.getProperty(CachingConstants.REQUEST_HASH_KEY);
+ String requestHash = (String)
synCtx.getProperty(CachingConstants.REQUEST_HASH_KEY);
if (requestHash != null) {
if (traceOrDebugOn) {
- traceOrDebug(traceOn, "Storing the response message into the
cache at scope : "
- + scope + " with ID : " + cacheObjKey + " for request hash
: " + requestHash);
+ traceOrDebug(traceOn, "Storing the response message into the
cache at scope : " +
+ scope + " with ID : " + cacheObjKey + " for request hash :
" + requestHash);
}
Object obj = cache.getResponseForKey(requestHash);
@@ -143,13 +160,18 @@
CachedObject cachedObj = (CachedObject) obj;
if (traceOrDebugOn) {
- traceOrDebug(traceOn, "Storing the response for the
message with ID : "
- + synCtx.getMessageID() + " with request hash ID : " +
+ traceOrDebug(traceOn, "Storing the response for the
message with ID : " +
+ synCtx.getMessageID() + " with request hash ID : " +
cachedObj.getRequestHash() + " in the cache : " +
cacheObjKey);
}
- cachedObj.setResponseEnvelope(
- MessageHelper.cloneSOAPEnvelope(synCtx.getEnvelope()));
+ ByteArrayOutputStream outStream = new ByteArrayOutputStream();
+ try {
+
MessageHelper.cloneSOAPEnvelope(synCtx.getEnvelope()).serialize(outStream);
+ cachedObj.setResponseEnvelope(outStream.toByteArray());
+ } catch (XMLStreamException e) {
+ handleException("Unable to set the response to the Cache",
e, synCtx);
+ }
// this is not required yet can commented this for perf
improvements
// in the future there can be a situation where user sends the
request with the
@@ -158,8 +180,7 @@
cachedObj.setResponseHash(digestGenerator.getDigest(
((Axis2MessageContext) synCtx).getAxis2MessageContext()));
- cachedObj.setExpireTime(
- System.currentTimeMillis() + cachedObj.getTimeout());
+ cachedObj.setExpireTime(System.currentTimeMillis() +
cachedObj.getTimeout());
} else {
auditWarn("A response message without a valid mapping to the "
+
@@ -173,18 +194,20 @@
}
/**
- * Processes a request message through the cache mediator. Generates the
request hash and
- * looks up for a hit, if found; then the specified named or anonymous
sequence is executed
- * or marks this message as a response and sends back directly to client.
- * @param synCtx incoming request message
+ * Processes a request message through the cache mediator. Generates the
request hash and looks
+ * up for a hit, if found; then the specified named or anonymous sequence
is executed or marks
+ * this message as a response and sends back directly to client.
+ *
+ * @param synCtx incoming request message
* @param traceOrDebugOn is tracing or debug logging on?
- * @param traceOn is tracing on?
- * @param cache the cache
+ * @param traceOn is tracing on?
+ * @param cache the cache
* @return should this mediator terminate further processing?
*/
- private boolean processRequestMessage(MessageContext synCtx, boolean
traceOrDebugOn, boolean traceOn, Cache cache) {
+ private boolean processRequestMessage(MessageContext synCtx, boolean
traceOrDebugOn,
+ boolean traceOn, Cache cache) {
- Object requestHash = digestGenerator
+ String requestHash = digestGenerator
.getDigest(((Axis2MessageContext)
synCtx).getAxis2MessageContext());
synCtx.setProperty(CachingConstants.REQUEST_HASH_KEY, requestHash);
@@ -202,25 +225,37 @@
if (!cachedObj.isExpired() && cachedObj.getResponseEnvelope() !=
null) {
if (traceOrDebugOn) {
- traceOrDebug(traceOn,
- "Cache-hit for message ID : " + synCtx.getMessageID());
+ traceOrDebug(traceOn, "Cache-hit for message ID : " +
synCtx.getMessageID());
}
// mark as a response and replace envelope from cache
synCtx.setResponse(true);
try {
- synCtx.setEnvelope(cachedObj.getResponseEnvelope());
+ MessageFactory mf = MessageFactory.newInstance();
+ SOAPMessage smsg = mf.createMessage(new MimeHeaders(),
+ new
ByteArrayInputStream(cachedObj.getResponseEnvelope()));
+
+ org.apache.axiom.soap.SOAPEnvelope omSOAPEnv =
+
SAAJUtil.toOMSOAPEnvelope(smsg.getSOAPPart().getDocumentElement());
+
+ synCtx.setEnvelope(omSOAPEnv);
} catch (AxisFault axisFault) {
- handleException(
- "Error setting response envelope from cache : " +
cacheObjKey, synCtx);
+ handleException("Error setting response envelope from
cache : " + cacheObjKey,
+ synCtx);
+ } catch (IOException ioe) {
+ handleException("Error setting response envelope from
cache : " + cacheObjKey,
+ ioe, synCtx);
+ } catch (SOAPException soape) {
+ handleException("Error setting response envelope from
cache : " + cacheObjKey,
+ soape, synCtx);
}
// take specified action on cache hit
if (onCacheHitSequence != null) {
// if there is an onCacheHit use that for the mediation
if (traceOrDebugOn) {
- traceOrDebug(traceOn, "Delegating message to the
onCachingHit " +
- "Anonymous sequence");
+ traceOrDebug(traceOn,
+ "Delegating message to the onCachingHit " +
"Anonymous sequence");
}
onCacheHitSequence.mediate(synCtx);
@@ -249,7 +284,8 @@
// cache exists, but has expired...
cachedObj.clearCache();
if (traceOrDebugOn) {
- traceOrDebug(traceOn, "Existing cached response has
expired. Reset cache element");
+ traceOrDebug(traceOn,
+ "Existing cached response has expired. Reset cache
element");
}
}
@@ -274,13 +310,20 @@
/**
* Store request message to the cache
- * @param synCtx the request message
+ *
+ * @param synCtx the request message
* @param requestHash the request hash that has already been computed
- * @param cache the cache
+ * @param cache the cache
*/
- private void storeRequestToCache(MessageContext synCtx, Object
requestHash, Cache cache) {
+ private void storeRequestToCache(MessageContext synCtx, String
requestHash, Cache cache) {
CachedObject cachedObj = new CachedObject();
-
cachedObj.setRequestEnvelope(MessageHelper.cloneSOAPEnvelope(synCtx.getEnvelope()));
+ ByteArrayOutputStream requestStream = new ByteArrayOutputStream();
+ try {
+
MessageHelper.cloneSOAPEnvelope(synCtx.getEnvelope()).serialize(requestStream);
+ cachedObj.setRequestEnvelope(requestStream.toByteArray());
+ } catch (XMLStreamException e) {
+ handleException("Unable to store the request in to the cache", e,
synCtx);
+ }
cachedObj.setRequestHash(requestHash);
cachedObj.setTimeout(timeout);
cache.addResponseWithKey(requestHash, cachedObj);
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]