Author: slaws
Date: Sun Feb 10 13:31:04 2008
New Revision: 620333
URL: http://svn.apache.org/viewvc?rev=620333&view=rev
Log:
TUSCANY-1999
Fix to move the conversation expiry processing into ConversationManager. Thanks
to Ben Smith and Thomas Greenwood for the patch
Modified:
incubator/tuscany/java/sca/itest/conversations/src/main/java/org/apache/tuscany/sca/itest/conversational/impl/ConversationalServiceStatelessImpl.java
incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/sca/core/conversation/ConversationManager.java
incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/sca/core/conversation/ConversationManagerImpl.java
incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/sca/core/conversation/ExtendedConversation.java
incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/sca/core/conversation/ExtendedConversationImpl.java
incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/JDKInvocationHandler.java
incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/RuntimeWireInvoker.java
incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/sca/core/scope/ConversationalScopeContainer.java
incubator/tuscany/java/sca/modules/implementation-java/src/main/java/org/apache/tuscany/sca/implementation/java/introspect/impl/ConversationProcessor.java
incubator/tuscany/java/sca/modules/implementation-java/src/test/java/org/apache/tuscany/sca/implementation/java/introspect/impl/ConversationProcessorTestCase.java
Modified:
incubator/tuscany/java/sca/itest/conversations/src/main/java/org/apache/tuscany/sca/itest/conversational/impl/ConversationalServiceStatelessImpl.java
URL:
http://svn.apache.org/viewvc/incubator/tuscany/java/sca/itest/conversations/src/main/java/org/apache/tuscany/sca/itest/conversational/impl/ConversationalServiceStatelessImpl.java?rev=620333&r1=620332&r2=620333&view=diff
==============================================================================
---
incubator/tuscany/java/sca/itest/conversations/src/main/java/org/apache/tuscany/sca/itest/conversational/impl/ConversationalServiceStatelessImpl.java
(original)
+++
incubator/tuscany/java/sca/itest/conversations/src/main/java/org/apache/tuscany/sca/itest/conversational/impl/ConversationalServiceStatelessImpl.java
Sun Feb 10 13:31:04 2008
@@ -36,6 +36,7 @@
* @version $Rev: 537240 $ $Date: 2007-05-11 18:35:03 +0100 (Fri, 11 May 2007)
$
*/
@Service(ConversationalService.class)
[EMAIL PROTECTED]("STATELESS")
public class ConversationalServiceStatelessImpl implements
ConversationalService {
@ConversationID
Modified:
incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/sca/core/conversation/ConversationManager.java
URL:
http://svn.apache.org/viewvc/incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/sca/core/conversation/ConversationManager.java?rev=620333&r1=620332&r2=620333&view=diff
==============================================================================
---
incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/sca/core/conversation/ConversationManager.java
(original)
+++
incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/sca/core/conversation/ConversationManager.java
Sun Feb 10 13:31:04 2008
@@ -19,6 +19,9 @@
package org.apache.tuscany.sca.core.conversation;
+import org.apache.tuscany.sca.runtime.RuntimeComponent;
+import org.apache.tuscany.sca.runtime.RuntimeWire;
+
/**
* The manager of conversations
*
@@ -58,4 +61,14 @@
* @param listener
*/
void removeListener(ConversationListener listener);
+
+ /**
+ * @return the default max age for a conversation
+ */
+ long getMaxAge();
+
+ /**
+ * @return the default max idle time for a conversation
+ */
+ long getMaxIdleTime();
}
Modified:
incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/sca/core/conversation/ConversationManagerImpl.java
URL:
http://svn.apache.org/viewvc/incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/sca/core/conversation/ConversationManagerImpl.java?rev=620333&r1=620332&r2=620333&view=diff
==============================================================================
---
incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/sca/core/conversation/ConversationManagerImpl.java
(original)
+++
incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/sca/core/conversation/ConversationManagerImpl.java
Sun Feb 10 13:31:04 2008
@@ -25,16 +25,81 @@
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.tuscany.sca.core.scope.ScopedImplementationProvider;
+import org.apache.tuscany.sca.provider.ImplementationProvider;
+import org.apache.tuscany.sca.runtime.RuntimeComponent;
+import org.apache.tuscany.sca.runtime.RuntimeWire;
/**
* @version $Rev$ $Date$
*/
public class ConversationManagerImpl implements ConversationManager {
+
private List<ConversationListener> listeners =
Collections.synchronizedList(new ArrayList<ConversationListener>());
- private Map<Object, ExtendedConversation> converations = new
ConcurrentHashMap<Object, ExtendedConversation>();
+ private Map<Object, ExtendedConversation> conversations = new
ConcurrentHashMap<Object, ExtendedConversation>();
/**
+ * the default max age. this is set to 1 hour
+ */
+ private static final long DEFAULT_MAX_AGE = 60 * 60 * 1000; ;
+
+ /**
+ * the default max idle time. this is set to 1 hour
+ */
+ private static final long DEFAULT_MAX_IDLE_TIME = 60 * 60 * 1000;
+
+ /**
+ * the globally used max age
+ */
+ private final long maxAge;
+
+ /**
+ * the globally used max idle time
+ */
+ private final long maxIdleTime;
+
+ /**
+ * the reaper thread
+ */
+ private final ScheduledExecutorService scheduler =
Executors.newSingleThreadScheduledExecutor();
+
+ /**
+ * constructor
+ */
+ public ConversationManagerImpl()
+ {
+ long mit = DEFAULT_MAX_IDLE_TIME;
+ long ma = DEFAULT_MAX_AGE;
+
+ String aProperty;
+ aProperty =
System.getProperty("org.apache.tuscany.sca.core.scope.ConversationalScopeContainer.MaxIdleTime");
+ if (aProperty != null) {
+ try {
+ mit = (new Long(aProperty) * 1000);
+ } catch (NumberFormatException nfe) {
+ // Ignore
+ }
+ }
+
+ aProperty =
System.getProperty("org.apache.tuscany.sca.core.scope.ConversationalScopeContainer.MaxAge");
+ if (aProperty != null) {
+ try {
+ ma = (new Long(aProperty) * 1000);
+ } catch (NumberFormatException nfe) {
+ // Ignore
+ }
+ }
+
+ maxAge = ma;
+ maxIdleTime = mit;
+ }
+
+ /**
* @see
org.apache.tuscany.sca.core.conversation.ConversationManager#addListener(org.apache.tuscany.sca.core.conversation.ConversationListener)
*/
public void addListener(ConversationListener listener) {
@@ -52,7 +117,7 @@
listener.conversationEnded(conv);
}
conv.setConversationID(null);
- converations.remove(conversationID);
+ conversations.remove(conversationID);
} else {
throw new IllegalStateException("Conversation " + conversationID +
" doesn't exist.");
}
@@ -61,10 +126,10 @@
public void expireConversation(Object conversationID) {
ExtendedConversation conv = getConversation(conversationID);
if (conv != null) {
-
((ExtendedConversationImpl)conv).setState(ConversationState.EXPIRED);
for (ConversationListener listener : listeners) {
listener.conversationExpired(conv);
}
+ conversations.remove(conversationID);
} else {
throw new IllegalStateException("Conversation " + conversationID +
" doesn't exist.");
}
@@ -75,7 +140,7 @@
* @see
org.apache.tuscany.sca.core.conversation.ConversationManager#getConversation(java.lang.Object)
*/
public ExtendedConversation getConversation(Object conversationID) {
- return converations.get(conversationID);
+ return conversations.get(conversationID);
}
/**
@@ -86,9 +151,29 @@
}
/**
+ * starts the reaper thread
+ */
+ public void scheduleConversation(ExtendedConversationImpl aConversation,
long time)
+ {
+ this.scheduler.schedule(aConversation, time, TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * stops the reaper thread
+ */
+ public synchronized void stopReaper() {
+
+ // Prevent the scheduler from submitting any additional reapers,
+ // initiate an orderly shutdown if a reaper task is in progress.
+ this.scheduler.shutdown();
+ }
+
+
+ /**
* @see
org.apache.tuscany.sca.core.conversation.ConversationManager#startConversation(java.lang.Object)
*/
public ExtendedConversation startConversation(Object conversationID) {
+
if (conversationID == null) {
conversationID = UUID.randomUUID().toString();
}
@@ -96,12 +181,30 @@
if (conversation != null && conversation.getState() !=
ConversationState.ENDED) {
throw new IllegalStateException(conversation + " already exists.");
}
- conversation = new ExtendedConversationImpl(this, conversationID,
ConversationState.STARTED);
- converations.put(conversationID, conversation);
+
+ conversation = new ExtendedConversationImpl(
+ this, conversationID, ConversationState.STARTED);
+ conversations.put(conversationID, conversation);
for (ConversationListener listener : listeners) {
listener.conversationStarted(conversation);
}
return conversation;
}
+ /**
+ * return the default max idle time
+ * @param impProvider the implementation Provider to extract any
ConversationAttribute details
+ */
+ public long getMaxIdleTime()
+ {
+ return maxIdleTime;
+ }
+
+ /**
+ * returns the default max age
+ * @param impProvider the implementation Provider to extract any
ConversationAttribute details
+ */
+ public long getMaxAge(){
+ return maxAge;
+ }
}
Modified:
incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/sca/core/conversation/ExtendedConversation.java
URL:
http://svn.apache.org/viewvc/incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/sca/core/conversation/ExtendedConversation.java?rev=620333&r1=620332&r2=620333&view=diff
==============================================================================
---
incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/sca/core/conversation/ExtendedConversation.java
(original)
+++
incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/sca/core/conversation/ExtendedConversation.java
Sun Feb 10 13:31:04 2008
@@ -19,6 +19,7 @@
package org.apache.tuscany.sca.core.conversation;
+import org.apache.tuscany.sca.runtime.RuntimeComponent;
import org.osoa.sca.Conversation;
/**
@@ -34,17 +35,32 @@
ConversationState getState();
/**
- * Mark the conversation expired
+ * @param state the state to set
*/
- void expire();
+ void setState(ConversationState state);
/**
- * @param state the state to set
+ * @param conversationID the conversationID to set
+ */
+ void setConversationID(Object conversationID);
+
+
+ /**
+ * will check whether this conversation has expired and update state if it
has
+ * @return true if it has expired
+ */
+ boolean isExpired();
+
+ /**
+ * updates the last time this conversation was referenced
*/
- public void setState(ConversationState state);
+ void updateLastReferencedTime();
+
+ public void initializeConversationAttributes(RuntimeComponent
targetComponent);
+
/**
- * @param conversationID the conversationID to set
+ * @return true if the conversational attributes have been initialized
*/
- public void setConversationID(Object conversationID);
+ public boolean conversationalAttributesInitialized();
}
Modified:
incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/sca/core/conversation/ExtendedConversationImpl.java
URL:
http://svn.apache.org/viewvc/incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/sca/core/conversation/ExtendedConversationImpl.java?rev=620333&r1=620332&r2=620333&view=diff
==============================================================================
---
incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/sca/core/conversation/ExtendedConversationImpl.java
(original)
+++
incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/sca/core/conversation/ExtendedConversationImpl.java
Sun Feb 10 13:31:04 2008
@@ -18,34 +18,136 @@
*/
package org.apache.tuscany.sca.core.conversation;
+import org.apache.tuscany.sca.core.scope.ScopedImplementationProvider;
+import org.apache.tuscany.sca.provider.ImplementationProvider;
+import org.apache.tuscany.sca.runtime.RuntimeComponent;
+
/**
*
* @version $Rev$ $Date$
*/
-public class ExtendedConversationImpl implements ExtendedConversation {
- private ConversationManager manager;
- private Object conversationID;
+public class ExtendedConversationImpl implements ExtendedConversation,
Runnable {
+
+ private final ConversationManagerImpl manager;
+ private volatile Object conversationID;
private ConversationState state;
/**
- * @param manager
- * @param conversationID
- * @param state
+ * syncs access to the state
+ */
+ private final Object stateSync = new Object();
+
+ /**
+ * the maximum time a conversation can exist
+ */
+ private long expirationTime = 0;
+
+ /**
+ * the maximum time this conversation can be idle
+ */
+ private long maxIdleTime = 0;
+
+ /**
+ * the maximum age of this conversation
+ */
+ private long maxAge = 0;
+
+ /**
+ * the time that this object was created
+ */
+ private long creationTime;
+
+ /**
+ * the time that this object was last referenced
*/
- public ExtendedConversationImpl(ConversationManager manager, Object
conversationID, ConversationState state) {
+ private long lastReferencedTime;
+
+ /**
+ * boolean to ensure expiry only occurs once
+ */
+ private boolean expired = false;
+
+ /**
+ * boolean to indicate if the conversation attributes have
+ * been set. In the case where a remote binding is used
+ * within a composite the JDKInvocationHandler can create the
+ * conversation but the conversationAttributes are not available
+ * until the conversation is retrieved by the RuntimeWireInvoker
+ */
+ private boolean conversationAttributesInitialized = false;
+
+ /**
+ * Constructor
+ * @param manager the conversation manager
+ * @param conversationID the conversation id associated with this
conversation
+ * @param state the initial state of this conversation
+ * @param aMaxAge the maximum age of the conversation
+ * @param aMaxIdleTime the maximum idle time
+ */
+ public ExtendedConversationImpl(ConversationManagerImpl manager,
+ Object conversationID, ConversationState state) {
super();
+
+ this.creationTime = System.currentTimeMillis();
+ this.lastReferencedTime = creationTime;
this.manager = manager;
this.conversationID = conversationID;
this.state = state;
}
- public void expire() {
- manager.expireConversation(conversationID);
+ /**
+ * will check whether this conversation has expired and update state if it
has
+ * @return true if it has expired
+ */
+ public boolean isExpired()
+ {
+ long currentTime;
+ synchronized (stateSync){
+
+ // check state first
+ if (state == ConversationState.EXPIRED){
+ return true;
+ }
+
+ // check whether the time is finished
+ currentTime = System.currentTimeMillis();
+ if (((this.lastReferencedTime + this.maxIdleTime) <=
currentTime) ||
+ (this.expirationTime <= currentTime)){
+ setState(ConversationState.EXPIRED);
+ return true;
+ }
+ }
+ scheduleNextExpiryTime(currentTime);
+ return false;
}
+ /**
+ * schedule next expiry time
+ */
+ public void scheduleNextExpiryTime(long currentTime)
+ {
+ if ((lastReferencedTime + maxIdleTime) < expirationTime){
+ manager.scheduleConversation(this, (lastReferencedTime +
maxIdleTime) - currentTime);
+ }
+ else{
+ manager.scheduleConversation(this, expirationTime -
currentTime);
+ }
+ }
+ /**
+ * updates the last time this conversation was referenced
+ */
+ public void updateLastReferencedTime() {
+ this.lastReferencedTime = System.currentTimeMillis();
+ if (conversationAttributesInitialized == true){
+ scheduleNextExpiryTime(lastReferencedTime);
+ }
+ }
+
public ConversationState getState() {
- return state;
+ synchronized (stateSync){
+ return state;
+ }
}
public void end() {
@@ -60,17 +162,103 @@
* @param state the state to set
*/
public void setState(ConversationState state) {
- this.state = state;
+ synchronized (stateSync){
+ this.state = state;
+ }
}
/**
* @param conversationID the conversationID to set
*/
public void setConversationID(Object conversationID) {
- if (state != ConversationState.ENDED) {
- throw new IllegalStateException("The state of conversation " +
conversationID + " " + state);
- }
+ synchronized (stateSync){
+ if (state != ConversationState.ENDED) {
+ throw new IllegalStateException("The state of
conversation " + conversationID + " " + state);
+ }
+ }
this.conversationID = conversationID;
}
+
+ /**
+ * @param maxAge the maximum age of this conversation
+ */
+ public void initializeConversationAttributes(RuntimeComponent
targetComponent){
+ if (targetComponent != null){
+ this.maxAge =
getMaxIdleTime(targetComponent.getImplementationProvider());
+ this.maxIdleTime =
getMaxAge(targetComponent.getImplementationProvider());
+ this.expirationTime = creationTime + maxAge;
+ this.conversationAttributesInitialized = true;
+ }
+ }
+
+ /**
+ * @return true if the conversational attributes have been initialized
+ */
+ public boolean conversationalAttributesInitialized(){
+ return this.conversationAttributesInitialized;
+ }
+
+ /**
+ * return the max idle time
+ * @param impProvider the implementation Provider to extract any
ConversationAttribute details
+ */
+ private long getMaxIdleTime(ImplementationProvider impProvider)
+ {
+ // Check to see if the maxIdleTime has been specified using
@ConversationAttributes.
+ // Implementation annotated attributes are honoured first.
+ if ((impProvider != null) &&
+ (impProvider instanceof ScopedImplementationProvider)) {
+ ScopedImplementationProvider aScopedImpl =
+ (ScopedImplementationProvider) impProvider;
+
+ long maxIdleTime = aScopedImpl.getMaxIdleTime();
+ if (maxIdleTime > 0) {
+ return maxIdleTime;
+ }
+ }
+ return manager.getMaxIdleTime();
+ }
+
+ /**
+ * returns the max age
+ * @param impProvider the implementation Provider to extract any
ConversationAttribute details
+ */
+ private long getMaxAge(ImplementationProvider impProvider){
+ // Check to see if the maxAge has been specified using
@ConversationAttributes.
+ // Implementation annotated attributes are honoured first.
+ if ((impProvider != null) &&
+ (impProvider instanceof ScopedImplementationProvider)) {
+ ScopedImplementationProvider aScopedImpl =
+ (ScopedImplementationProvider) impProvider;
+
+ long maxAge = aScopedImpl.getMaxAge();
+ if (maxAge > 0) {
+ return maxAge;
+ }
+ }
+ return manager.getMaxAge();
+ }
+
+ /**
+ * called when expiring
+ */
+ public void run()
+ {
+ synchronized (stateSync){
+ if (!expired){
+ if (isExpired()) {
+ expired = true;
+ try {
+
manager.expireConversation(getConversationID());
+ } catch (IllegalStateException ise) {
+ // ignore this.. this can occur if
another thread has subsequently ended
+ // the conversation
+ }
+ }
+ }
+ }
+
+ }
+
}
Modified:
incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/JDKInvocationHandler.java
URL:
http://svn.apache.org/viewvc/incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/JDKInvocationHandler.java?rev=620333&r1=620332&r2=620333&view=diff
==============================================================================
---
incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/JDKInvocationHandler.java
(original)
+++
incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/JDKInvocationHandler.java
Sun Feb 10 13:31:04 2008
@@ -49,11 +49,13 @@
import org.apache.tuscany.sca.invocation.Invoker;
import org.apache.tuscany.sca.invocation.Message;
import org.apache.tuscany.sca.invocation.MessageFactory;
+import org.apache.tuscany.sca.provider.ImplementationProvider;
import org.apache.tuscany.sca.runtime.EndpointReference;
import org.apache.tuscany.sca.runtime.ReferenceParameters;
import org.apache.tuscany.sca.runtime.RuntimeComponent;
import org.apache.tuscany.sca.runtime.RuntimeWire;
import org.osoa.sca.CallableReference;
+import org.osoa.sca.ConversationEndedException;
import org.osoa.sca.ServiceReference;
import org.osoa.sca.ServiceRuntimeException;
@@ -332,15 +334,23 @@
// Not conversational or the conversation has been started
return;
}
+
ConversationManager conversationManager =
((RuntimeWireImpl)wire).getConversationManager();
+
if (conversation == null || conversation.getState() ==
ConversationState.ENDED) {
+
conversation =
conversationManager.startConversation(getConversationID());
+
conversation.initializeConversationAttributes(wire.getTarget().getComponent());
if (callableReference != null) {
((CallableReferenceImpl)callableReference).attachConversation(conversation);
}
}
- // TODO - assuming that the conversation ID is a string here when
- // it can be any object that is serializable to XML
+ else if (conversation.isExpired()){
+ throw new ConversationEndedException("Conversation has
expired.");
+ }
+
+ conversation.updateLastReferencedTime();
+
msg.getFrom().getReferenceParameters().setConversationID(conversation.getConversationID());
}
Modified:
incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/RuntimeWireInvoker.java
URL:
http://svn.apache.org/viewvc/incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/RuntimeWireInvoker.java?rev=620333&r1=620332&r2=620333&view=diff
==============================================================================
---
incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/RuntimeWireInvoker.java
(original)
+++
incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/RuntimeWireInvoker.java
Sun Feb 10 13:31:04 2008
@@ -44,6 +44,7 @@
import org.apache.tuscany.sca.runtime.ReferenceParameters;
import org.apache.tuscany.sca.runtime.RuntimeComponent;
import org.apache.tuscany.sca.runtime.RuntimeWire;
+import org.osoa.sca.ConversationEndedException;
import org.osoa.sca.ServiceReference;
import org.osoa.sca.ServiceRuntimeException;
@@ -207,7 +208,15 @@
if (conversation == null || conversation.getState() ==
ConversationState.ENDED) {
conversation =
conversationManager.startConversation(conversationID);
+
conversation.initializeConversationAttributes(wire.getTarget().getComponent());
+ } else if (conversation.conversationalAttributesInitialized() ==
false) {
+
conversation.initializeConversationAttributes(wire.getTarget().getComponent());
}
+ else if (conversation.isExpired()){
+ throw new ConversationEndedException("Conversation has
expired.");
+ }
+
+ conversation.updateLastReferencedTime();
parameters.setConversationID(conversation.getConversationID());
}
Modified:
incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/sca/core/scope/ConversationalScopeContainer.java
URL:
http://svn.apache.org/viewvc/incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/sca/core/scope/ConversationalScopeContainer.java?rev=620333&r1=620332&r2=620333&view=diff
==============================================================================
---
incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/sca/core/scope/ConversationalScopeContainer.java
(original)
+++
incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/sca/core/scope/ConversationalScopeContainer.java
Sun Feb 10 13:31:04 2008
@@ -20,14 +20,10 @@
package org.apache.tuscany.sca.core.scope;
import java.util.ArrayList;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
import org.apache.tuscany.sca.core.context.InstanceWrapper;
import org.apache.tuscany.sca.core.conversation.ConversationListener;
@@ -37,23 +33,16 @@
import org.apache.tuscany.sca.invocation.Message;
import org.apache.tuscany.sca.runtime.RuntimeComponent;
import org.apache.tuscany.sca.store.Store;
-import org.osoa.sca.ConversationEndedException;
/**
* A scope context which manages atomic component instances keyed on
ConversationID
- *
+ *
*/
public class ConversationalScopeContainer extends
AbstractScopeContainer<Object> implements ConversationListener {
private ConversationManager conversationManager;
private Map<Object, InstanceLifeCycleWrapper> instanceLifecycleCollection =
new ConcurrentHashMap<Object, InstanceLifeCycleWrapper>();
- //TODO: This needs to observe the value set by ConversationalAttributes
for now we will hard code it.
- private long max_age = 60 * 60 * 1000; // 1 hour;
- private long max_idle_time = 60 * 60 * 1000; // 1 hour;
- private long reaper_interval = 60; // every minute;
- private ScheduledExecutorService scheduler;
-
public ConversationalScopeContainer(Store aStore, RuntimeComponent
component) {
super(Scope.CONVERSATION, component);
@@ -63,77 +52,9 @@
// Check System properties to see if timeout values have been
specified. All timeout values
// will be specified in seconds.
//
- String aProperty;
- aProperty =
System.getProperty("org.apache.tuscany.sca.core.scope.ConversationalScopeContainer.MaxIdleTime");
- if (aProperty != null) {
- try {
- max_idle_time = (new Long(aProperty) * 1000);
- } catch (NumberFormatException nfe) {
- // Ignore
- }
- }
-
- aProperty =
System.getProperty("org.apache.tuscany.sca.core.scope.ConversationalScopeContainer.MaxAge");
- if (aProperty != null) {
- try {
- max_age = (new Long(aProperty) * 1000);
- } catch (NumberFormatException nfe) {
- // Ignore
- }
- }
-
- aProperty =
System.getProperty("org.apache.tuscany.sca.core.scope.ConversationalScopeContainer.ReaperInterval");
- if (aProperty != null) {
- try {
- reaper_interval = new Long(aProperty);
- } catch (NumberFormatException nfe) {
- // Ignore
- }
- }
-
- // Check to see if the maxAge and/or maxIdleTime have been specified
using @ConversationAttributes.
- // Implementation annoated attributes are honored first.
- if (this.getComponent().getImplementationProvider() instanceof
ScopedImplementationProvider) {
- ScopedImplementationProvider aScopedImpl =
-
(ScopedImplementationProvider)this.getComponent().getImplementationProvider();
-
- long maxAge = aScopedImpl.getMaxAge();
- if (maxAge > 0) {
- max_age = maxAge;
- }
- long maxIdleTime = aScopedImpl.getMaxIdleTime();
- if (maxIdleTime > 0) {
- max_idle_time = maxIdleTime;
- }
- }
}
- @Override
- public synchronized void start() {
- if (lifecycleState != UNINITIALIZED && lifecycleState != STOPPED) {
- throw new IllegalStateException("Scope must be in UNINITIALIZED or
STOPPED state [" + lifecycleState + "]");
- }
-
- // Get a scheduler and scheduled a task to be run in the future
indefinitely until its explicitly shutdown.
- this.scheduler = Executors.newSingleThreadScheduledExecutor();
- scheduler.scheduleAtFixedRate(new
ConversationalInstanceReaper(this.instanceLifecycleCollection),
- 3,
- reaper_interval,
- TimeUnit.SECONDS);
-
- lifecycleState = RUNNING;
- }
-
- @Override
- public synchronized void stop() {
-
- // Prevent the scheduler from submitting any additional reapers,
initiate an orderly shutdown if a reaper task is in progress.
- if (this.scheduler != null)
- this.scheduler.shutdown();
-
- lifecycleState = STOPPED;
- }
protected InstanceWrapper getInstanceWrapper(boolean create, Object
contextId) throws TargetResolutionException {
@@ -151,8 +72,6 @@
}
}
- contextId = contextId;
-
InstanceLifeCycleWrapper anInstanceWrapper =
this.instanceLifecycleCollection.get(contextId);
if (anInstanceWrapper == null && !create)
@@ -162,14 +81,6 @@
anInstanceWrapper = new InstanceLifeCycleWrapper(contextId);
this.instanceLifecycleCollection.put(contextId, anInstanceWrapper);
}
- // If an existing instance is found return it only if its not expired
and update its
- // last referenced time.
- else {
- if (anInstanceWrapper.isExpired()) {
- throw new ConversationEndedException();
- }
- anInstanceWrapper.updateLastReferencedTime();
- }
return anInstanceWrapper.getInstanceWrapper(contextId);
@@ -183,11 +94,11 @@
/**
* This method allows a new context id to be registered alongside an
existing one. This happens in
* one case, when a conversation includes a stateful callback. The client
component instance
- * must be registered against all outgoing conversation ids so that the
component instance
- * can be found when the callback arrives
- *
+ * must be registered against all outgoing conversation ids so that the
component instance
+ * can be found when the callback arrives
+ *
* @param existingContextId the context id against which the component is
already registered
- * @param context this should be a conversation object so that the
conversation can b stored
+ * @param context this should be a conversation object so that the
conversation can b stored
* and reset when the component instance is removed
*/
public void addWrapperReference(Object existingContextId, Object
contextId) throws TargetResolutionException {
@@ -223,8 +134,8 @@
this.instanceLifecycleCollection.put(contextId, anInstanceWrapper);
}
- // The remove is invoked when a conversation is explicitly ended. This
can occur by using the @EndsConversation or API.
- // In this case the instance is immediately removed. A new conversation
will be started on the next operation
+ // The remove is invoked when a conversation is explicitly ended. This can
ccur by using the @EndsConversation or API.
+ // In this case the instance is immediately removed. A new conversation
will be started on the next operation
// associated with this conversationId's service reference.
//
@Override
@@ -239,50 +150,25 @@
}
/*
- * This is an inner class that keeps track of the lifecycle of a
conversation scoped
- * implementation instance.
- *
- */
+ * This is an inner class that keeps track of the lifecycle of a
conversation scoped
+ * implementation instance.
+ *
+ */
private class InstanceLifeCycleWrapper {
private Object clientConversationId;
private List<Object> callbackConversations = new ArrayList<Object>();
- private long creationTime;
- private long lastReferencedTime;
- private long expirationInterval;
- private long maxIdleTime;
private InstanceLifeCycleWrapper(Object contextId) throws
TargetResolutionException {
this.clientConversationId = contextId;
- this.creationTime = System.currentTimeMillis();
- this.lastReferencedTime = this.creationTime;
- this.expirationInterval = max_age;
- this.maxIdleTime = max_idle_time;
this.createInstance(contextId);
}
private InstanceLifeCycleWrapper(InstanceWrapper wrapper, Object
contextId) throws TargetResolutionException {
this.clientConversationId = contextId;
- this.creationTime = System.currentTimeMillis();
- this.lastReferencedTime = this.creationTime;
- this.expirationInterval = max_age;
- this.maxIdleTime = max_idle_time;
wrappers.put(contextId, wrapper);
}
- private boolean isExpired() {
- long currentTime = System.currentTimeMillis();
- if ((this.lastReferencedTime + this.maxIdleTime) < currentTime) //
max idle time exceeded
- return true;
- if ((this.creationTime + this.expirationInterval) < currentTime)
// max time to live exceeded
- return true;
-
- return false;
- }
-
- private void updateLastReferencedTime() {
- this.lastReferencedTime = System.currentTimeMillis();
- }
// Associates a callback conversation with this instance. Each time
the scope container
// is asked to remove an object given a ontextId an associated
conversation object will
@@ -331,53 +217,10 @@
}
- //
- // This inner class is an instance reaper. It periodically iterates over
the InstanceLifeCycleCollection
- // and for any instances that have expired removes the backing instance
and the entry in the InstanceLifeCycle
- // Collection.
- //
- class ConversationalInstanceReaper implements Runnable {
- private Map<Object, InstanceLifeCycleWrapper>
instanceLifecycleCollection;
-
- public ConversationalInstanceReaper(Map<Object,
InstanceLifeCycleWrapper> aMap) {
- this.instanceLifecycleCollection = aMap;
- }
-
- public void run() {
- Iterator<Map.Entry<Object, InstanceLifeCycleWrapper>> anIterator =
- this.instanceLifecycleCollection.entrySet().iterator();
-
- while (anIterator.hasNext()) {
- Map.Entry<Object, InstanceLifeCycleWrapper> anEntry =
anIterator.next();
- InstanceLifeCycleWrapper anInstanceLifeCycleWrapper =
anEntry.getValue();
- if (anInstanceLifeCycleWrapper.isExpired()) {
- try {
- // cycle through all the references to this instance
and
- // remove them from the underlying wrappers collection
and
- // from the lifecycle wrappers collection
- for (Object conversationID :
anInstanceLifeCycleWrapper.callbackConversations) {
-
anInstanceLifeCycleWrapper.removeInstanceWrapper(conversationID);
-
this.instanceLifecycleCollection.remove(conversationID);
- }
-
- if (anInstanceLifeCycleWrapper.clientConversationId !=
null) {
- anInstanceLifeCycleWrapper
-
.removeInstanceWrapper(anInstanceLifeCycleWrapper.clientConversationId);
-
this.instanceLifecycleCollection.remove(anInstanceLifeCycleWrapper.clientConversationId);
- }
- } catch (Exception ex) {
- // TODO - what to do with any asynchronous exceptions?
- }
- }
- }
- }
- }
-
/**
- * @see
org.apache.tuscany.sca.core.conversation.ConversationListener#conversationEnded(org.apache.tuscany.sca.core.conversation.ExtendedConversation)
- */
+ * @see
org.apache.tuscany.sca.core.conversation.ConversationListener#conversationEnded(org.apache.tuscany.sca.core.conversation.ExtendedConversation)
+ */
public void conversationEnded(ExtendedConversation conversation) {
- //stopContext(conversation.getConversationID());
try {
remove(conversation.getConversationID());
} catch (Exception ex) {
@@ -386,28 +229,60 @@
}
/**
- * @see
org.apache.tuscany.sca.core.conversation.ConversationListener#conversationExpired(org.apache.tuscany.sca.core.conversation.ExtendedConversation)
- */
+ * @see
org.apache.tuscany.sca.core.conversation.ConversationListener#conversationExpired(org.apache.tuscany.sca.core.conversation.ExtendedConversation)
+ */
public void conversationExpired(ExtendedConversation conversation) {
+
+ Object conversationId = conversation.getConversationID();
+ InstanceLifeCycleWrapper ilcw =
instanceLifecycleCollection.get(conversationId);
+ if (ilcw != null)
+ {
+ // cycle through all the references to this instance and
+ // remove them from the underlying wrappers collection and
+ // from the lifecycle wrappers collection
+
+ for (Object conversationID : ilcw.callbackConversations) {
+ try{
+ ilcw.removeInstanceWrapper(conversationID);
+ remove(conversationID);
+ }
+ catch(TargetDestructionException tde){
+ System.out.println("Could not remove
conversation id " + conversationID);
+ }
+ }
+
+
+ if (ilcw.clientConversationId != null) {
+ try{
+
ilcw.removeInstanceWrapper(ilcw.clientConversationId);
+ remove(ilcw.clientConversationId);
+ }
+ catch(TargetDestructionException tde){
+ System.out.println("Could not remove
conversation id " + ilcw.clientConversationId);
+ }
+ }
+
+ }
+
}
/**
- * @see
org.apache.tuscany.sca.core.conversation.ConversationListener#conversationStarted(org.apache.tuscany.sca.core.conversation.ExtendedConversation)
- */
+ * @see
org.apache.tuscany.sca.core.conversation.ConversationListener#conversationStarted(org.apache.tuscany.sca.core.conversation.ExtendedConversation)
+ */
public void conversationStarted(ExtendedConversation conversation) {
startContext(conversation.getConversationID());
}
/**
- * @return the conversationManager
- */
+ * @return the conversationManager
+ */
public ConversationManager getConversationManager() {
return conversationManager;
}
/**
- * @param conversationManager the conversationManager to set
- */
+ * @param conversationManager the conversationManager to set
+ */
public void setConversationManager(ConversationManager
conversationManager) {
this.conversationManager = conversationManager;
}
Modified:
incubator/tuscany/java/sca/modules/implementation-java/src/main/java/org/apache/tuscany/sca/implementation/java/introspect/impl/ConversationProcessor.java
URL:
http://svn.apache.org/viewvc/incubator/tuscany/java/sca/modules/implementation-java/src/main/java/org/apache/tuscany/sca/implementation/java/introspect/impl/ConversationProcessor.java?rev=620333&r1=620332&r2=620333&view=diff
==============================================================================
---
incubator/tuscany/java/sca/modules/implementation-java/src/main/java/org/apache/tuscany/sca/implementation/java/introspect/impl/ConversationProcessor.java
(original)
+++
incubator/tuscany/java/sca/modules/implementation-java/src/main/java/org/apache/tuscany/sca/implementation/java/introspect/impl/ConversationProcessor.java
Sun Feb 10 13:31:04 2008
@@ -26,6 +26,7 @@
import org.apache.tuscany.sca.implementation.java.JavaImplementation;
import org.osoa.sca.annotations.ConversationAttributes;
import org.osoa.sca.annotations.ConversationID;
+import org.osoa.sca.annotations.Conversational;
import org.osoa.sca.annotations.Scope;
/**
@@ -45,6 +46,7 @@
@Override
public <T> void visitClass(Class<T> clazz, JavaImplementation type) throws
IntrospectionException {
+
ConversationAttributes conversation =
clazz.getAnnotation(ConversationAttributes.class);
if (conversation == null) {
return;
@@ -53,10 +55,6 @@
if (scope == null) {
// implicitly assume conversation
type.setJavaScope(org.apache.tuscany.sca.implementation.java.impl.JavaScopeImpl.CONVERSATION);
- } else if (scope != null &&
!"CONVERSATION".equals(scope.value().toUpperCase())) {
- throw new InvalidConversationalImplementation(
- "Service is marked
with @ConversationAttributes but the scope is not @Scope(\"CONVERSATION\")"
- );
} else if (conversation != null) {
long maxAge;
long maxIdleTime;
Modified:
incubator/tuscany/java/sca/modules/implementation-java/src/test/java/org/apache/tuscany/sca/implementation/java/introspect/impl/ConversationProcessorTestCase.java
URL:
http://svn.apache.org/viewvc/incubator/tuscany/java/sca/modules/implementation-java/src/test/java/org/apache/tuscany/sca/implementation/java/introspect/impl/ConversationProcessorTestCase.java?rev=620333&r1=620332&r2=620333&view=diff
==============================================================================
---
incubator/tuscany/java/sca/modules/implementation-java/src/test/java/org/apache/tuscany/sca/implementation/java/introspect/impl/ConversationProcessorTestCase.java
(original)
+++
incubator/tuscany/java/sca/modules/implementation-java/src/test/java/org/apache/tuscany/sca/implementation/java/introspect/impl/ConversationProcessorTestCase.java
Sun Feb 10 13:31:04 2008
@@ -58,6 +58,7 @@
assertEquals(org.apache.tuscany.sca.implementation.java.impl.JavaScopeImpl.CONVERSATION,
type.getJavaScope());
}
+ /* TUSCANY-1999 - apply conversation properties to all scopes
public void testBadFooScope() throws Exception {
JavaImplementation type =
javaImplementationFactory.createJavaImplementation();
try {
@@ -67,6 +68,7 @@
// expected
}
}
+ */
public void testBadFooBoth() throws Exception {
JavaImplementation type =
javaImplementationFactory.createJavaImplementation();
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]