Modified: incubator/tuscany/java/sca/modules/node-impl/src/main/java/org/apache/tuscany/sca/node/impl/SCANodeImpl.java URL: http://svn.apache.org/viewvc/incubator/tuscany/java/sca/modules/node-impl/src/main/java/org/apache/tuscany/sca/node/impl/SCANodeImpl.java?rev=602979&r1=602978&r2=602979&view=diff ============================================================================== --- incubator/tuscany/java/sca/modules/node-impl/src/main/java/org/apache/tuscany/sca/node/impl/SCANodeImpl.java (original) +++ incubator/tuscany/java/sca/modules/node-impl/src/main/java/org/apache/tuscany/sca/node/impl/SCANodeImpl.java Mon Dec 10 09:52:35 2007 @@ -19,6 +19,9 @@ package org.apache.tuscany.sca.node.impl; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.StringBufferInputStream; import java.net.InetAddress; import java.net.ServerSocket; import java.net.URI; @@ -31,6 +34,10 @@ import java.util.logging.Logger; import javax.xml.namespace.QName; +import javax.xml.stream.XMLInputFactory; +import javax.xml.stream.XMLOutputFactory; +import javax.xml.stream.XMLStreamReader; +import javax.xml.stream.XMLStreamWriter; import org.apache.tuscany.sca.assembly.AssemblyFactory; import org.apache.tuscany.sca.assembly.Binding; @@ -41,15 +48,19 @@ import org.apache.tuscany.sca.assembly.Reference; import org.apache.tuscany.sca.assembly.Service; import org.apache.tuscany.sca.assembly.builder.CompositeBuilderException; +import org.apache.tuscany.sca.assembly.builder.impl.DomainWireBuilderImpl; import org.apache.tuscany.sca.assembly.xml.Constants; import org.apache.tuscany.sca.contribution.Contribution; import org.apache.tuscany.sca.contribution.DeployedArtifact; import org.apache.tuscany.sca.contribution.ModelFactoryExtensionPoint; +import org.apache.tuscany.sca.contribution.processor.StAXArtifactProcessor; +import org.apache.tuscany.sca.contribution.processor.StAXArtifactProcessorExtensionPoint; import org.apache.tuscany.sca.contribution.resolver.ModelResolver; import org.apache.tuscany.sca.contribution.resolver.impl.ModelResolverImpl; import org.apache.tuscany.sca.contribution.service.ContributionService; import org.apache.tuscany.sca.core.ExtensionPointRegistry; import org.apache.tuscany.sca.core.assembly.ActivationException; +import org.apache.tuscany.sca.databinding.impl.XSDDataTypeConverter.Base64Binary; import org.apache.tuscany.sca.domain.SCADomain; import org.apache.tuscany.sca.domain.SCADomainEventService; import org.apache.tuscany.sca.host.embedded.impl.ReallySmallRuntime; @@ -59,6 +70,8 @@ import org.apache.tuscany.sca.node.NodeFactoryImpl; import org.apache.tuscany.sca.node.SCANode; import org.apache.tuscany.sca.node.SCANodeSPI; +import org.apache.tuscany.sca.runtime.RuntimeComponent; +import org.apache.tuscany.sca.runtime.RuntimeComponentReference; /** * A local representation of the sca domain running on a single node @@ -100,16 +113,19 @@ // Used to pipe node information into the model NodeFactoryImpl nodeFactory; + // domain level wiring + DomainWireBuilderImpl domainWireBuilder = new DomainWireBuilderImpl(); + // methods defined on the implementation only /** * Creates a node connected to a wider domain. To find its place in the domain * node and domain identifiers must be provided. - * + * + * @param physicalNodeUri - if this is a url it is assumed that this will be used as root url for management components, e.g. http://localhost:8082 * @param domainUri - identifies what host and port the domain service is running on, e.g. http://localhost:8081 - * @param nodeUri - if this is a url it is assumed that this will be used as root url for management components, e.g. http://localhost:8082 - * @param nodeGroupURI the uri of the node group. This is the enpoint URI of the head of the + * @param logicalNodeURI the uri of the node group. This is the enpoint URI of the head of the * group of nodes. For example, in load balancing scenarios this will be the loaded balancer itself * @throws ActivationException */ @@ -125,9 +141,9 @@ * Creates a node connected to a wider domain and allows a classpath to be specified. * To find its place in the domain node and domain identifiers must be provided. * + * @param physicalNodeUri - if this is a url it is assumed that this will be used as root url for management components, e.g. http://localhost:8082 * @param domainUri - identifies what host and port the domain service is running on, e.g. http://localhost:8081 - * @param nodeUri - if this is a url it is assumed that this will be used as root url for management components, e.g. http://localhost:8082 - * @param nodeGroupURI the uri of the node group. This is the enpoint URI of the head of the + * @param logicalNodeURI the uri of the node group. This is the enpoint URI of the head of the * group of nodes. For example, in load balancing scenarios this will be the loaded balancer itself * @param cl - the ClassLoader to use for loading system resources for the node * @throws ActivationException @@ -214,9 +230,7 @@ } // temp methods to help integrate with existing code - - - + public Component getComponent(String componentName) { for (Composite composite: nodeComposite.getIncludes()) { for (Component component: composite.getComponents()) { @@ -265,6 +279,12 @@ if ((!nodeStarted) && (nodeComposite.getIncludes().size() > 0)){ startComposites(); nodeStarted = true; + + try { + scaDomain.registerNodeStart(nodeURI); + } catch (Exception ex) { + throw new NodeException(ex); + } } } @@ -272,6 +292,12 @@ if (nodeStarted){ stopComposites(); nodeStarted = false; + + try { + scaDomain.registerNodeStop(nodeURI); + } catch (Exception ex) { + throw new NodeException(ex); + } } } @@ -598,6 +624,21 @@ // don't try and restart the management composite // they will already have been started by the domain proxy if (!composite.getName().equals(nodeManagementCompositeName)){ + buildComposite(composite); + } + } + + // do cross composite wiring. This is here just in case + // the node has more than one composite and is stand alone + // If the node is not stand alone the domain will do this + if (domainURI == null){ + domainWireBuilder.wireDomain(nodeComposite); + } + + for (Composite composite : nodeComposite.getIncludes()) { + // don't try and restart the management composite + // they will already have been started by the domain proxy + if (!composite.getName().equals(nodeManagementCompositeName)){ startComposite(composite); } } @@ -607,21 +648,22 @@ throw new NodeException(ex); } } - - private void startComposite(Composite composite) throws CompositeBuilderException, ActivationException { - logger.log(Level.INFO, "Starting composite: " + composite.getName()); + + private void buildComposite(Composite composite) throws CompositeBuilderException, ActivationException { + logger.log(Level.INFO, "Building composite: " + composite.getName()); // Create the model for the composite nodeRuntime.getCompositeBuilder().build(composite); // activate the composite nodeRuntime.getCompositeActivator().activate(composite); + } + + private void startComposite(Composite composite) throws CompositeBuilderException, ActivationException { + logger.log(Level.INFO, "Starting composite: " + composite.getName()); // tell the domain where all the service endpoints are - scaDomain.registerRemoteServices(nodeURI, composite); - - // get up to date with where all services we depend on are - resolveRemoteReferences(composite); + registerRemoteServices(nodeURI, composite); //start the composite nodeRuntime.getCompositeActivator().start(composite); @@ -642,7 +684,6 @@ } catch (Exception ex) { throw new NodeException(ex); } - } private void stopComposite(Composite composite) @@ -651,98 +692,164 @@ nodeRuntime.getCompositeActivator().stop(composite); nodeRuntime.getCompositeActivator().deactivate(composite); } - - private void resolveRemoteReferences(Composite composite){ - // Loop through all reference binding URIs. Any that are not resolved - // should be looked up in the domain + private void registerRemoteServices(String nodeURI, Composite composite){ + // Loop through all service binding URIs registering them with the domain + for (Service service: composite.getServices()) { + for (Binding binding: service.getBindings()) { + registerRemoteServiceBinding(nodeURI, null, service, binding); + } + } - for (Reference reference: composite.getReferences()) { - for ( ComponentService service : reference.getTargets()){ + for (Component component: composite.getComponents()) { + for (ComponentService service: component.getServices()) { for (Binding binding: service.getBindings()) { - resolveRemoteReferenceBinding(reference, service, binding); + registerRemoteServiceBinding(nodeURI, component, service, binding); } - } - } - - for (Component component: composite.getComponents()) { - for (ComponentReference reference: component.getReferences()) { - for ( ComponentService service : reference.getTargets()){ - for (Binding binding: service.getBindings()) { - resolveRemoteReferenceBinding(reference, service, binding); - } - } } - } + } } - private void resolveRemoteReferenceBinding(Reference reference, Service service, Binding binding){ - if (binding.isUnresolved()) { - // find the right endpoint for this reference/binding. This relies on looking - // up every binding URI. If a response is returned then it's set back into the - // binding uri - String uri = ""; - - try { - uri = ((SCADomainEventService)scaDomain).findServiceEndpoint(domainURI, - service.getName(), - binding.getClass().getName()); - } catch(Exception ex) { - logger.log(Level.WARNING, - "Unable to find service: " + - domainURI + " " + - nodeURI + " " + - binding.getURI() + " " + - binding.getClass().getName() + " " + - uri); - } - - if (!uri.equals(SCADomainEventService.SERVICE_NOT_REGISTERED)){ - binding.setURI(uri); - } - } - } - - public void setReferenceEndpoint(String referenceName, String bindingClassName, String serviceURI) throws NodeException { - // find the named reference and binding and update the uri from this message - Reference reference = findReference(referenceName); - - if (reference != null){ - // find the matching binding and set it's uri - for( Binding binding : reference.getBindings()){ - if (binding.getClass().getName().equals(bindingClassName)){ - binding.setURI(serviceURI); + private void registerRemoteServiceBinding(String nodeURI, Component component, Service service, Binding binding ){ + if (service.getInterfaceContract().getInterface().isRemotable()) { + String uriString = binding.getURI(); + if (uriString != null) { + + + String serviceName = service.getName(); + + if (component != null){ + serviceName = component.getName() + '/' + serviceName; + } + + try { + scaDomain.registerServiceEndpoint(domainURI, + nodeURI, + serviceName, + binding.getClass().getName(), + uriString); + } catch(Exception ex) { + logger.log(Level.WARNING, + "Unable to register service: " + + domainURI + " " + + nodeURI + " " + + service.getName()+ " " + + binding.getClass().getName() + " " + + uriString); } } } - } + } - private Reference findReference(String referenceName){ + public void updateComposite(QName compositeQName, String compositeXMLBase64 ) throws NodeException { + logger.log(Level.INFO, "Updating composite " + compositeQName.toString() + + " at node " + nodeURI); + + ByteArrayInputStream bais = new ByteArrayInputStream(Base64Binary.decode(compositeXMLBase64)); - for (Reference reference: nodeComposite.getReferences()) { - if (reference.getName().equals(referenceName)){ - return reference; - } - } + // find the composite that will be updated + Composite composite = composites.get(compositeQName); - for (Component component: nodeComposite.getComponents()) { - for (ComponentReference reference: component.getReferences()) { - if (reference.getName().equals(referenceName)){ - return reference; - } - } + if (composite == null) { + throw new NodeException("trying to update composite " + compositeQName.toString() + + " which can't be found in node " + nodeURI); } - for (Composite composite : nodeComposite.getIncludes()) { - for (Component component: composite.getComponents()) { - for (ComponentReference reference: component.getReferences()) { - if (reference.getName().equals(referenceName)){ - return reference; + // parse the XML into an composite object + Composite newComposite = null; + + ExtensionPointRegistry registry = nodeRuntime.getExtensionPointRegistry(); + StAXArtifactProcessorExtensionPoint staxProcessors = + registry.getExtensionPoint(StAXArtifactProcessorExtensionPoint.class); + + StAXArtifactProcessor<Composite> processor = staxProcessors.getProcessor(Composite.class); + + try { + XMLInputFactory inputFactory = XMLInputFactory.newInstance(); + XMLStreamReader reader = inputFactory.createXMLStreamReader(bais); + newComposite = processor.read(reader); + reader.close(); + } catch (Exception ex) { + throw new NodeException(ex); + } + + // for each component in the composite compare it against the live component + for (Component newComponent : newComposite.getComponents()){ + for (Component component : composite.getComponents()){ + if (component.getName().equals(newComponent.getName())){ + // compare the component references + for (Reference newReference : newComponent.getReferences()){ + for (Reference reference : component.getReferences()) { + if (reference.getName().equals(newReference.getName())) { + boolean referenceChanged = false; + Binding removalCandidates[] = new Binding[reference.getBindings().size()]; + int bindingIndex = 0; + for (Binding binding : reference.getBindings()){ + removalCandidates[bindingIndex] = binding; + bindingIndex++; + } + + for (Binding newBinding : newReference.getBindings()){ + boolean bindingFound = false; + bindingIndex = 0; + for (Binding binding : reference.getBindings()){ + if (binding.getName().equals(newBinding.getName())){ + if ((binding.getURI() != null) && + (newBinding.getURI() != null) && + !binding.getURI().equals(newBinding.getURI())){ + binding.setURI(newBinding.getURI()); + referenceChanged = true; + + logger.log(Level.INFO, "Updating component " + + component.getName() + + " reference " + + reference.getName() + + " binding " + + binding.getClass().getName() + + " URI " + + binding.getURI()); + } + bindingFound = true; + removalCandidates[bindingIndex] = null; + } + bindingIndex++; + } + + // if the new binding is not currently deployed then add it + if (bindingFound == false){ + reference.getBindings().add(newBinding); + referenceChanged = true; + } + } + + // remove all of the old bindings + for ( int i = 0; i < removalCandidates.length; i++){ + if (removalCandidates[i] != null){ + reference.getBindings().remove(removalCandidates[i]); + referenceChanged = true; + } + } + + // if the node is running restart the reference + if (referenceChanged && nodeStarted){ + nodeRuntime.getCompositeActivator().deactivate((RuntimeComponent)component, + (RuntimeComponentReference)reference); + + nodeRuntime.getCompositeActivator().start((RuntimeComponent)component, + (RuntimeComponentReference)reference); + + } + } + } } + + // TODO - compare other parts of the component } } - } + } + + // TODO - Compare other parts of the composite? + + } - return null; - } }
Modified: incubator/tuscany/java/sca/modules/node-impl/src/main/java/org/apache/tuscany/sca/node/management/impl/SCANodeManagerServiceImpl.java URL: http://svn.apache.org/viewvc/incubator/tuscany/java/sca/modules/node-impl/src/main/java/org/apache/tuscany/sca/node/management/impl/SCANodeManagerServiceImpl.java?rev=602979&r1=602978&r2=602979&view=diff ============================================================================== --- incubator/tuscany/java/sca/modules/node-impl/src/main/java/org/apache/tuscany/sca/node/management/impl/SCANodeManagerServiceImpl.java (original) +++ incubator/tuscany/java/sca/modules/node-impl/src/main/java/org/apache/tuscany/sca/node/management/impl/SCANodeManagerServiceImpl.java Mon Dec 10 09:52:35 2007 @@ -96,8 +96,8 @@ node.destroy(); } - public void setReferenceEndpoint(String refererenceName, String bindingName, String serviceURI) throws NodeException { - node.setReferenceEndpoint(refererenceName, bindingName, serviceURI); + public void updateComposite(String compositeQName, String compositeXMLBase64 ) throws NodeException { + ((SCANodeImpl)node).updateComposite(QName.valueOf(compositeQName), compositeXMLBase64 ); } // ComponentManagerService Modified: incubator/tuscany/java/sca/modules/node/src/main/java/org/apache/tuscany/sca/node/management/SCANodeManagerService.java URL: http://svn.apache.org/viewvc/incubator/tuscany/java/sca/modules/node/src/main/java/org/apache/tuscany/sca/node/management/SCANodeManagerService.java?rev=602979&r1=602978&r2=602979&view=diff ============================================================================== --- incubator/tuscany/java/sca/modules/node/src/main/java/org/apache/tuscany/sca/node/management/SCANodeManagerService.java (original) +++ incubator/tuscany/java/sca/modules/node/src/main/java/org/apache/tuscany/sca/node/management/SCANodeManagerService.java Mon Dec 10 09:52:35 2007 @@ -86,12 +86,11 @@ public void destroyNode() throws NodeException; /** - * Set a reference endpoint based on information registered with the domain - * - * @param refererenceName the name of the reference in question - * @param bindingClassName the class name of the binding on the reference - * @param serviceURI the uri to set + * Pass in an updated version of a composite in base64 encoded XML form. The node will compare + * this against the version of the composite it has and apply and changes found + * @param compositeQName + * @param compositeXMLBase64 * @throws NodeException */ - public void setReferenceEndpoint(String refererenceName, String bindingClassName, String serviceURI) throws NodeException; + public void updateComposite(String compositeQName, String compositeXMLBase64 ) throws NodeException; } --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]
