Modified: incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/sca/core/scope/ConversationalScopeContainer.java URL: http://svn.apache.org/viewvc/incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/sca/core/scope/ConversationalScopeContainer.java?rev=575101&r1=575100&r2=575101&view=diff ============================================================================== --- incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/sca/core/scope/ConversationalScopeContainer.java (original) +++ incubator/tuscany/java/sca/modules/core/src/main/java/org/apache/tuscany/sca/core/scope/ConversationalScopeContainer.java Wed Sep 12 15:09:19 2007 @@ -17,11 +17,9 @@ * under the License. */ - package org.apache.tuscany.sca.core.scope; import java.util.ArrayList; -import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -31,82 +29,85 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import org.apache.tuscany.sca.core.context.ConversationImpl; import org.apache.tuscany.sca.core.context.InstanceWrapper; +import org.apache.tuscany.sca.core.conversation.ConversationListener; +import org.apache.tuscany.sca.core.conversation.ConversationManager; +import org.apache.tuscany.sca.core.conversation.ExtendedConversation; import org.apache.tuscany.sca.core.invocation.ThreadMessageContext; import org.apache.tuscany.sca.invocation.Message; import org.apache.tuscany.sca.runtime.RuntimeComponent; import org.apache.tuscany.sca.store.Store; -import org.osoa.sca.Conversation; import org.osoa.sca.ConversationEndedException; /** * A scope context which manages atomic component instances keyed on ConversationID * */ -public class ConversationalScopeContainer extends AbstractScopeContainer<Object> { - - private Map<Object, InstanceLifeCycleWrapper> instanceLifecycleCollection = new ConcurrentHashMap<Object, InstanceLifeCycleWrapper>(); - +public class ConversationalScopeContainer extends AbstractScopeContainer<Object> implements ConversationListener { + private ConversationManager conversationManager; + private Map<Object, InstanceLifeCycleWrapper> instanceLifecycleCollection = + new ConcurrentHashMap<Object, InstanceLifeCycleWrapper>(); + //TODO: This needs to observe the value set by ConversationalAttributes for now we will hard code it. - private long max_age = 60 * 60 * 1000; // 1 hour; - private long max_idle_time = 60 * 60 * 1000; // 1 hour; - private long reaper_interval = 60; // every minute; + private long max_age = 60 * 60 * 1000; // 1 hour; + private long max_idle_time = 60 * 60 * 1000; // 1 hour; + private long reaper_interval = 60; // every minute; private ScheduledExecutorService scheduler; - + public ConversationalScopeContainer(Store aStore, RuntimeComponent component) { - super(Scope.CONVERSATION, component); - + super(Scope.CONVERSATION, component); + // Note: aStore is here to preserve the original factory interface. It is not currently used in this // implementation since we do not support instance persistence. - + // Check System properties to see if timeout values have been specified. All timeout values // will be specified in seconds. // - String aProperty; + String aProperty; aProperty = System.getProperty("org.apache.tuscany.sca.core.scope.ConversationalScopeContainer.MaxIdleTime"); - if (aProperty != null) - try - { - max_idle_time = (new Long(aProperty) * 1000); - } - catch (NumberFormatException nfe) {}; - + if (aProperty != null) { + try { + max_idle_time = (new Long(aProperty) * 1000); + } catch (NumberFormatException nfe) { + // Ignore + } + } + aProperty = System.getProperty("org.apache.tuscany.sca.core.scope.ConversationalScopeContainer.MaxAge"); - if (aProperty != null) - try - { - max_age = (new Long(aProperty) * 1000); - } - catch (NumberFormatException nfe) {}; - + if (aProperty != null) { + try { + max_age = (new Long(aProperty) * 1000); + } catch (NumberFormatException nfe) { + // Ignore + } + } + aProperty = System.getProperty("org.apache.tuscany.sca.core.scope.ConversationalScopeContainer.ReaperInterval"); - if (aProperty != null) - try - { - reaper_interval = new Long(aProperty); - } - catch (NumberFormatException nfe) {}; - - + if (aProperty != null) { + try { + reaper_interval = new Long(aProperty); + } catch (NumberFormatException nfe) { + // Ignore + } + } + // Check to see if the maxAge and/or maxIdleTime have been specified using @ConversationAttributes. // Implementation annoated attributes are honored first. - if (this.getComponent().getImplementationProvider() instanceof ScopedImplementationProvider) - { - ScopedImplementationProvider aScopedImpl = (ScopedImplementationProvider) this.getComponent().getImplementationProvider(); - + if (this.getComponent().getImplementationProvider() instanceof ScopedImplementationProvider) { + ScopedImplementationProvider aScopedImpl = + (ScopedImplementationProvider)this.getComponent().getImplementationProvider(); + long maxAge = aScopedImpl.getMaxAge(); if (maxAge > 0) { max_age = maxAge; } long maxIdleTime = aScopedImpl.getMaxIdleTime(); - if (maxIdleTime > 0 ) { + if (maxIdleTime > 0) { max_idle_time = maxIdleTime; } - } + } - } - + } @Override public synchronized void start() { @@ -116,65 +117,66 @@ // Get a scheduler and scheduled a task to be run in the future indefinitely until its explicitly shutdown. this.scheduler = Executors.newSingleThreadScheduledExecutor(); - scheduler.scheduleAtFixedRate(new ConversationalInstanceReaper(this.instanceLifecycleCollection), 3, reaper_interval, TimeUnit.SECONDS); - - lifecycleState = RUNNING; - } + scheduler.scheduleAtFixedRate(new ConversationalInstanceReaper(this.instanceLifecycleCollection), + 3, + reaper_interval, + TimeUnit.SECONDS); + + lifecycleState = RUNNING; + } @Override public synchronized void stop() { - + // Prevent the scheduler from submitting any additional reapers, initiate an orderly shutdown if a reaper task is in progress. - if (this.scheduler != null) - this.scheduler.shutdown(); - + if (this.scheduler != null) + this.scheduler.shutdown(); + lifecycleState = STOPPED; } - protected InstanceWrapper getInstanceWrapper(boolean create,Object contextId) throws TargetResolutionException { - + protected InstanceWrapper getInstanceWrapper(boolean create, Object contextId) throws TargetResolutionException { + // we might get a null context if the target service has // conversational scope but only its callback interface // is conversational. In this case we need to invent a // conversation Id here to store the service against // and populate the thread context - if (contextId == null){ + if (contextId == null) { contextId = UUID.randomUUID().toString(); Message msgContext = ThreadMessageContext.getMessageContext(); - - if (msgContext != null){ + + if (msgContext != null) { msgContext.getTo().getReferenceParameters().setConversationID(contextId); } } - + InstanceLifeCycleWrapper anInstanceWrapper = this.instanceLifecycleCollection.get(contextId); - + if (anInstanceWrapper == null && !create) - return null; - - if (anInstanceWrapper == null) - { - anInstanceWrapper = new InstanceLifeCycleWrapper(contextId); + return null; + + if (anInstanceWrapper == null) { + anInstanceWrapper = new InstanceLifeCycleWrapper(contextId); this.instanceLifecycleCollection.put(contextId, anInstanceWrapper); } // If an existing instance is found return it only if its not expired and update its // last referenced time. - else - { - if (anInstanceWrapper.isExpired()) - throw new ConversationEndedException(); - anInstanceWrapper.updateLastReferencedTime(); - } - - return anInstanceWrapper.getInstanceWrapper(contextId); - + else { + if (anInstanceWrapper.isExpired()) + throw new ConversationEndedException(); + anInstanceWrapper.updateLastReferencedTime(); + } + + return anInstanceWrapper.getInstanceWrapper(contextId); + } - + @Override public InstanceWrapper getWrapper(Object contextId) throws TargetResolutionException { - return getInstanceWrapper(true,contextId); - } - + return getInstanceWrapper(true, contextId); + } + /** * This method allows a new context id to be registered alongside an existing one. This happens in * one case, when a conversation includes a stateful callback. The client component instance @@ -186,20 +188,18 @@ * and reset when the component instance is removed */ public void addWrapperReference(Object existingContextId, Object contextId) throws TargetResolutionException { - Conversation conversation = (Conversation)contextId; - // get the instance wrapper via the existing id InstanceLifeCycleWrapper existingInstanceWrapper = this.instanceLifecycleCollection.get(existingContextId); - InstanceLifeCycleWrapper newInstanceWrapper = this.instanceLifecycleCollection.get(conversation.getConversationID()); - + InstanceLifeCycleWrapper newInstanceWrapper = this.instanceLifecycleCollection.get(contextId); + // only add the extra reference once if (newInstanceWrapper == null) { // add the id to the list of ids that the wrapper holds. Used for reference // counting and conversation resetting on destruction. - existingInstanceWrapper.addCallbackConversation(conversation); - + existingInstanceWrapper.addCallbackConversation(contextId); + // add the reference to the collection - this.instanceLifecycleCollection.put(conversation.getConversationID(), existingInstanceWrapper); + this.instanceLifecycleCollection.put(contextId, existingInstanceWrapper); } } @@ -214,187 +214,192 @@ } } - anInstanceWrapper = new InstanceLifeCycleWrapper(wrapper, contextId); + anInstanceWrapper = new InstanceLifeCycleWrapper(wrapper, contextId); this.instanceLifecycleCollection.put(contextId, anInstanceWrapper); } - - + // The remove is invoked when a conversation is explicitly ended. This can occur by using the @EndsConversation or API. // In this case the instance is immediately removed. A new conversation will be started on the next operation // associated with this conversationId's service reference. // @Override public void remove(Object contextId) throws TargetDestructionException { - if (contextId != null){ - if (this.instanceLifecycleCollection.containsKey(contextId)) - { - InstanceLifeCycleWrapper anInstanceLifeCycleWrapper = this.instanceLifecycleCollection.get(contextId); - this.instanceLifecycleCollection.remove(contextId); - anInstanceLifeCycleWrapper.removeInstanceWrapper(contextId); - } - } - } - - + if (contextId != null) { + if (this.instanceLifecycleCollection.containsKey(contextId)) { + InstanceLifeCycleWrapper anInstanceLifeCycleWrapper = this.instanceLifecycleCollection.get(contextId); + this.instanceLifecycleCollection.remove(contextId); + anInstanceLifeCycleWrapper.removeInstanceWrapper(contextId); + } + } + } + /* * This is an inner class that keeps track of the lifecycle of a conversation scoped * implementation instance. * */ - - private class InstanceLifeCycleWrapper - { + + private class InstanceLifeCycleWrapper { private Object clientConversationId; - private List<ConversationImpl> callbackConversations = new ArrayList<ConversationImpl>(); - private long creationTime; - private long lastReferencedTime; - private long expirationInterval; - private long maxIdleTime; - private Conversation conversation; - - private InstanceLifeCycleWrapper(Object contextId) throws TargetResolutionException - { - this.clientConversationId = contextId; - this.creationTime = System.currentTimeMillis(); - this.lastReferencedTime = this.creationTime; - this.expirationInterval = max_age; - this.maxIdleTime = max_idle_time; - this.createInstance(contextId); - } - - private InstanceLifeCycleWrapper(InstanceWrapper wrapper, Object contextId) throws TargetResolutionException - { - this.clientConversationId = contextId; - this.creationTime = System.currentTimeMillis(); - this.lastReferencedTime = this.creationTime; - this.expirationInterval = max_age; - this.maxIdleTime = max_idle_time; - wrappers.put(contextId, wrapper); - } - - private boolean isExpired() - { - long currentTime = System.currentTimeMillis(); - if ((this.lastReferencedTime + this.maxIdleTime) < currentTime) // max idle time exceeded - return true; - if ((this.creationTime + this.expirationInterval) < currentTime) // max time to live exceeded - return true; - - return false; - } - - private void updateLastReferencedTime() - { - this.lastReferencedTime = System.currentTimeMillis(); + private List<Object> callbackConversations = new ArrayList<Object>(); + private long creationTime; + private long lastReferencedTime; + private long expirationInterval; + private long maxIdleTime; + + private InstanceLifeCycleWrapper(Object contextId) throws TargetResolutionException { + this.clientConversationId = contextId; + this.creationTime = System.currentTimeMillis(); + this.lastReferencedTime = this.creationTime; + this.expirationInterval = max_age; + this.maxIdleTime = max_idle_time; + this.createInstance(contextId); + } + + private InstanceLifeCycleWrapper(InstanceWrapper wrapper, Object contextId) throws TargetResolutionException { + this.clientConversationId = contextId; + this.creationTime = System.currentTimeMillis(); + this.lastReferencedTime = this.creationTime; + this.expirationInterval = max_age; + this.maxIdleTime = max_idle_time; + wrappers.put(contextId, wrapper); + } + + private boolean isExpired() { + long currentTime = System.currentTimeMillis(); + if ((this.lastReferencedTime + this.maxIdleTime) < currentTime) // max idle time exceeded + return true; + if ((this.creationTime + this.expirationInterval) < currentTime) // max time to live exceeded + return true; + + return false; } - + + private void updateLastReferencedTime() { + this.lastReferencedTime = System.currentTimeMillis(); + } + // Associates a callback conversation with this instance. Each time the scope container // is asked to remove an object given a ontextId an associated conversation object will // have its conversationId reset to null. When the list of ids is empty the component instance // will be removed from the scope container - private void addCallbackConversation(Conversation conversation){ - InstanceWrapper ctx = getInstanceWrapper(clientConversationId); - callbackConversations.add((ConversationImpl)conversation); - wrappers.put(conversation.getConversationID(), ctx); + private void addCallbackConversation(Object conversationID) { + InstanceWrapper ctx = getInstanceWrapper(clientConversationId); + callbackConversations.add(conversationID); + wrappers.put(conversationID, ctx); } - + // // Return the backing implementation instance // - private InstanceWrapper getInstanceWrapper(Object contextId) - { - InstanceWrapper ctx = wrappers.get(contextId); - return ctx; - } - - private void removeInstanceWrapper(Object contextId) throws TargetDestructionException - { - InstanceWrapper ctx = getInstanceWrapper(contextId); - wrappers.remove(contextId); - - // find out if we are dealing with the original client conversation id - // and reset accordingly - if (clientConversationId.equals(contextId)){ - clientConversationId = null; - } else { - // reset the conversationId in the conversation object if present - // so that and ending callback causes the conversation in the originating - // service reference in the client to be reset - ConversationImpl conversation = null; - - for (ConversationImpl loopConversation : callbackConversations) { - if (loopConversation.getConversationID().equals(contextId)) { - conversation = loopConversation; - } - } - if(conversation != null){ - conversation.setConversationID(null); - callbackConversations.remove(conversation); - } - } - - - // stop the component if this removes the last reference - if (clientConversationId == null && - callbackConversations.isEmpty()) { - ctx.stop(); - if (conversation != null) { - ((ConversationImpl)conversation).setConversationID(null); - } - } - } - - private void createInstance(Object contextId) throws TargetResolutionException - { + private InstanceWrapper getInstanceWrapper(Object contextId) { + InstanceWrapper ctx = wrappers.get(contextId); + return ctx; + } + + private void removeInstanceWrapper(Object contextId) throws TargetDestructionException { + InstanceWrapper ctx = getInstanceWrapper(contextId); + wrappers.remove(contextId); + + // find out if we are dealing with the original client conversation id + // and reset accordingly + if (clientConversationId.equals(contextId)) { + clientConversationId = null; + } else { + // reset the conversationId in the conversation object if present + // so that and ending callback causes the conversation in the originating + // service reference in the client to be reset + callbackConversations.remove(contextId); + } + + // stop the component if this removes the last reference + if (clientConversationId == null && callbackConversations.isEmpty()) { + ctx.stop(); + } + } + + private void createInstance(Object contextId) throws TargetResolutionException { InstanceWrapper instanceWrapper = createInstanceWrapper(); instanceWrapper.start(); - wrappers.put(contextId, instanceWrapper); + wrappers.put(contextId, instanceWrapper); } - + } - + // // This inner class is an instance reaper. It periodically iterates over the InstanceLifeCycleCollection // and for any instances that have expired removes the backing instance and the entry in the InstanceLifeCycle // Collection. // - class ConversationalInstanceReaper implements Runnable - { + class ConversationalInstanceReaper implements Runnable { private Map<Object, InstanceLifeCycleWrapper> instanceLifecycleCollection; - - public ConversationalInstanceReaper(Map<Object, InstanceLifeCycleWrapper> aMap) - { + + public ConversationalInstanceReaper(Map<Object, InstanceLifeCycleWrapper> aMap) { this.instanceLifecycleCollection = aMap; } - - public void run() - { - Iterator<Map.Entry<Object,InstanceLifeCycleWrapper>> anIterator = this.instanceLifecycleCollection.entrySet().iterator(); - - while (anIterator.hasNext()) - { - Map.Entry<Object,InstanceLifeCycleWrapper> anEntry = anIterator.next(); + + public void run() { + Iterator<Map.Entry<Object, InstanceLifeCycleWrapper>> anIterator = + this.instanceLifecycleCollection.entrySet().iterator(); + + while (anIterator.hasNext()) { + Map.Entry<Object, InstanceLifeCycleWrapper> anEntry = anIterator.next(); InstanceLifeCycleWrapper anInstanceLifeCycleWrapper = anEntry.getValue(); - if (anInstanceLifeCycleWrapper.isExpired()) - { + if (anInstanceLifeCycleWrapper.isExpired()) { try { // cycle through all the references to this instance and // remove them from the underlying wrappers collection and // from the lifecycle wrappers collection - for (ConversationImpl conversation : anInstanceLifeCycleWrapper.callbackConversations) { - anInstanceLifeCycleWrapper.removeInstanceWrapper(conversation.getConversationID()); - this.instanceLifecycleCollection.remove(conversation.getConversationID()); + for (Object conversationID : anInstanceLifeCycleWrapper.callbackConversations) { + anInstanceLifeCycleWrapper.removeInstanceWrapper(conversationID); + this.instanceLifecycleCollection.remove(conversationID); } - - if (anInstanceLifeCycleWrapper.clientConversationId != null){ - anInstanceLifeCycleWrapper.removeInstanceWrapper(anInstanceLifeCycleWrapper.clientConversationId); + + if (anInstanceLifeCycleWrapper.clientConversationId != null) { + anInstanceLifeCycleWrapper + .removeInstanceWrapper(anInstanceLifeCycleWrapper.clientConversationId); this.instanceLifecycleCollection.remove(anInstanceLifeCycleWrapper.clientConversationId); } } catch (Exception ex) { - // TODO - what to do with any asynchronous exceptions? + // TODO - what to do with any asynchronous exceptions? } } - } + } } } + + /** + * @see org.apache.tuscany.sca.core.conversation.ConversationListener#conversationEnded(org.apache.tuscany.sca.core.conversation.ExtendedConversation) + */ + public void conversationEnded(ExtendedConversation conversation) { + stopContext(conversation.getConversationID()); + } + + /** + * @see org.apache.tuscany.sca.core.conversation.ConversationListener#conversationExpired(org.apache.tuscany.sca.core.conversation.ExtendedConversation) + */ + public void conversationExpired(ExtendedConversation conversation) { + } + + /** + * @see org.apache.tuscany.sca.core.conversation.ConversationListener#conversationStarted(org.apache.tuscany.sca.core.conversation.ExtendedConversation) + */ + public void conversationStarted(ExtendedConversation conversation) { + startContext(conversation.getConversationID()); + } + + /** + * @return the conversationManager + */ + public ConversationManager getConversationManager() { + return conversationManager; + } + + /** + * @param conversationManager the conversationManager to set + */ + public void setConversationManager(ConversationManager conversationManager) { + this.conversationManager = conversationManager; + } + }
Modified: incubator/tuscany/java/sca/modules/host-embedded/src/main/java/org/apache/tuscany/sca/host/embedded/impl/ReallySmallRuntimeBuilder.java URL: http://svn.apache.org/viewvc/incubator/tuscany/java/sca/modules/host-embedded/src/main/java/org/apache/tuscany/sca/host/embedded/impl/ReallySmallRuntimeBuilder.java?rev=575101&r1=575100&r2=575101&view=diff ============================================================================== --- incubator/tuscany/java/sca/modules/host-embedded/src/main/java/org/apache/tuscany/sca/host/embedded/impl/ReallySmallRuntimeBuilder.java (original) +++ incubator/tuscany/java/sca/modules/host-embedded/src/main/java/org/apache/tuscany/sca/host/embedded/impl/ReallySmallRuntimeBuilder.java Wed Sep 12 15:09:19 2007 @@ -61,6 +61,8 @@ import org.apache.tuscany.sca.core.assembly.ActivationException; import org.apache.tuscany.sca.core.assembly.CompositeActivator; import org.apache.tuscany.sca.core.assembly.CompositeActivatorImpl; +import org.apache.tuscany.sca.core.conversation.ConversationManager; +import org.apache.tuscany.sca.core.conversation.ConversationManagerImpl; import org.apache.tuscany.sca.core.invocation.DefaultProxyFactoryExtensionPoint; import org.apache.tuscany.sca.core.invocation.ExtensibleWireProcessor; import org.apache.tuscany.sca.core.invocation.ProxyFactory; @@ -127,11 +129,14 @@ RequestContextFactory requestContextFactory = registry.getExtensionPoint(ContextFactoryExtensionPoint.class).getFactory(RequestContextFactory.class); + ConversationManager conversationManager = new ConversationManagerImpl(); + registry.addExtensionPoint(conversationManager); + // Create the composite activator CompositeActivator compositeActivator = new CompositeActivatorImpl(assemblyFactory, messageFactory, javaInterfaceFactory, scaBindingFactory, mapper, scopeRegistry, workScheduler, wireProcessor, requestContextFactory, - proxyFactory, providerFactories, processors); + proxyFactory, providerFactories, processors, conversationManager); return compositeActivator; } --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]
