Author: gsim
Date: Fri Jan 26 10:45:39 2007
New Revision: 500303

URL: http://svn.apache.org/viewvc?view=rev&rev=500303
Log:
Updated broker for issues highlighted by python tests. (e.g. fanout exchange, 
default exchange rules etc)


Added:
    
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
   (with props)
    
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/AMQUnknownExchangeType.java
   (with props)
Modified:
    
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
    
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
    
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java
    
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
    
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
    
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java
    
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
    
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
    
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java
    
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java
    
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java
    
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/AMQType.java
    
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java

Modified: 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java?view=diff&rev=500303&r1=500302&r2=500303
==============================================================================
--- 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
 (original)
+++ 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
 Fri Jan 26 10:45:39 2007
@@ -25,6 +25,8 @@
 
 import java.util.HashMap;
 import java.util.Map;
+import org.apache.qpid.AMQUnknownExchangeType;
+import org.apache.qpid.exchange.ExchangeDefaults;
 
 public class DefaultExchangeFactory implements ExchangeFactory
 {
@@ -34,9 +36,10 @@
 
     public DefaultExchangeFactory()
     {
-        _exchangeClassMap.put("direct", 
org.apache.qpid.server.exchange.DestNameExchange.class);
-        _exchangeClassMap.put("topic", 
org.apache.qpid.server.exchange.DestWildExchange.class);
-        _exchangeClassMap.put("headers", 
org.apache.qpid.server.exchange.HeadersExchange.class);
+        _exchangeClassMap.put(ExchangeDefaults.DIRECT_EXCHANGE_CLASS, 
org.apache.qpid.server.exchange.DestNameExchange.class);
+        _exchangeClassMap.put(ExchangeDefaults.TOPIC_EXCHANGE_CLASS, 
org.apache.qpid.server.exchange.DestWildExchange.class);
+        _exchangeClassMap.put(ExchangeDefaults.HEADERS_EXCHANGE_CLASS, 
org.apache.qpid.server.exchange.HeadersExchange.class);
+        _exchangeClassMap.put(ExchangeDefaults.FANOUT_EXCHANGE_CLASS, 
org.apache.qpid.server.exchange.FanoutExchange.class);
     }
 
     public Exchange createExchange(String exchange, String type, boolean 
durable, boolean autoDelete,
@@ -46,7 +49,7 @@
         Class<? extends Exchange> exchClass = _exchangeClassMap.get(type);
         if (exchClass == null)
         {
-            throw new AMQException(_logger, "Unknown exchange type: " + type);
+            throw new AMQUnknownExchangeType("Unknown exchange type: " + type);
         }
         try
         {

Modified: 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java?view=diff&rev=500303&r1=500302&r2=500303
==============================================================================
--- 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
 (original)
+++ 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
 Fri Jan 26 10:45:39 2007
@@ -37,6 +37,8 @@
      */
     private ConcurrentMap<String, Exchange> _exchangeMap = new 
ConcurrentHashMap<String, Exchange>();
 
+    private Exchange _defaultExchange;
+
     public DefaultExchangeRegistry(ExchangeFactory exchangeFactory)
     {
         //create 'standard' exchanges:
@@ -52,9 +54,18 @@
 
     public void registerExchange(Exchange exchange)
     {
+        if(_defaultExchange == null)
+        {
+            setDefaultExchange(exchange);
+        }   
         _exchangeMap.put(exchange.getName(), exchange);
     }
 
+    public void setDefaultExchange(Exchange exchange)
+    {
+        _defaultExchange = exchange;
+    }
+
     public void unregisterExchange(String name, boolean inUse) throws 
AMQException
     {
         // TODO: check inUse argument
@@ -71,7 +82,16 @@
 
     public Exchange getExchange(String name)
     {
-        return _exchangeMap.get(name);
+
+        if(name == null || name.length() == 0)
+        {
+            return _defaultExchange;
+        }
+        else
+        {
+            return _exchangeMap.get(name);
+        }
+
     }
 
     /**
@@ -82,7 +102,7 @@
     public void routeContent(AMQMessage payload) throws AMQException
     {
         final String exchange = payload.getTransferBody().destination;
-        final Exchange exch = _exchangeMap.get(exchange);
+        final Exchange exch = getExchange(exchange);
         // there is a small window of opportunity for the exchange to be 
deleted in between
         // the JmsPublish being received (where the exchange is validated) and 
the final
         // content body being received (which triggers this method)

Modified: 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java?view=diff&rev=500303&r1=500302&r2=500303
==============================================================================
--- 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java
 (original)
+++ 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java
 Fri Jan 26 10:45:39 2007
@@ -29,6 +29,7 @@
 import org.apache.qpid.server.queue.AMQMessage;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.exchange.ExchangeDefaults;
 
 import javax.management.JMException;
 import javax.management.MBeanException;
@@ -227,5 +228,10 @@
     public boolean hasBindings() throws AMQException
     {
         return !_index.getBindingsMap().isEmpty();
+    }
+
+    public String getType()
+    {
+        return ExchangeDefaults.DIRECT_EXCHANGE_CLASS;
     }
 }

Modified: 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java?view=diff&rev=500303&r1=500302&r2=500303
==============================================================================
--- 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
 (original)
+++ 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
 Fri Jan 26 10:45:39 2007
@@ -29,6 +29,7 @@
 import org.apache.qpid.server.queue.AMQMessage;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.exchange.ExchangeDefaults;
 
 import javax.management.JMException;
 import javax.management.MBeanException;
@@ -240,5 +241,11 @@
             _logger.error("Exception occured in creating the topic exchenge 
mbean", ex);
             throw new AMQException("Exception occured in creating the topic 
exchenge mbean", ex);
         }
+    }
+
+
+    public String getType()
+    {
+        return ExchangeDefaults.TOPIC_EXCHANGE_CLASS;
     }
 }

Modified: 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java?view=diff&rev=500303&r1=500302&r2=500303
==============================================================================
--- 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
 (original)
+++ 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
 Fri Jan 26 10:45:39 2007
@@ -28,6 +28,7 @@
 public interface Exchange
 {
     String getName();
+    String getType();
 
     void initialise(String name, boolean durable, int ticket, boolean 
autoDelete) throws AMQException;
 

Modified: 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java?view=diff&rev=500303&r1=500302&r2=500303
==============================================================================
--- 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java
 (original)
+++ 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java
 Fri Jan 26 10:45:39 2007
@@ -37,4 +37,6 @@
     void unregisterExchange(String name, boolean inUse) throws 
ExchangeInUseException, AMQException;
 
     Exchange getExchange(String name);
+
+    void setDefaultExchange(Exchange exchange);
 }

Added: 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java?view=auto&rev=500303
==============================================================================
--- 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
 (added)
+++ 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
 Fri Jan 26 10:45:39 2007
@@ -0,0 +1,205 @@
+package org.apache.qpid.server.exchange;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.server.management.MBeanDescription;
+import org.apache.qpid.server.management.MBeanConstructor;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.MessageTransferBody;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.exchange.ExchangeDefaults;
+
+import javax.management.openmbean.*;
+import javax.management.JMException;
+import javax.management.MBeanException;
+import java.util.List;
+import java.util.Map;
+import java.util.ArrayList;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+public class FanoutExchange extends AbstractExchange
+{
+    private static final Logger _logger = 
Logger.getLogger(FanoutExchange.class);
+
+    /**
+     * Maps from queue name to queue instances
+     */
+    private final CopyOnWriteArraySet<AMQQueue> _queues = new 
CopyOnWriteArraySet<AMQQueue>();
+
+    /**
+     * MBean class implementing the management interfaces.
+     */
+    @MBeanDescription("Management Bean for Fanout Exchange")
+    private final class FanoutExchangeMBean extends ExchangeMBean
+    {
+        // open mbean data types for representing exchange bindings
+        private String[]   _bindingItemNames = {"Routing Key", "Queue Names"};
+        private String[]   _bindingItemIndexNames = {_bindingItemNames[0]};
+        private OpenType[] _bindingItemTypes = new OpenType[2];
+        private CompositeType _bindingDataType = null;
+        private TabularType _bindinglistDataType = null;
+        private TabularDataSupport _bindingList = null;
+
+        @MBeanConstructor("Creates an MBean for AMQ fanout exchange")
+        public FanoutExchangeMBean()  throws JMException
+        {
+            super();
+            _exchangeType = "fanout";
+            init();
+        }
+
+        /**
+         * initialises the OpenType objects.
+         */
+        private void init() throws OpenDataException
+        {
+            _bindingItemTypes[0] = SimpleType.STRING;
+            _bindingItemTypes[1] = new ArrayType(1, SimpleType.STRING);
+            _bindingDataType = new CompositeType("Exchange Binding", "Routing 
key and Queue names",
+                                                 _bindingItemNames, 
_bindingItemNames, _bindingItemTypes);
+            _bindinglistDataType = new TabularType("Exchange Bindings", 
"Exchange Bindings for " + getName(),
+                                                 _bindingDataType, 
_bindingItemIndexNames);
+        }
+
+        public TabularData bindings() throws OpenDataException
+        {
+
+            _bindingList = new TabularDataSupport(_bindinglistDataType);
+
+            for (AMQQueue queue : _queues)
+            {
+                String queueName = queue.getName().toString();
+
+
+
+                Object[] bindingItemValues = {queueName, new String[] 
{queueName}};
+                CompositeData bindingData = new 
CompositeDataSupport(_bindingDataType, _bindingItemNames, bindingItemValues);
+                _bindingList.put(bindingData);
+            }
+
+            return _bindingList;
+        }
+
+        public void createNewBinding(String queueName, String binding) throws 
JMException
+        {
+            AMQQueue queue = 
ApplicationRegistry.getInstance().getQueueRegistry().getQueue(queueName);
+            if (queue == null)
+            {
+                throw new JMException("Queue \"" + queueName + "\" is not 
registered with the exchange.");
+            }
+
+            try
+            {
+                registerQueue(binding, queue, null);
+                queue.bind(binding, FanoutExchange.this);
+            }
+            catch (AMQException ex)
+            {
+                throw new MBeanException(ex);
+            }
+        }
+
+    }// End of MBean class
+
+
+    protected ExchangeMBean createMBean() throws AMQException
+    {
+        try
+        {
+            return new FanoutExchange.FanoutExchangeMBean();
+        }
+        catch (JMException ex)
+        {
+            _logger.error("Exception occured in creating the direct exchange 
mbean", ex);
+            throw new AMQException("Exception occured in creating the direct 
exchange mbean", ex);
+        }
+    }
+
+    public String getType()
+    {
+        return ExchangeDefaults.FANOUT_EXCHANGE_CLASS;
+    }
+
+    public void registerQueue(String routingKey, AMQQueue queue, FieldTable 
args) throws AMQException
+    {
+        assert queue != null;
+
+        if (_queues.contains(queue))
+        {
+            _logger.debug("Queue " + queue + " is already registered");
+        }
+        else
+        {
+            _queues.add(queue);
+            _logger.debug("Binding queue " + queue + " with routing key " + 
routingKey + " to exchange " + this);
+        }
+    }
+
+    public void deregisterQueue(String routingKey, AMQQueue queue) throws 
AMQException
+    {
+        assert queue != null;
+        assert routingKey != null;
+
+        if (!_queues.remove(queue))
+        {
+            throw new AMQException("Queue " + queue + " was not registered 
with exchange " + this.getName() +
+                                   ". ");
+        }
+    }
+
+    public void route(AMQMessage payload) throws AMQException
+    {
+        MessageTransferBody transferBody = payload.getTransferBody();
+        if (_queues == null || _queues.isEmpty())
+        {
+            String msg = "No queues bound to " + this;
+            // XXX
+            /*if (publishBody.mandatory)
+            {
+                throw new NoRouteException(msg, payload);
+            }
+            else*/
+            {
+                _logger.warn(msg);
+            }
+        }
+        else
+        {
+            if (_logger.isDebugEnabled())
+            {
+                _logger.debug("Publishing message to queue " + _queues);
+            }
+
+            for (AMQQueue q : _queues)
+            {
+                q.deliver(payload);
+            }
+        }
+    }
+
+    public boolean isBound(String routingKey, AMQQueue queue) throws 
AMQException
+    {
+        return _queues.contains(queue);
+    }
+
+    public boolean isBound(String routingKey) throws AMQException
+    {
+
+        return _queues != null && !_queues.isEmpty();
+    }
+
+    public boolean isBound(AMQQueue queue) throws AMQException
+    {
+
+
+        return _queues.contains(queue);
+    }
+
+    public boolean hasBindings() throws AMQException
+    {
+        return !_queues.isEmpty();
+    }
+}

Propchange: 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java?view=diff&rev=500303&r1=500302&r2=500303
==============================================================================
--- 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
 (original)
+++ 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
 Fri Jan 26 10:45:39 2007
@@ -28,6 +28,7 @@
 import org.apache.qpid.server.queue.AMQMessage;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.exchange.ExchangeDefaults;
 
 import javax.management.JMException;
 import javax.management.openmbean.*;
@@ -284,5 +285,10 @@
         {
             return o instanceof Registration && ((Registration) 
o).queue.equals(queue);
         }
+    }
+
+    public String getType()
+    {
+        return ExchangeDefaults.HEADERS_EXCHANGE_CLASS;
     }
 }

Modified: 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java?view=diff&rev=500303&r1=500302&r2=500303
==============================================================================
--- 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
 (original)
+++ 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
 Fri Jan 26 10:45:39 2007
@@ -22,9 +22,12 @@
 
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQConnectionException;
+import org.apache.qpid.AMQUnknownExchangeType;
 import org.apache.qpid.framing.AMQMethodBody;
 import org.apache.qpid.framing.ExchangeDeclareBody;
 import org.apache.qpid.framing.ExchangeDeclareOkBody;
+import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.exchange.ExchangeRegistry;
@@ -68,9 +71,28 @@
 
             if (exchange == null)
             {
-                exchange = exchangeFactory.createExchange(body.exchange, 
body.type, body.durable,
-                                                          body.passive, 
body.ticket);
-                exchangeRegistry.registerExchange(exchange);
+                if(body.passive && ((body.type == null) || body.type.length() 
==0))
+                {
+                    throw 
body.getChannelException(AMQConstant.NOT_FOUND.getCode(), "Unknown exchange: " 
+ body.exchange);                    
+                }
+                else
+                {
+                    try
+                    {
+                        exchange = 
exchangeFactory.createExchange(body.exchange, body.type, body.durable,
+                                                                  
body.passive, body.ticket);
+                        exchangeRegistry.registerExchange(exchange);
+                    }
+                    catch(AMQUnknownExchangeType e)
+                    {
+                        throw 
body.getConnectionException(AMQConstant.COMMAND_INVALID.getCode(), "Unknown 
exchange type: " + body.type,e);
+                    }
+                }
+            }
+            else if (!exchange.getType().equals(body.type))
+            {
+
+                throw new 
AMQConnectionException(AMQConstant.NOT_ALLOWED.getCode(), "Attempt to redeclare 
exchange: " + body.exchange + " of type " + exchange.getType() + " to " + 
body.type +".",body.getClazz(), 
body.getMethod(),body.getMajor(),body.getMinor());    
             }
         }
         if(!body.nowait)

Modified: 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java?view=diff&rev=500303&r1=500302&r2=500303
==============================================================================
--- 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java
 (original)
+++ 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java
 Fri Jan 26 10:45:39 2007
@@ -31,6 +31,9 @@
         define(registry, factory, ExchangeDefaults.DIRECT_EXCHANGE_NAME, 
ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
         define(registry, factory, ExchangeDefaults.TOPIC_EXCHANGE_NAME, 
ExchangeDefaults.TOPIC_EXCHANGE_CLASS);
         define(registry, factory, ExchangeDefaults.HEADERS_EXCHANGE_NAME, 
ExchangeDefaults.HEADERS_EXCHANGE_CLASS);
+        define(registry, factory, ExchangeDefaults.FANOUT_EXCHANGE_NAME, 
ExchangeDefaults.FANOUT_EXCHANGE_CLASS);
+
+        
registry.setDefaultExchange(registry.getExchange(ExchangeDefaults.DIRECT_EXCHANGE_NAME));
     }
 
     private void define(ExchangeRegistry r, ExchangeFactory f,

Modified: 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java?view=diff&rev=500303&r1=500302&r2=500303
==============================================================================
--- 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java
 (original)
+++ 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java
 Fri Jan 26 10:45:39 2007
@@ -62,7 +62,7 @@
 
         public int hashCode()
         {
-            return exchange.hashCode() + routingKey.hashCode();
+            return (exchange == null ? 0 : exchange.hashCode()) + (routingKey 
== null ? 0 : routingKey.hashCode());
         }
 
         public boolean equals(Object o)

Added: 
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/AMQUnknownExchangeType.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/AMQUnknownExchangeType.java?view=auto&rev=500303
==============================================================================
--- 
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/AMQUnknownExchangeType.java
 (added)
+++ 
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/AMQUnknownExchangeType.java
 Fri Jan 26 10:45:39 2007
@@ -0,0 +1,9 @@
+package org.apache.qpid;
+
+public class AMQUnknownExchangeType extends AMQException
+{
+    public AMQUnknownExchangeType(String message)
+    {
+        super(message);
+    }
+}

Propchange: 
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/AMQUnknownExchangeType.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/AMQUnknownExchangeType.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: 
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java?view=diff&rev=500303&r1=500302&r2=500303
==============================================================================
--- 
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java
 (original)
+++ 
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java
 Fri Jan 26 10:45:39 2007
@@ -33,4 +33,8 @@
     public final static String HEADERS_EXCHANGE_NAME = "amq.match";
 
     public final static String HEADERS_EXCHANGE_CLASS = "headers";
+
+    public final static String FANOUT_EXCHANGE_NAME = "amq.fanout";
+
+    public final static String FANOUT_EXCHANGE_CLASS = "fanout";
 }

Modified: 
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/AMQType.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/AMQType.java?view=diff&rev=500303&r1=500302&r2=500303
==============================================================================
--- 
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/AMQType.java
 (original)
+++ 
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/AMQType.java
 Fri Jan 26 10:45:39 2007
@@ -65,27 +65,46 @@
 
         public int getEncodingSize(Object value)
         {
-            // TODO : fixme
-            throw new UnsupportedOperationException();
+            return EncodingUtils.unsignedIntegerLength();
         }
 
-
-        public Object toNativeValue(Object value)
+        public Long toNativeValue(Object value)
         {
-            // TODO : fixme
-            throw new UnsupportedOperationException();
+            if (value instanceof Long)
+            {
+                return (Long) value;
+            }
+            else if (value instanceof Integer)
+            {
+                return ((Integer) value).longValue();
+            }
+            else if (value instanceof Short)
+            {
+                return ((Short) value).longValue();
+            }
+            else if (value instanceof Byte)
+            {
+                return ((Byte) value).longValue();
+            }
+            else if ((value instanceof String) || (value == null))
+            {
+                return Long.valueOf((String)value);
+            }
+            else
+            {
+                throw new NumberFormatException("Cannot convert: " + value + 
"(" +
+                                                value.getClass().getName() + 
") to int.");
+            }
         }
 
         public void writeValueImpl(Object value, ByteBuffer buffer)
         {
-            // TODO : fixme
-            throw new UnsupportedOperationException();
+            EncodingUtils.writeUnsignedInteger(buffer, (Long) value);
         }
 
         public Object readValueFromBuffer(ByteBuffer buffer)
         {
-            // TODO : fixme
-            throw new UnsupportedOperationException();
+            return EncodingUtils.readUnsignedInteger(buffer);
         }
     },
 

Modified: 
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java?view=diff&rev=500303&r1=500302&r2=500303
==============================================================================
--- 
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java
 (original)
+++ 
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java
 Fri Jan 26 10:45:39 2007
@@ -199,7 +199,7 @@
     }
 
 
-    public static long unsignedIntegerLength()
+    public static int unsignedIntegerLength()
     {
         return 4;
     }
@@ -627,4 +627,16 @@
         writeByte(buffer, (byte) character);
     }
 
+    public static long readUnsignedInteger(ByteBuffer buffer)
+    {
+        long l = 0xFF & buffer.get();
+        l <<=8;
+        l = l | (0xFF & buffer.get());
+        l <<=8;
+        l = l | (0xFF & buffer.get());
+        l <<=8;
+        l = l | (0xFF & buffer.get());
+
+        return l;
+    }
 }


Reply via email to