Author: rajith
Date: Mon Jul 23 17:35:26 2007
New Revision: 558903

URL: http://svn.apache.org/viewvc?view=rev&rev=558903
Log:
adding synapse exchange

Added:
    
incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/exchange/synapse/
    
incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/exchange/synapse/MessageContextCreatorForQpid.java
    
incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/exchange/synapse/QpidSynapseEnvironment.java
    
incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/exchange/synapse/SynapseExchange.java
    
incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/exchange/synapse/TestClassMediator.java
Modified:
    incubator/qpid/branches/client_restructure/java/broker/pom.xml
    
incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
    
incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
    
incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
    
incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
    
incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java
    
incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
    
incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java
    
incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
    
incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/QpidTestClient.java
    
incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidConstants.java
    
incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseFactory.java
    
incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidMessageConsumerImpl.java
    
incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidMessageProducerImpl.java

Modified: incubator/qpid/branches/client_restructure/java/broker/pom.xml
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/broker/pom.xml?view=diff&rev=558903&r1=558902&r2=558903
==============================================================================
--- incubator/qpid/branches/client_restructure/java/broker/pom.xml (original)
+++ incubator/qpid/branches/client_restructure/java/broker/pom.xml Mon Jul 23 
17:35:26 2007
@@ -34,14 +34,89 @@
 
     <properties>
         <topDirectoryLocation>..</topDirectoryLocation>
+        <!-- Synapse and related components -->
+        <synapse.version>1.0</synapse.version>
+        <stax.api.version>1.0.1</stax.api.version>
+        <activation.version>1.1</activation.version>
+   
+        <!-- Axis2 1.2 and its dependencies -->
+        <axis2.version>1.2</axis2.version>
+        <axiom.version>1.2.4</axiom.version>
+        <xml_schema.version>1.3.1</xml_schema.version>
+        <xml_apis.version>1.3.03</xml_apis.version>
     </properties>
 
     <dependencies>
+        
+        <dependency>
+            <groupId>org.apache.axis2</groupId>
+            <artifactId>axis2-kernel</artifactId>
+            <version>${axis2.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.synapse</groupId>
+            <artifactId>synapse-core</artifactId>
+            <version>${synapse.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.synapse</groupId>
+            <artifactId>synapse-extensions</artifactId>
+            <version>${synapse.version}</version>
+        </dependency>
+            
+        <dependency>
+            <groupId>org.apache.ws.commons.axiom</groupId>
+            <artifactId>axiom-api</artifactId>
+            <version>${axiom.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.ws.commons.axiom</groupId>
+            <artifactId>axiom-impl</artifactId>
+            <version>${axiom.version}</version>
+        </dependency>
+      <!--  <dependency>
+            <groupId>org.apache.ws.commons.axiom</groupId>
+            <artifactId>axiom-dom</artifactId>
+            <version>${axiom.version}</version>
+        </dependency>
+     -->
+
+        <dependency>
+            <groupId>org.apache.ws.commons.schema</groupId>
+            <artifactId>XmlSchema</artifactId>
+            <version>${xml_schema.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>xml-apis</groupId>
+            <artifactId>xml-apis</artifactId>
+            <version>${xml_apis.version}</version>
+        </dependency>
+        
+        <dependency>
+          <groupId>org.codehaus.woodstox</groupId>
+          <artifactId>wstx-asl</artifactId>
+           <version>3.2.1</version>
+       </dependency>
+
+        <dependency>
+            <groupId>stax</groupId>
+            <artifactId>stax-api</artifactId>
+            <version>${stax.api.version}</version>
+        </dependency>       
+ 
+        <dependency>
+            <groupId>javax.activation</groupId>
+            <artifactId>activation</artifactId>
+            <version>${activation.version}</version>
+        </dependency>
 
         <dependency>
             <groupId>org.apache.qpid</groupId>
             <artifactId>qpid-common</artifactId>
-        </dependency>
+        </dependency>        
 
         <dependency>
             <groupId>commons-cli</groupId>

Modified: 
incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java?view=diff&rev=558903&r1=558902&r2=558903
==============================================================================
--- 
incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
 (original)
+++ 
incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
 Mon Jul 23 17:35:26 2007
@@ -94,7 +94,7 @@
                 Exchange exchange = _exchangeRegistry.getExchange(new 
AMQShortString(exchangeName));
                 if (exchange == null)
                 {
-                    exchange = _exchangeFactory.createExchange(new 
AMQShortString(exchangeName), new AMQShortString(type), durable, autoDelete, 0);
+                    exchange = 
_exchangeFactory.createExchange(_exchangeRegistry,new 
AMQShortString(exchangeName), new AMQShortString(type), durable, autoDelete, 0);
                     _exchangeRegistry.registerExchange(exchange);
                 }
                 else

Modified: 
incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java?view=diff&rev=558903&r1=558902&r2=558903
==============================================================================
--- 
incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
 (original)
+++ 
incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
 Mon Jul 23 17:35:26 2007
@@ -126,7 +126,7 @@
      */
     protected abstract ExchangeMBean createMBean() throws AMQException;
 
-    public void initialise(VirtualHost host, AMQShortString name, boolean 
durable, int ticket, boolean autoDelete) throws AMQException
+       public void initialise(VirtualHost host, AMQShortString name, boolean 
durable, int ticket, boolean autoDelete, ExchangeRegistry exchangeRegistry) 
throws AMQException
     {
         _virtualHost = host;
         _name = name;
@@ -134,7 +134,10 @@
         _autoDelete = autoDelete;
         _ticket = ticket;
         _exchangeMbean = createMBean();
-        _exchangeMbean.register();
+        if(_exchangeMbean != null)
+        {
+               _exchangeMbean.register();
+        }
     }
 
     public boolean isDurable()

Modified: 
incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java?view=diff&rev=558903&r1=558902&r2=558903
==============================================================================
--- 
incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
 (original)
+++ 
incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
 Mon Jul 23 17:35:26 2007
@@ -20,19 +20,15 @@
  */
 package org.apache.qpid.server.exchange;
 
+import java.util.HashMap;
+import java.util.Map;
+
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
-import org.apache.qpid.AMQChannelException;
 import org.apache.qpid.AMQUnknownExchangeType;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.framing.AMQShortString;
-
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.qpid.AMQUnknownExchangeType;
-import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.server.virtualhost.VirtualHost;
 
 public class DefaultExchangeFactory implements ExchangeFactory
 {
@@ -48,9 +44,12 @@
         _exchangeClassMap.put(ExchangeDefaults.TOPIC_EXCHANGE_CLASS, 
org.apache.qpid.server.exchange.DestWildExchange.class);
         _exchangeClassMap.put(ExchangeDefaults.HEADERS_EXCHANGE_CLASS, 
org.apache.qpid.server.exchange.HeadersExchange.class);
         _exchangeClassMap.put(ExchangeDefaults.FANOUT_EXCHANGE_CLASS, 
org.apache.qpid.server.exchange.FanoutExchange.class);
+        
+        // I'd rather allow an extension mechanism to register custom 
exchanges. For standard default exchanges this is fine.
+        _exchangeClassMap.put(new AMQShortString("synapse"), 
org.apache.qpid.server.exchange.synapse.SynapseExchange.class);
     }
 
-    public Exchange createExchange(AMQShortString exchange, AMQShortString 
type, boolean durable, boolean autoDelete,
+    public Exchange createExchange(ExchangeRegistry 
exchangeRegistry,AMQShortString exchange, AMQShortString type, boolean durable, 
boolean autoDelete,
                                    int ticket)
             throws AMQException
     {
@@ -62,7 +61,7 @@
         try
         {
             Exchange e = exchClass.newInstance();
-            e.initialise(_host, exchange, durable, ticket, autoDelete);
+            e.initialise(_host, exchange, durable, ticket, autoDelete, 
exchangeRegistry);
             return e;
         }
         catch (InstantiationException e)

Modified: 
incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java?view=diff&rev=558903&r1=558902&r2=558903
==============================================================================
--- 
incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
 (original)
+++ 
incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
 Mon Jul 23 17:35:26 2007
@@ -32,7 +32,7 @@
     AMQShortString getName();
     AMQShortString getType();
 
-    void initialise(VirtualHost host, AMQShortString name, boolean durable, 
int ticket, boolean autoDelete) throws AMQException;
+    void initialise(VirtualHost host, AMQShortString name, boolean durable, 
int ticket, boolean autoDelete, ExchangeRegistry exchangeRegistry) throws 
AMQException;
 
     boolean isDurable();
 

Modified: 
incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java?view=diff&rev=558903&r1=558902&r2=558903
==============================================================================
--- 
incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java
 (original)
+++ 
incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java
 Mon Jul 23 17:35:26 2007
@@ -26,7 +26,7 @@
 
 public interface ExchangeFactory
 {
-    Exchange createExchange(AMQShortString exchange, AMQShortString type, 
boolean durable, boolean autoDelete,
+    Exchange createExchange(ExchangeRegistry exchangeRegistry, AMQShortString 
exchange, AMQShortString type, boolean durable, boolean autoDelete,
                             int ticket)
             throws AMQException;
 }

Added: 
incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/exchange/synapse/MessageContextCreatorForQpid.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/exchange/synapse/MessageContextCreatorForQpid.java?view=auto&rev=558903
==============================================================================
--- 
incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/exchange/synapse/MessageContextCreatorForQpid.java
 (added)
+++ 
incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/exchange/synapse/MessageContextCreatorForQpid.java
 Mon Jul 23 17:35:26 2007
@@ -0,0 +1,258 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.qpid.server.exchange.synapse;
+
+import java.io.ByteArrayInputStream;
+import java.io.FileInputStream;
+
+import javax.activation.DataHandler;
+import javax.xml.namespace.QName;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.XMLStreamReader;
+
+import org.apache.axiom.attachments.ByteArrayDataSource;
+import org.apache.axiom.om.OMDocument;
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.om.OMText;
+import org.apache.axiom.om.impl.builder.StAXOMBuilder;
+import org.apache.axiom.om.util.StAXUtils;
+import org.apache.axiom.soap.SOAPEnvelope;
+import org.apache.axiom.soap.SOAPFactory;
+import org.apache.axiom.soap.impl.llom.soap11.SOAP11Factory;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.addressing.RelatesTo;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.Content;
+import org.apache.qpid.framing.MessageTransferBody;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.synapse.MessageContext;
+import org.apache.synapse.SynapseException;
+import org.apache.synapse.config.SynapseConfiguration;
+import org.apache.synapse.core.SynapseEnvironment;
+import org.apache.synapse.core.axis2.Axis2MessageContext;
+
+/**
+ * The MessageContext needs to be set up and then is used by the 
SynapseMessageReceiver to inject messages.
+ * This class is used by the SynapseMessageReceiver to find the environment. 
The env is stored in a Parameter to the Axis2 config
+ */
+public class MessageContextCreatorForQpid{
+
+    private static Log log = 
LogFactory.getLog(MessageContextCreatorForQpid.class);
+
+    private static SynapseConfiguration synCfg = null;
+    private static SynapseEnvironment   synEnv = null;
+
+    final static String ORIGINAL_MESSAGE = "ORIGINAL_MESSAGE";
+    final static String AMQP_CONTENT_TYPE = "AMQP_CONTENT_TYPE";  
+    final static String DEFAULT_CHAR_SET_ENCODING = "UTF-8";
+    
+    enum ContentType 
+    {
+       TEXT_PLAIN ("text/plain"),
+       TEXT_XML ("text/xml"),
+       APPLICATION_OCTECT ("application/octet-stream");
+       
+       private final String _value;
+       
+       private ContentType (String value)
+       {
+               _value = value;
+       }
+       
+       public String value()
+       {
+               return _value;
+       }
+    }
+    
+    private static String createURL(String exchangeName,String routingKey)
+    {
+       StringBuffer buf = new StringBuffer();
+       buf.append("amqp://");
+       buf.append(exchangeName);
+       buf.append("?");
+       buf.append("routingKey=");
+       buf.append(routingKey);
+       
+       return buf.toString();
+    }
+    
+    public static MessageContext getSynapseMessageContext(AMQMessage amqMsg) 
throws SynapseException {
+
+        if (synCfg == null || synEnv == null) {
+            String msg = "Synapse environment has not initialized properly..";
+            log.fatal(msg);
+            throw new SynapseException(msg);
+        }
+        
+        org.apache.axis2.context.MessageContext axis2MC = new 
org.apache.axis2.context.MessageContext();        
+        Axis2MessageContext synCtx = new Axis2MessageContext(axis2MC, synCfg, 
synEnv);
+        
synCtx.setMessageID(amqMsg.getTransferBody().getMessageId().asString());
+        if(amqMsg.getTransferBody().getCorrelationId() != null)
+        {
+               synCtx.setRelatesTo(new RelatesTo[]{new 
RelatesTo(amqMsg.getTransferBody().getCorrelationId().asString())});
+        }
+        synCtx.setTo(new 
EndpointReference(createURL(amqMsg.getTransferBody().getExchange().asString(),amqMsg.getTransferBody().getRoutingKey().asString())));
+        
+        if(amqMsg.getTransferBody().getReplyTo() != null)
+        {
+               synCtx.setReplyTo(new 
EndpointReference(createURL(amqMsg.getTransferBody().getExchange().asString(),amqMsg.getTransferBody().getReplyTo().asString())));
       
+       }
+        synCtx.setDoingPOX(true);
+        synCtx.setProperty(ORIGINAL_MESSAGE, amqMsg);
+        
+        //Creating a fictitious SOAP envelope to support the synapse model
+        
+        SOAPFactory soapFactory = new SOAP11Factory();
+        SOAPEnvelope envelope = soapFactory.getDefaultEnvelope();
+        
+        String contentType = 
amqMsg.getTransferBody().getContentType().asString();
+        if(ContentType.TEXT_PLAIN.value().equals(contentType))
+        {
+               OMElement wrapper = soapFactory.createOMElement(new 
QName("payload"), null);
+               OMText textData = 
soapFactory.createOMText(amqMsg.getTransferBody().getBody().getContentAsString());
+            wrapper.addChild(textData);
+            envelope.getBody().addChild(wrapper);
+        }
+        else if (ContentType.TEXT_XML.value().equals(contentType))
+        {
+               XMLStreamReader parser;
+                       try
+                       {
+                               parser = StAXUtils.createXMLStreamReader(
+                                                new 
ByteArrayInputStream(amqMsg.getTransferBody().getBody().getContentAsByteArray()),
+                                                DEFAULT_CHAR_SET_ENCODING);
+                       }
+                       catch (XMLStreamException e)
+                       {
+                               throw new SynapseException("Error reading the 
XML message",e);                          
+                       }
+               
+               StAXOMBuilder builder = new StAXOMBuilder(parser);
+               //builder.setOMBuilderFactory(soapFactory);
+               
+               Object obj = builder.getDocumentElement();
+               envelope.getBody().addChild(builder.getDocumentElement());
+        }
+        else if (ContentType.APPLICATION_OCTECT.value().equals(contentType))
+        {
+               // treat binary data as an attachment
+               DataHandler dataHandler = new DataHandler(
+                    new 
ByteArrayDataSource(amqMsg.getTransferBody().getBody().getContentAsByteArray()));
+                OMText textData = soapFactory.createOMText(dataHandler, true);
+                OMElement wrapper = soapFactory.createOMElement(new 
QName("payload"), null);
+                wrapper.addChild(textData);
+                synCtx.setDoingMTOM(true);
+                
+                envelope.getBody().addChild(wrapper);
+        }
+        else
+        {
+               throw new SynapseException("Unsupported Content Type : " + 
contentType);
+        }
+        
+        synCtx.setProperty(AMQP_CONTENT_TYPE, contentType);
+                
+        try
+        {
+               synCtx.setEnvelope(envelope);
+        }
+        catch(AxisFault e)
+        {        
+               throw new SynapseException(e);
+        }
+               
+        return synCtx;
+    }    
+    
+    public static AMQMessage getAMQMessage(MessageContext mc)
+    {
+       AMQMessage origMsg = (AMQMessage)mc.getProperty(ORIGINAL_MESSAGE);
+       OMElement payload = mc.getEnvelope().getBody().getFirstElement();
+       
+       String amqContentType = (String)mc.getProperty(AMQP_CONTENT_TYPE);
+       byte[] content = new byte[0];
+       
+       if(ContentType.TEXT_PLAIN.value().equals(amqContentType))
+       {
+               // For plain text there was a wrapper element
+               content = payload.getText().getBytes();
+       }
+       else if (ContentType.TEXT_XML.value().equals(amqContentType))
+       {
+               content = payload.getText().getBytes();
+       }
+       else if (ContentType.APPLICATION_OCTECT.value().equals(amqContentType) 
&& mc.isDoingMTOM())
+       {
+               
+       }
+       
+       String url = mc.getTo().getAddress();;
+               // very crude
+               // should have a utility class to do this, but do it when amqp
+               // officially converges on an addressing scheme
+               String exchangeName = url.substring(7,url.indexOf('?'));
+               String routingKey = 
url.substring(url.indexOf('=')+1,url.length());
+               
+       
+       MessageTransferBody origTransferBody = origMsg.getTransferBody();
+       MessageTransferBody transferBody = MessageTransferBody.createMethodBody(
+                       origTransferBody.getMajor(), 
+                       origTransferBody.getMinor(),
+                       origTransferBody.getAppId(), //appId
+                       origTransferBody.getApplicationHeaders(), 
//applicationHeaders
+                               new Content(Content.TypeEnum.INLINE_T, 
content), //body
+                               origTransferBody.getContentType(), 
//contentEncoding, 
+                               origTransferBody.getContentType(), //contentType
+                               origTransferBody.getCorrelationId(), 
//correlationId
+                               origTransferBody.getDeliveryMode(), 
//deliveryMode non persistent
+                               new AMQShortString(exchangeName),// destination
+                               new AMQShortString(exchangeName),// exchange
+                               origTransferBody.getExpiration(), //expiration
+                               origTransferBody.getImmediate(), //immediate
+                               origTransferBody.getMandatory(), //mandatory
+                               origTransferBody.getMessageId(), //messageId
+                               origTransferBody.getPriority(), //priority
+                               origTransferBody.getRedelivered(), //redelivered
+                               origTransferBody.getReplyTo(), //replyTo
+                               new AMQShortString(routingKey), //routingKey, 
+                               "abc".getBytes(), //securityToken
+                               origTransferBody.ticket, //ticket
+                               System.currentTimeMillis(), //timestamp
+                               origTransferBody.getTransactionId(), 
//transactionId
+                               origTransferBody.getTtl(), //ttl, 
+                               origTransferBody.getUserId() //userId
+                               );
+       AMQMessage newMsg = new 
AMQMessage(origMsg.getMessageStore(),transferBody,origMsg.getTransactionContext());
+       
+       return newMsg;
+    }
+    
+    public static void setSynConfig(SynapseConfiguration synCfg) {
+        MessageContextCreatorForQpid.synCfg = synCfg;
+    }
+
+    public static void setSynEnv(SynapseEnvironment synEnv) {
+        MessageContextCreatorForQpid.synEnv = synEnv;
+    }
+}

Added: 
incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/exchange/synapse/QpidSynapseEnvironment.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/exchange/synapse/QpidSynapseEnvironment.java?view=auto&rev=558903
==============================================================================
--- 
incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/exchange/synapse/QpidSynapseEnvironment.java
 (added)
+++ 
incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/exchange/synapse/QpidSynapseEnvironment.java
 Mon Jul 23 17:35:26 2007
@@ -0,0 +1,74 @@
+package org.apache.qpid.server.exchange.synapse;
+
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.synapse.MessageContext;
+import org.apache.synapse.SynapseException;
+import org.apache.synapse.config.SynapseConfiguration;
+import org.apache.synapse.core.SynapseEnvironment;
+import org.apache.synapse.core.axis2.Axis2MessageContext;
+import org.apache.synapse.endpoints.utils.EndpointDefinition;
+import org.apache.synapse.statistics.StatisticsCollector;
+
+public class QpidSynapseEnvironment implements SynapseEnvironment
+{
+
+       private static final Log log = 
LogFactory.getLog(QpidSynapseEnvironment.class);
+
+       private SynapseConfiguration synapseConfig;
+
+       private StatisticsCollector statisticsCollector;
+
+       private SynapseExchange qpidExchange;
+       
+       public QpidSynapseEnvironment(SynapseConfiguration synapseConfig, 
SynapseExchange qpidExchange)
+       {
+               this.synapseConfig = synapseConfig;
+               this.qpidExchange = qpidExchange;
+       }
+
+       public MessageContext createMessageContext()
+       {
+               org.apache.axis2.context.MessageContext axis2MC = new 
org.apache.axis2.context.MessageContext();
+               MessageContext mc = new Axis2MessageContext(axis2MC, 
synapseConfig, this);
+               return mc;
+       }
+
+       public StatisticsCollector getStatisticsCollector()
+       {
+               return statisticsCollector;
+       }
+
+       public void injectMessage(MessageContext synCtx)
+       {
+
+               synCtx.getMainSequence().mediate(synCtx);
+       }
+
+       public void send(EndpointDefinition endpoint, MessageContext smc)
+       {
+               if(endpoint != null)
+               {
+                       smc.setTo(new EndpointReference(endpoint.getAddress()));
+                       AMQMessage newMessage = 
MessageContextCreatorForQpid.getAMQMessage(smc);
+                       try
+                       {       
+                               
qpidExchange.getExchangeRegistry().routeContent(newMessage);
+                       }
+                       catch(Exception e)
+                       {
+                               throw new SynapseException("Faulty endpoint",e);
+                       }
+               }       
+               
+       }
+
+       public void setStatisticsCollector(StatisticsCollector 
statisticsCollector)
+       {
+               this.statisticsCollector = statisticsCollector;
+       }
+
+}

Added: 
incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/exchange/synapse/SynapseExchange.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/exchange/synapse/SynapseExchange.java?view=auto&rev=558903
==============================================================================
--- 
incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/exchange/synapse/SynapseExchange.java
 (added)
+++ 
incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/exchange/synapse/SynapseExchange.java
 Mon Jul 23 17:35:26 2007
@@ -0,0 +1,104 @@
+package org.apache.qpid.server.exchange.synapse;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.exchange.AbstractExchange;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.synapse.Constants;
+import org.apache.synapse.MessageContext;
+import org.apache.synapse.config.SynapseConfiguration;
+import org.apache.synapse.config.SynapseConfigurationBuilder;
+import org.apache.synapse.core.SynapseEnvironment;
+
+/**
+ * Exchange of type "synapse". Instead of delivering to bound queues, every
+ * routed message is converted into a Synapse message context and injected
+ * into a Synapse environment for mediation. The binding related operations
+ * are not supported and throw UnsupportedOperationException.
+ */
+public class SynapseExchange extends AbstractExchange
+{
+
+       /** The exchange type name reported by {@link #getType()}. */
+       public final static AMQShortString TYPE = new 
AMQShortString("synapse"); 
+       
+       /** Synapse environment created in initialise() and used by route(). */
+       private SynapseEnvironment synEnv;
+       
+       /** Registry passed to initialise(); exposed through getExchangeRegistry(). */
+       private ExchangeRegistry exchangeRegistry;
+
+       public SynapseExchange()
+       {
+               super();
+       }
+
+       /**
+        * Initialises the exchange and builds the Synapse environment.
+        *
+        * The Synapse configuration is loaded from the location given by the
+        * system property named by Constants.SYNAPSE_XML.
+        *
+        * @throws AMQException declared by the overridden method
+        */
+       @Override
+       public void initialise(VirtualHost host, AMQShortString name, boolean 
durable, int ticket, boolean autoDelete, ExchangeRegistry exchangeRegistry) 
throws AMQException
+       {
+               super.initialise(host, name, durable, ticket, autoDelete, 
exchangeRegistry);
+               
+               // NOTE(review): System.getProperty returns null when the property is not
+               // set; how getConfiguration handles a null location is not visible here.
+               String config = System.getProperty(Constants.SYNAPSE_XML);
+               SynapseConfiguration synapseConfiguration = 
SynapseConfigurationBuilder.getConfiguration(config);
+               synEnv = new QpidSynapseEnvironment(synapseConfiguration,this);
+               // NOTE(review): these are static setters, so initialising a second
+               // synapse exchange presumably replaces the configuration and environment
+               // set by the first one - confirm only one instance is expected.
+               MessageContextCreatorForQpid.setSynConfig(synapseConfiguration);
+               MessageContextCreatorForQpid.setSynEnv(synEnv);
+               this.exchangeRegistry = exchangeRegistry;
+       }
+
+       /**
+        * @return always null; no management bean is created for this exchange
+        */
+       @Override
+       protected ExchangeMBean createMBean() throws AMQException
+       {
+               // TODO: no management bean is provided for this exchange yet
+               return null;
+       }
+
+       /** Not supported; always throws UnsupportedOperationException. */
+       public void deregisterQueue(AMQShortString routingKey, AMQQueue queue) 
throws AMQException
+       {
+               throw new UnsupportedOperationException("This exchange does not 
take bindings");
+       }
+
+       /** @return {@link #TYPE} */
+       public AMQShortString getType()
+       {
+               return TYPE;
+       }
+
+       /** @return always false */
+       public boolean hasBindings() throws AMQException
+       {               
+               return false;
+       }
+
+       /** Not supported; always throws UnsupportedOperationException. */
+       public boolean isBound(AMQShortString routingKey, AMQQueue queue) 
throws AMQException
+       {
+               throw new UnsupportedOperationException("This exchange does not 
take bindings");
+       }
+
+       /** Not supported; always throws UnsupportedOperationException. */
+       public boolean isBound(AMQShortString routingKey) throws AMQException
+       {
+               throw new UnsupportedOperationException("This exchange does not 
take bindings");
+       }
+
+       /** Not supported; always throws UnsupportedOperationException. */
+       public boolean isBound(AMQQueue queue) throws AMQException
+       {
+               throw new UnsupportedOperationException("This exchange does not 
take bindings");
+       }
+
+       /** Not supported; always throws UnsupportedOperationException. */
+       public void registerQueue(AMQShortString routingKey, AMQQueue queue, 
FieldTable args) throws AMQException
+       {
+               throw new UnsupportedOperationException("This exchange does not 
take bindings");
+       }
+       
+       /**
+        * Converts the message into a Synapse message context and injects it into
+        * the Synapse environment.
+        *
+        * @param message the message to mediate
+        * @throws AMQException wrapping any exception raised during conversion or
+        *                      mediation
+        */
+       public void route(AMQMessage message) throws AMQException
+       {
+               try
+               {
+                       MessageContext mc = 
MessageContextCreatorForQpid.getSynapseMessageContext(message);
+                       synEnv.injectMessage(mc);                       
+               }
+               catch(Exception e)
+               {
+                       throw new AMQException("Error occurred while trying to 
mediate message through Synapse",e);
+               }
+       }
+       
+       /** @return the exchange registry that was passed to initialise() */
+       public ExchangeRegistry getExchangeRegistry()
+       {
+               return exchangeRegistry;
+       }
+
+}

Added: 
incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/exchange/synapse/TestClassMediator.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/exchange/synapse/TestClassMediator.java?view=auto&rev=558903
==============================================================================
--- 
incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/exchange/synapse/TestClassMediator.java
 (added)
+++ 
incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/exchange/synapse/TestClassMediator.java
 Mon Jul 23 17:35:26 2007
@@ -0,0 +1,62 @@
+package org.apache.qpid.server.exchange.synapse;
+
+import javax.activation.DataHandler;
+import javax.xml.namespace.QName;
+
+import org.apache.axiom.attachments.ByteArrayDataSource;
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.om.OMText;
+import org.apache.axiom.soap.SOAPFactory;
+import org.apache.axiom.soap.impl.llom.soap11.SOAP11Factory;
+import org.apache.synapse.Mediator;
+import org.apache.synapse.MessageContext;
+
+/**
+ * Sample Synapse mediator that reverses the bytes of the text held in the
+ * <code>payload</code> body element and puts the result back as binary content.
+ */
+public class TestClassMediator implements Mediator
+{
+
+       /**
+        * @return always 0; this sample keeps no trace state
+        */
+       public int getTraceState()
+       {
+               return 0;
+       }
+
+       /**
+        * @return always null; this sample does not report a mediator type
+        */
+       public String getType()
+       {
+               return null;
+       }
+
+       /**
+        * Replaces the first <code>payload</code> child of the SOAP body with a new
+        * <code>payload</code> element whose content is the original text's bytes in
+        * reverse order, and marks the message as doing MTOM.
+        *
+        * @param mc the message being mediated; its body must contain a
+        *           <code>payload</code> element, otherwise a NullPointerException
+        *           is thrown when the element is dereferenced
+        * @return always true
+        */
+       public boolean mediate(MessageContext mc)
+       {
+               SOAPFactory soapFactory = new SOAP11Factory();
+               OMElement binaryNode = mc.getEnvelope().getBody().getFirstChildWithName(new QName("payload"));
+               // Note: getBytes() encodes with the platform default charset.
+               byte[] source = binaryNode.getText().getBytes();
+
+               // Copy every byte in reverse order. The loop has to include index 0:
+               // stopping at i > 0 dropped the first source byte and left a stray
+               // zero at the end of the result.
+               byte[] reversed = new byte[source.length];
+               for (int i = 0; i < source.length; i++)
+               {
+                       reversed[i] = source[source.length - 1 - i];
+               }
+
+               // Swap the textual payload for the binary one.
+               binaryNode.detach();
+
+               DataHandler dataHandler = new DataHandler(new ByteArrayDataSource(reversed));
+               OMText textData = soapFactory.createOMText(dataHandler, true);
+               OMElement wrapper = soapFactory.createOMElement(new QName("payload"), null);
+               wrapper.addChild(textData);
+               mc.setDoingMTOM(true);
+
+               mc.getEnvelope().getBody().addChild(wrapper);
+               return true;
+       }
+
+       /**
+        * Ignored; this sample keeps no trace state.
+        *
+        * @param arg0 unused
+        */
+       public void setTraceState(int arg0)
+       {
+               // intentionally empty
+       }
+
+}

Modified: 
incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java?view=diff&rev=558903&r1=558902&r2=558903
==============================================================================
--- 
incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
 (original)
+++ 
incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
 Mon Jul 23 17:35:26 2007
@@ -79,7 +79,7 @@
                 {
                     try
                     {
-                        exchange = 
exchangeFactory.createExchange(body.exchange, body.type, body.durable,
+                        exchange = 
exchangeFactory.createExchange(exchangeRegistry,body.exchange, body.type, 
body.durable,
                                                                   
body.passive, body.ticket);
                         exchangeRegistry.registerExchange(exchange);
                     }

Modified: 
incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java?view=diff&rev=558903&r1=558902&r2=558903
==============================================================================
--- 
incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java
 (original)
+++ 
incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java
 Mon Jul 23 17:35:26 2007
@@ -34,12 +34,15 @@
         define(registry, factory, ExchangeDefaults.HEADERS_EXCHANGE_NAME, 
ExchangeDefaults.HEADERS_EXCHANGE_CLASS);
         define(registry, factory, ExchangeDefaults.FANOUT_EXCHANGE_NAME, 
ExchangeDefaults.FANOUT_EXCHANGE_CLASS);
 
+        //There should be an extension mechanism to register
+        define(registry,factory,new AMQShortString("amq.synapse"),new 
AMQShortString("synapse"));
+        
         
registry.setDefaultExchange(registry.getExchange(ExchangeDefaults.DIRECT_EXCHANGE_NAME));
     }
 
     private void define(ExchangeRegistry r, ExchangeFactory f,
                         AMQShortString name, AMQShortString type) throws 
AMQException
     {
+        // Asks the factory for an exchange with the given name and type and
+        // registers it with r. The registry is now also handed to the factory.
+        // The literals are presumably durable = true, passive = false,
+        // ticket = 0 (argument order as used by ExchangeDeclareHandler) - confirm.
-        r.registerExchange(f.createExchange(name, type, true, false, 0));
+        r.registerExchange(f.createExchange(r,name, type, true, false, 0));
     }
 }

Modified: 
incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?view=diff&rev=558903&r1=558902&r2=558903
==============================================================================
--- 
incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
 (original)
+++ 
incubator/qpid/branches/client_restructure/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
 Mon Jul 23 17:35:26 2007
@@ -643,4 +643,13 @@
         return _requestId;
     }
 
+    /**
+     * Returns the message store referenced by this message.
+     *
+     * @return the value of the <code>_store</code> field
+     */
+    public MessageStore getMessageStore()
+    {
+       return _store;
+    }
+    
+    /**
+     * Returns the transactional context referenced by this message.
+     *
+     * @return the value of the <code>_txnContext</code> field
+     */
+    public TransactionalContext getTransactionContext()
+    {
+       return _txnContext;
+    }
 }

Modified: 
incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/QpidTestClient.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/QpidTestClient.java?view=diff&rev=558903&r1=558902&r2=558903
==============================================================================
--- 
incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/QpidTestClient.java
 (original)
+++ 
incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/QpidTestClient.java
 Mon Jul 23 17:35:26 2007
@@ -27,30 +27,174 @@
                        QpidExchangeHelper exchangeHelper = 
session.getExchangeHelper();
                        exchangeHelper.declareExchange(false, false, 
QpidConstants.DIRECT_EXCHANGE_NAME, false, false, false, 
QpidConstants.DIRECT_EXCHANGE_CLASS);
                        
-                       QpidQueueHelper queueHelper = session.getQueueHelper();
-                       queueHelper.declareQueue(false, false, false, false, 
false, "myQueue");
-                       
queueHelper.bindQueue(QpidConstants.DIRECT_EXCHANGE_NAME, false, "myQueue", 
"RH");
+                       exchangeHelper.declareExchange(false, false, 
QpidConstants.SYNAPSE_EXCHANGE_NAME, false, false, false, 
QpidConstants.SYNAPSE_EXCHANGE_CLASS);
+                                                               
+                   //contentBasedRoutingSample(session);
                        
-                       MessageHeaders msgHeaders = new MessageHeaders();
-                       msgHeaders.setRoutingKey(new AMQShortString("RH"));
-                       msgHeaders.setExchange(new 
AMQShortString(QpidConstants.DIRECT_EXCHANGE_NAME));
-                       AMQPApplicationMessage msg = new 
AMQPApplicationMessage(msgHeaders,"test".getBytes());
+                       //transformationSample2(session);                       
                        
-                       QpidMessageProducer messageProducer = 
session.createProducer();
-                       messageProducer.open();
-                       messageProducer.send(false, true, msg);
-                       
-                       QpidMessageConsumer messageConsumer = 
session.createConsumer("myQueue", false, false);
-                       messageConsumer.open();
-                       
-                       AMQPApplicationMessage msg2 = messageConsumer.receive();
-                       System.out.println(msg.toString());
+                       binaryMessageTransformations(session);
                }
                catch(Exception e)
                {
                        e.printStackTrace();
                }
                
+       }
+       
+       /**
+        * Sends three trouble tickets (priority "critical", "low" and "high") to the
+        * Synapse exchange and then receives and prints one message from each of
+        * the three ticket queues bound to the direct exchange.
+        *
+        * @param session session used to declare the queues and to create the
+        *                producer and the consumers
+        * @throws Exception if any of the broker operations fails
+        */
+       public static void contentBasedRoutingSample(QpidSession session) throws Exception
+       {
+               // The namespace declaration is closed directly after the quoted URI;
+               // a stray ';' at that position made the payload malformed XML.
+               String tmp = "<m:troubleTicket xmlns:m=\"http://redhat.com/sample\"><m:customerId>532535</m:customerId><m:priority>";
+               String tmp2 = "</m:priority><m:appId>ESB</m:appId><m:desc>blabla</m:desc></m:troubleTicket>";
+
+               //Create queues
+               QpidQueueHelper queueHelper = session.getQueueHelper();
+               queueHelper.declareQueue(false, false, false, false, false, "criticalTicketQueue");
+               queueHelper.bindQueue(QpidConstants.DIRECT_EXCHANGE_NAME, false, "criticalTicketQueue", "criticalTicket");
+
+               queueHelper.declareQueue(false, false, false, false, false, "lowPriorityTicketQueue");
+               queueHelper.bindQueue(QpidConstants.DIRECT_EXCHANGE_NAME, false, "lowPriorityTicketQueue", "lowPriorityTicket");
+
+               queueHelper.declareQueue(false, false, false, false, false, "ticketQueue");
+               queueHelper.bindQueue(QpidConstants.DIRECT_EXCHANGE_NAME, false, "ticketQueue", "troubleTicket");
+
+               QpidMessageProducer messageProducer = session.createProducer();
+               messageProducer.open();
+
+               // The same headers are shared by all three tickets.
+               MessageHeaders msgHeaders = new MessageHeaders();
+               msgHeaders.setContentType(new AMQShortString("text/xml"));
+               msgHeaders.setRoutingKey(new AMQShortString("defectSystem"));
+               msgHeaders.setDestination(new AMQShortString(QpidConstants.SYNAPSE_EXCHANGE_NAME));
+               msgHeaders.setExchange(new AMQShortString(QpidConstants.SYNAPSE_EXCHANGE_NAME));
+
+               AMQPApplicationMessage criticalMsg = new AMQPApplicationMessage(msgHeaders, (tmp + "critical" + tmp2).getBytes());
+               System.out.println(criticalMsg.toString());
+               messageProducer.send(false, true, criticalMsg);
+
+               AMQPApplicationMessage lowMsg = new AMQPApplicationMessage(msgHeaders, (tmp + "low" + tmp2).getBytes());
+               System.out.println(lowMsg.toString());
+               messageProducer.send(false, true, lowMsg);
+
+               AMQPApplicationMessage highMsg = new AMQPApplicationMessage(msgHeaders, (tmp + "high" + tmp2).getBytes());
+               System.out.println(highMsg.toString());
+               messageProducer.send(false, true, highMsg);
+
+               QpidMessageConsumer messageConsumerCritical = session.createConsumer("criticalTicketQueue", false, false);
+               messageConsumerCritical.open();
+               AMQPApplicationMessage criticalMsgRcv = messageConsumerCritical.receive();
+               System.out.println(criticalMsgRcv.toString());
+
+               QpidMessageConsumer messageConsumerLow = session.createConsumer("lowPriorityTicketQueue", false, false);
+               messageConsumerLow.open();
+               AMQPApplicationMessage lowMsgRcv = messageConsumerLow.receive();
+               System.out.println(lowMsgRcv.toString());
+
+               QpidMessageConsumer messageConsumer = session.createConsumer("ticketQueue", false, false);
+               messageConsumer.open();
+               AMQPApplicationMessage msgRcv = messageConsumer.receive();
+               System.out.println(msgRcv.toString());
+
+       }
+       
+       /**
+        * Declares "stockQuoteQueue", binds it to the direct exchange under the key
+        * "stockQuote" and sends a single XML quote message to the Synapse exchange.
+        *
+        * @param session session used to declare the queue and create the producer
+        * @throws Exception if any of the broker operations fails
+        */
+       public static void transformationSample(QpidSession session) throws Exception
+       {
+               // The namespace declaration is closed directly after the quoted URI;
+               // a stray ';' at that position made the payload malformed XML.
+               String tmp = "<m:quote xmlns:m=\"http://redhat.com/sample\"><m:ticker>RHT</m:ticker><m:value>125</m:value></m:quote>";
+
+               QpidQueueHelper queueHelper = session.getQueueHelper();
+               queueHelper.declareQueue(false, false, false, false, false, "stockQuoteQueue");
+               queueHelper.bindQueue(QpidConstants.DIRECT_EXCHANGE_NAME, false, "stockQuoteQueue", "stockQuote");
+
+               MessageHeaders msgHeaders = new MessageHeaders();
+               msgHeaders.setContentType(new AMQShortString("text/xml"));
+               msgHeaders.setRoutingKey(new AMQShortString("stockQuote"));
+               msgHeaders.setDestination(new AMQShortString(QpidConstants.SYNAPSE_EXCHANGE_NAME));
+               msgHeaders.setExchange(new AMQShortString(QpidConstants.SYNAPSE_EXCHANGE_NAME));
+
+               AMQPApplicationMessage msg = new AMQPApplicationMessage(msgHeaders, tmp.getBytes());
+
+               QpidMessageProducer messageProducer = session.createProducer();
+               messageProducer.open();
+               System.out.println(msg.toString());
+               messageProducer.send(false, true, msg);
+       }
+       
+       /**
+        * Declares the car rental, hotel and airline queues, binds them to the
+        * direct exchange and sends one XML "vacationPackage" message to the
+        * Synapse exchange.
+        *
+        * @param session session used to declare the queues and create the producer
+        * @throws Exception if any of the broker operations fails
+        */
+       public static void transformationSample2(QpidSession session) throws Exception
+       {
+               StringBuilder buf = new StringBuilder();
+               // The namespace declaration is closed directly after the quoted URI;
+               // a stray ';' at that position made the payload malformed XML.
+               buf.append("<m:vacationPackage xmlns:m=\"http://redhat.com/sample\">");
+               buf.append("<m:customerFirstName>Rajith</m:customerFirstName>");
+               buf.append("<m:customerLastName>Rajith</m:customerLastName>");
+               buf.append("<m:customerAddress>3349 Missississauga Road,Mississauga,ON, L5L 1J7</m:customerAddress>");
+               buf.append("<m:customerDOB>Mississauga</m:customerDOB>");
+               buf.append("<m:paymentInfo>Visa,456454574575325325235,05122007</m:paymentInfo>");
+               buf.append("<m:start>12072007</m:start>");
+               buf.append("<m:end>18072007</m:end>");
+               buf.append("<m:airTicket>");
+               buf.append("<m:airline>AC</m:airline>");
+               buf.append("<m:seatPreference>W</m:seatPreference>");
+               buf.append("<m:frequentFlyer>643663345</m:frequentFlyer>");
+               buf.append("</m:airTicket>");
+               buf.append("<m:Hotel>");
+               buf.append("<m:noOfDays>5</m:noOfDays>");
+               buf.append("<m:rating>5</m:rating>");
+               buf.append("<m:meals>AI</m:meals>");
+               buf.append("</m:Hotel>");
+               buf.append("<m:carRental>");
+               buf.append("<m:from>14062007</m:from>");
+               buf.append("<m:to>16062007</m:to>");
+               buf.append("</m:carRental>");
+               buf.append("</m:vacationPackage>");
+
+               QpidQueueHelper queueHelper = session.getQueueHelper();
+               queueHelper.declareQueue(false, false, false, false, false, "carRentalQueue");
+               queueHelper.bindQueue(QpidConstants.DIRECT_EXCHANGE_NAME, false, "carRentalQueue", "carRental");
+
+               queueHelper.declareQueue(false, false, false, false, false, "hotelQueue");
+               queueHelper.bindQueue(QpidConstants.DIRECT_EXCHANGE_NAME, false, "hotelQueue", "hotel");
+
+               queueHelper.declareQueue(false, false, false, false, false, "airlineQueue");
+               queueHelper.bindQueue(QpidConstants.DIRECT_EXCHANGE_NAME, false, "airlineQueue", "airline");
+
+               MessageHeaders msgHeaders = new MessageHeaders();
+               msgHeaders.setContentType(new AMQShortString("text/xml"));
+               msgHeaders.setRoutingKey(new AMQShortString("vacationPackage"));
+               msgHeaders.setDestination(new AMQShortString(QpidConstants.SYNAPSE_EXCHANGE_NAME));
+               msgHeaders.setExchange(new AMQShortString(QpidConstants.SYNAPSE_EXCHANGE_NAME));
+
+               AMQPApplicationMessage msg = new AMQPApplicationMessage(msgHeaders, buf.toString().getBytes());
+
+               QpidMessageProducer messageProducer = session.createProducer();
+               messageProducer.open();
+               System.out.println(msg.toString());
+               messageProducer.send(false, true, msg);
+       }
+       
+       /**
+        * Declares "binaryQueue", binds it to the direct exchange under the key
+        * "binary" and publishes a five byte application/octet-stream message to
+        * the Synapse exchange.
+        *
+        * @param session session used to declare the queue and create the producer
+        * @throws Exception if any of the broker operations fails
+        */
+       public static void binaryMessageTransformations(QpidSession session) throws Exception
+       {
+               QpidQueueHelper queues = session.getQueueHelper();
+               queues.declareQueue(false, false, false, false, false, "binaryQueue");
+               queues.bindQueue(QpidConstants.DIRECT_EXCHANGE_NAME, false, "binaryQueue", "binary");
+
+               MessageHeaders headers = new MessageHeaders();
+               headers.setContentType(new AMQShortString("application/octet-stream"));
+               headers.setRoutingKey(new AMQShortString("binary"));
+               headers.setDestination(new AMQShortString(QpidConstants.SYNAPSE_EXCHANGE_NAME));
+               headers.setExchange(new AMQShortString(QpidConstants.SYNAPSE_EXCHANGE_NAME));
+
+               // ASCII codes for "Hello"
+               byte[] payload = {72, 101, 108, 108, 111};
+               AMQPApplicationMessage binaryMsg = new AMQPApplicationMessage(headers, payload);
+
+               QpidMessageProducer producer = session.createProducer();
+               producer.open();
+               System.out.println(binaryMsg.toString());
+               producer.send(false, true, binaryMsg);
        }
        
 }

Modified: 
incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidConstants.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidConstants.java?view=diff&rev=558903&r1=558902&r2=558903
==============================================================================
--- 
incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidConstants.java
 (original)
+++ 
incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/api/QpidConstants.java
 Mon Jul 23 17:35:26 2007
@@ -22,6 +22,10 @@
 
     public final static String FANOUT_EXCHANGE_CLASS = "fanout";
 
+    public final static String SYNAPSE_EXCHANGE_NAME = "amq.synapse";
+
+    public final static String SYNAPSE_EXCHANGE_CLASS = "synapse";
+    
 
     public final static String SYSTEM_MANAGEMENT_EXCHANGE_NAME = 
"qpid.sysmgmt";
 

Modified: 
incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseFactory.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseFactory.java?view=diff&rev=558903&r1=558902&r2=558903
==============================================================================
--- 
incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseFactory.java
 (original)
+++ 
incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseFactory.java
 Mon Jul 23 17:35:26 2007
@@ -8,56 +8,56 @@
 
 public class PhaseFactory
 {
-    /**
-     * This method will create the pipe and return a reference
-     * to the top of the pipeline.
-     * 
-     * The application can then use this (top most) phase and all
-     * calls will propogated down the pipe.
-     * 
-     * Simillar calls orginating at the bottom of the pipeline
-     * will be propogated to the top.
-     * 
-     * @param ctx
-     * @return
-     * @throws AMQPException
-     */
-    public static Phase createPhasePipe(PhaseContext ctx) throws AMQPException
-    {
-       String key = AMQPConstants.PHASE_PIPE + "." + AMQPConstants.PHASE;
-       Map<Integer,Phase> phaseMap = new HashMap<Integer,Phase>();
-       List<String> list = ClientConfiguration.get().getList(key);
-       int index = 0;
-       for(String s:list)
+       /**
+        * This method will create the pipe and return a reference
+        * to the top of the pipeline.
+        * 
+        * The application can then use this (top most) phase and all
+        * calls will be propagated down the pipe.
+        * 
+        * Similar calls originating at the bottom of the pipeline
+        * will be propagated to the top.
+        * 
+        * @param ctx
+        * @return
+        * @throws AMQPException
+        */
+       public static Phase createPhasePipe(PhaseContext ctx) throws 
AMQPException
        {
-           try
-           {
-               Phase temp = (Phase)Class.forName(s).newInstance();
-               phaseMap.put(ClientConfiguration.get().getInt(key + "(" + index 
+  ")." + AMQPConstants.INDEX),temp) ;
-           }
-           catch(Exception e)
-           {
-               throw new AMQPException("Error loading phase " + 
ClientConfiguration.get().getString(s),e);
-           }    
-           index++;
+               String key = AMQPConstants.PHASE_PIPE + "." + 
AMQPConstants.PHASE;
+               Map<Integer, Phase> phaseMap = new HashMap<Integer, Phase>();
+               List<String> list = ClientConfiguration.get().getList(key);
+               int index = 0;
+               for (String s : list)
+               {
+                       try
+                       {
+                               Phase temp = (Phase) 
Class.forName(s).newInstance();
+                               
phaseMap.put(ClientConfiguration.get().getInt(key + "(" + index + ")." + 
AMQPConstants.INDEX), temp);
+                       }
+                       catch (Exception e)
+                       {
+                               throw new AMQPException("Error loading phase " 
+ ClientConfiguration.get().getString(s), e);
+                       }
+                       index++;
+               }
+
+               Phase current = null;
+               Phase prev = null;
+               Phase next = null;
+               //Let's build the phase pipe.
+               for (int i = 0; i < phaseMap.size(); i++)
+               {
+                       current = phaseMap.get(i);
+                       if (i + 1 < phaseMap.size())
+                       {
+                               next = phaseMap.get(i + 1);
+                       }
+                       current.init(ctx, next, prev);
+                       prev = current;
+                       next = null;
+               }
+
+               return current;
        }
-       
-       Phase current = null;
-       Phase prev = null;
-       Phase next = null;
-       //Lets build the phase pipe.
-       for (int i=0; i<phaseMap.size();i++)
-       {
-          current = phaseMap.get(i);      
-          if (i+1 < phaseMap.size())
-          {
-              next = phaseMap.get(i+1);
-          }
-          current.init(ctx, next, prev);
-          prev = current;
-          next = null;
-       }
-       
-       return current;
-    }
 }

Modified: 
incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidMessageConsumerImpl.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidMessageConsumerImpl.java?view=diff&rev=558903&r1=558902&r2=558903
==============================================================================
--- 
incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidMessageConsumerImpl.java
 (original)
+++ 
incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidMessageConsumerImpl.java
 Mon Jul 23 17:35:26 2007
@@ -70,7 +70,14 @@
        public AMQPApplicationMessage receive()throws QpidException
        {
                checkClosed();
-               return _queue.poll();
+               // Blocks until a message is available (take) instead of returning
+               // immediately with whatever poll() finds.
+               try
+               {
+                       return _queue.take();
+               }
+               catch (InterruptedException e)
+               {
+                       // Restore the interrupt status so callers further up the stack
+                       // can still see that this thread was interrupted.
+                       Thread.currentThread().interrupt();
+                       throw new QpidException("Error occurred while retrieving message",e);
+               }
        }
        
        public AMQPApplicationMessage receive(long timeout, TimeUnit tu)throws 
QpidException

Modified: 
incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidMessageProducerImpl.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidMessageProducerImpl.java?view=diff&rev=558903&r1=558902&r2=558903
==============================================================================
--- 
incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidMessageProducerImpl.java
 (original)
+++ 
incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/impl/QpidMessageProducerImpl.java
 Mon Jul 23 17:35:26 2007
@@ -82,8 +82,8 @@
                                msgHeaders.getContentType(), //contentType
                                msgHeaders.getCorrelationId(), //correlationId
                                msgHeaders.getDeliveryMode(), //deliveryMode 
non persistant
-                               new 
AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_NAME),// destination
-                               new 
AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_NAME),// exchange
+                               msgHeaders.getDestination(),// destination
+                               msgHeaders.getExchange(),// exchange
                                msgHeaders.getExpiration(), //expiration
                                msgHeaders.isImmediate(), //immediate
                                msgHeaders.isMandatory(), //mandatory


Reply via email to