Author: gsim
Date: Fri Jan 26 10:45:39 2007
New Revision: 500303
URL: http://svn.apache.org/viewvc?view=rev&rev=500303
Log:
Updated broker for issues highlighted by python tests. (e.g. fanout exchange,
default exchange rules etc)
Added:
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
(with props)
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/AMQUnknownExchangeType.java
(with props)
Modified:
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/AMQType.java
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java
Modified:
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java?view=diff&rev=500303&r1=500302&r2=500303
==============================================================================
---
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
(original)
+++
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
Fri Jan 26 10:45:39 2007
@@ -25,6 +25,8 @@
import java.util.HashMap;
import java.util.Map;
+import org.apache.qpid.AMQUnknownExchangeType;
+import org.apache.qpid.exchange.ExchangeDefaults;
public class DefaultExchangeFactory implements ExchangeFactory
{
@@ -34,9 +36,10 @@
public DefaultExchangeFactory()
{
- _exchangeClassMap.put("direct",
org.apache.qpid.server.exchange.DestNameExchange.class);
- _exchangeClassMap.put("topic",
org.apache.qpid.server.exchange.DestWildExchange.class);
- _exchangeClassMap.put("headers",
org.apache.qpid.server.exchange.HeadersExchange.class);
+ _exchangeClassMap.put(ExchangeDefaults.DIRECT_EXCHANGE_CLASS,
org.apache.qpid.server.exchange.DestNameExchange.class);
+ _exchangeClassMap.put(ExchangeDefaults.TOPIC_EXCHANGE_CLASS,
org.apache.qpid.server.exchange.DestWildExchange.class);
+ _exchangeClassMap.put(ExchangeDefaults.HEADERS_EXCHANGE_CLASS,
org.apache.qpid.server.exchange.HeadersExchange.class);
+ _exchangeClassMap.put(ExchangeDefaults.FANOUT_EXCHANGE_CLASS,
org.apache.qpid.server.exchange.FanoutExchange.class);
}
public Exchange createExchange(String exchange, String type, boolean
durable, boolean autoDelete,
@@ -46,7 +49,7 @@
Class<? extends Exchange> exchClass = _exchangeClassMap.get(type);
if (exchClass == null)
{
- throw new AMQException(_logger, "Unknown exchange type: " + type);
+ throw new AMQUnknownExchangeType("Unknown exchange type: " + type);
}
try
{
Modified:
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java?view=diff&rev=500303&r1=500302&r2=500303
==============================================================================
---
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
(original)
+++
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
Fri Jan 26 10:45:39 2007
@@ -37,6 +37,8 @@
*/
private ConcurrentMap<String, Exchange> _exchangeMap = new
ConcurrentHashMap<String, Exchange>();
+ private Exchange _defaultExchange;
+
public DefaultExchangeRegistry(ExchangeFactory exchangeFactory)
{
//create 'standard' exchanges:
@@ -52,9 +54,18 @@
public void registerExchange(Exchange exchange)
{
+ if(_defaultExchange == null)
+ {
+ setDefaultExchange(exchange);
+ }
_exchangeMap.put(exchange.getName(), exchange);
}
+ public void setDefaultExchange(Exchange exchange)
+ {
+ _defaultExchange = exchange;
+ }
+
public void unregisterExchange(String name, boolean inUse) throws
AMQException
{
// TODO: check inUse argument
@@ -71,7 +82,16 @@
public Exchange getExchange(String name)
{
- return _exchangeMap.get(name);
+
+ if(name == null || name.length() == 0)
+ {
+ return _defaultExchange;
+ }
+ else
+ {
+ return _exchangeMap.get(name);
+ }
+
}
/**
@@ -82,7 +102,7 @@
public void routeContent(AMQMessage payload) throws AMQException
{
final String exchange = payload.getTransferBody().destination;
- final Exchange exch = _exchangeMap.get(exchange);
+ final Exchange exch = getExchange(exchange);
// there is a small window of opportunity for the exchange to be
deleted in between
// the JmsPublish being received (where the exchange is validated) and
the final
// content body being received (which triggers this method)
Modified:
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java?view=diff&rev=500303&r1=500302&r2=500303
==============================================================================
---
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java
(original)
+++
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java
Fri Jan 26 10:45:39 2007
@@ -29,6 +29,7 @@
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.exchange.ExchangeDefaults;
import javax.management.JMException;
import javax.management.MBeanException;
@@ -227,5 +228,10 @@
public boolean hasBindings() throws AMQException
{
return !_index.getBindingsMap().isEmpty();
+ }
+
+ public String getType()
+ {
+ return ExchangeDefaults.DIRECT_EXCHANGE_CLASS;
}
}
Modified:
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java?view=diff&rev=500303&r1=500302&r2=500303
==============================================================================
---
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
(original)
+++
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
Fri Jan 26 10:45:39 2007
@@ -29,6 +29,7 @@
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.exchange.ExchangeDefaults;
import javax.management.JMException;
import javax.management.MBeanException;
@@ -240,5 +241,11 @@
_logger.error("Exception occured in creating the topic exchenge
mbean", ex);
throw new AMQException("Exception occured in creating the topic
exchenge mbean", ex);
}
+ }
+
+
+ public String getType()
+ {
+ return ExchangeDefaults.TOPIC_EXCHANGE_CLASS;
}
}
Modified:
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java?view=diff&rev=500303&r1=500302&r2=500303
==============================================================================
---
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
(original)
+++
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
Fri Jan 26 10:45:39 2007
@@ -28,6 +28,7 @@
public interface Exchange
{
String getName();
+ String getType();
void initialise(String name, boolean durable, int ticket, boolean
autoDelete) throws AMQException;
Modified:
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java?view=diff&rev=500303&r1=500302&r2=500303
==============================================================================
---
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java
(original)
+++
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java
Fri Jan 26 10:45:39 2007
@@ -37,4 +37,6 @@
void unregisterExchange(String name, boolean inUse) throws
ExchangeInUseException, AMQException;
Exchange getExchange(String name);
+
+ void setDefaultExchange(Exchange exchange);
}
Added:
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java?view=auto&rev=500303
==============================================================================
---
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
(added)
+++
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
Fri Jan 26 10:45:39 2007
@@ -0,0 +1,205 @@
+package org.apache.qpid.server.exchange;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.server.management.MBeanDescription;
+import org.apache.qpid.server.management.MBeanConstructor;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.MessageTransferBody;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.exchange.ExchangeDefaults;
+
+import javax.management.openmbean.*;
+import javax.management.JMException;
+import javax.management.MBeanException;
+import java.util.List;
+import java.util.Map;
+import java.util.ArrayList;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+public class FanoutExchange extends AbstractExchange
+{
+ private static final Logger _logger =
Logger.getLogger(FanoutExchange.class);
+
+ /**
+ * Maps from queue name to queue instances
+ */
+ private final CopyOnWriteArraySet<AMQQueue> _queues = new
CopyOnWriteArraySet<AMQQueue>();
+
+ /**
+ * MBean class implementing the management interfaces.
+ */
+ @MBeanDescription("Management Bean for Fanout Exchange")
+ private final class FanoutExchangeMBean extends ExchangeMBean
+ {
+ // open mbean data types for representing exchange bindings
+ private String[] _bindingItemNames = {"Routing Key", "Queue Names"};
+ private String[] _bindingItemIndexNames = {_bindingItemNames[0]};
+ private OpenType[] _bindingItemTypes = new OpenType[2];
+ private CompositeType _bindingDataType = null;
+ private TabularType _bindinglistDataType = null;
+ private TabularDataSupport _bindingList = null;
+
+ @MBeanConstructor("Creates an MBean for AMQ fanout exchange")
+ public FanoutExchangeMBean() throws JMException
+ {
+ super();
+ _exchangeType = "fanout";
+ init();
+ }
+
+ /**
+ * initialises the OpenType objects.
+ */
+ private void init() throws OpenDataException
+ {
+ _bindingItemTypes[0] = SimpleType.STRING;
+ _bindingItemTypes[1] = new ArrayType(1, SimpleType.STRING);
+ _bindingDataType = new CompositeType("Exchange Binding", "Routing
key and Queue names",
+ _bindingItemNames,
_bindingItemNames, _bindingItemTypes);
+ _bindinglistDataType = new TabularType("Exchange Bindings",
"Exchange Bindings for " + getName(),
+ _bindingDataType,
_bindingItemIndexNames);
+ }
+
+ public TabularData bindings() throws OpenDataException
+ {
+
+ _bindingList = new TabularDataSupport(_bindinglistDataType);
+
+ for (AMQQueue queue : _queues)
+ {
+ String queueName = queue.getName().toString();
+
+
+
+ Object[] bindingItemValues = {queueName, new String[]
{queueName}};
+ CompositeData bindingData = new
CompositeDataSupport(_bindingDataType, _bindingItemNames, bindingItemValues);
+ _bindingList.put(bindingData);
+ }
+
+ return _bindingList;
+ }
+
+ public void createNewBinding(String queueName, String binding) throws
JMException
+ {
+ AMQQueue queue =
ApplicationRegistry.getInstance().getQueueRegistry().getQueue(queueName);
+ if (queue == null)
+ {
+ throw new JMException("Queue \"" + queueName + "\" is not
registered with the exchange.");
+ }
+
+ try
+ {
+ registerQueue(binding, queue, null);
+ queue.bind(binding, FanoutExchange.this);
+ }
+ catch (AMQException ex)
+ {
+ throw new MBeanException(ex);
+ }
+ }
+
+ }// End of MBean class
+
+
+ protected ExchangeMBean createMBean() throws AMQException
+ {
+ try
+ {
+ return new FanoutExchange.FanoutExchangeMBean();
+ }
+ catch (JMException ex)
+ {
+ _logger.error("Exception occured in creating the direct exchange
mbean", ex);
+ throw new AMQException("Exception occured in creating the direct
exchange mbean", ex);
+ }
+ }
+
+ public String getType()
+ {
+ return ExchangeDefaults.FANOUT_EXCHANGE_CLASS;
+ }
+
+ public void registerQueue(String routingKey, AMQQueue queue, FieldTable
args) throws AMQException
+ {
+ assert queue != null;
+
+ if (_queues.contains(queue))
+ {
+ _logger.debug("Queue " + queue + " is already registered");
+ }
+ else
+ {
+ _queues.add(queue);
+ _logger.debug("Binding queue " + queue + " with routing key " +
routingKey + " to exchange " + this);
+ }
+ }
+
+ public void deregisterQueue(String routingKey, AMQQueue queue) throws
AMQException
+ {
+ assert queue != null;
+ assert routingKey != null;
+
+ if (!_queues.remove(queue))
+ {
+ throw new AMQException("Queue " + queue + " was not registered
with exchange " + this.getName() +
+ ". ");
+ }
+ }
+
+ public void route(AMQMessage payload) throws AMQException
+ {
+ MessageTransferBody transferBody = payload.getTransferBody();
+ if (_queues == null || _queues.isEmpty())
+ {
+ String msg = "No queues bound to " + this;
+ // XXX
+ /*if (publishBody.mandatory)
+ {
+ throw new NoRouteException(msg, payload);
+ }
+ else*/
+ {
+ _logger.warn(msg);
+ }
+ }
+ else
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Publishing message to queue " + _queues);
+ }
+
+ for (AMQQueue q : _queues)
+ {
+ q.deliver(payload);
+ }
+ }
+ }
+
+ public boolean isBound(String routingKey, AMQQueue queue) throws
AMQException
+ {
+ return _queues.contains(queue);
+ }
+
+ public boolean isBound(String routingKey) throws AMQException
+ {
+
+ return _queues != null && !_queues.isEmpty();
+ }
+
+ public boolean isBound(AMQQueue queue) throws AMQException
+ {
+
+
+ return _queues.contains(queue);
+ }
+
+ public boolean hasBindings() throws AMQException
+ {
+ return !_queues.isEmpty();
+ }
+}
Propchange:
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified:
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java?view=diff&rev=500303&r1=500302&r2=500303
==============================================================================
---
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
(original)
+++
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
Fri Jan 26 10:45:39 2007
@@ -28,6 +28,7 @@
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.exchange.ExchangeDefaults;
import javax.management.JMException;
import javax.management.openmbean.*;
@@ -284,5 +285,10 @@
{
return o instanceof Registration && ((Registration)
o).queue.equals(queue);
}
+ }
+
+ public String getType()
+ {
+ return ExchangeDefaults.HEADERS_EXCHANGE_CLASS;
}
}
Modified:
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java?view=diff&rev=500303&r1=500302&r2=500303
==============================================================================
---
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
(original)
+++
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
Fri Jan 26 10:45:39 2007
@@ -22,9 +22,12 @@
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQConnectionException;
+import org.apache.qpid.AMQUnknownExchangeType;
import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.ExchangeDeclareBody;
import org.apache.qpid.framing.ExchangeDeclareOkBody;
+import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.exchange.ExchangeRegistry;
@@ -68,9 +71,28 @@
if (exchange == null)
{
- exchange = exchangeFactory.createExchange(body.exchange,
body.type, body.durable,
- body.passive,
body.ticket);
- exchangeRegistry.registerExchange(exchange);
+ if(body.passive && ((body.type == null) || body.type.length()
==0))
+ {
+ throw
body.getChannelException(AMQConstant.NOT_FOUND.getCode(), "Unknown exchange: "
+ body.exchange);
+ }
+ else
+ {
+ try
+ {
+ exchange =
exchangeFactory.createExchange(body.exchange, body.type, body.durable,
+
body.passive, body.ticket);
+ exchangeRegistry.registerExchange(exchange);
+ }
+ catch(AMQUnknownExchangeType e)
+ {
+ throw
body.getConnectionException(AMQConstant.COMMAND_INVALID.getCode(), "Unknown
exchange type: " + body.type,e);
+ }
+ }
+ }
+ else if (!exchange.getType().equals(body.type))
+ {
+
+ throw new
AMQConnectionException(AMQConstant.NOT_ALLOWED.getCode(), "Attempt to redeclare
exchange: " + body.exchange + " of type " + exchange.getType() + " to " +
body.type +".",body.getClazz(),
body.getMethod(),body.getMajor(),body.getMinor());
}
}
if(!body.nowait)
Modified:
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java?view=diff&rev=500303&r1=500302&r2=500303
==============================================================================
---
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java
(original)
+++
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java
Fri Jan 26 10:45:39 2007
@@ -31,6 +31,9 @@
define(registry, factory, ExchangeDefaults.DIRECT_EXCHANGE_NAME,
ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
define(registry, factory, ExchangeDefaults.TOPIC_EXCHANGE_NAME,
ExchangeDefaults.TOPIC_EXCHANGE_CLASS);
define(registry, factory, ExchangeDefaults.HEADERS_EXCHANGE_NAME,
ExchangeDefaults.HEADERS_EXCHANGE_CLASS);
+ define(registry, factory, ExchangeDefaults.FANOUT_EXCHANGE_NAME,
ExchangeDefaults.FANOUT_EXCHANGE_CLASS);
+
+
registry.setDefaultExchange(registry.getExchange(ExchangeDefaults.DIRECT_EXCHANGE_NAME));
}
private void define(ExchangeRegistry r, ExchangeFactory f,
Modified:
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java?view=diff&rev=500303&r1=500302&r2=500303
==============================================================================
---
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java
(original)
+++
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java
Fri Jan 26 10:45:39 2007
@@ -62,7 +62,7 @@
public int hashCode()
{
- return exchange.hashCode() + routingKey.hashCode();
+ return (exchange == null ? 0 : exchange.hashCode()) + (routingKey
== null ? 0 : routingKey.hashCode());
}
public boolean equals(Object o)
Added:
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/AMQUnknownExchangeType.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/AMQUnknownExchangeType.java?view=auto&rev=500303
==============================================================================
---
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/AMQUnknownExchangeType.java
(added)
+++
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/AMQUnknownExchangeType.java
Fri Jan 26 10:45:39 2007
@@ -0,0 +1,9 @@
+package org.apache.qpid;
+
+public class AMQUnknownExchangeType extends AMQException
+{
+ public AMQUnknownExchangeType(String message)
+ {
+ super(message);
+ }
+}
Propchange:
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/AMQUnknownExchangeType.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/AMQUnknownExchangeType.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified:
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java?view=diff&rev=500303&r1=500302&r2=500303
==============================================================================
---
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java
(original)
+++
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java
Fri Jan 26 10:45:39 2007
@@ -33,4 +33,8 @@
public final static String HEADERS_EXCHANGE_NAME = "amq.match";
public final static String HEADERS_EXCHANGE_CLASS = "headers";
+
+ public final static String FANOUT_EXCHANGE_NAME = "amq.fanout";
+
+ public final static String FANOUT_EXCHANGE_CLASS = "fanout";
}
Modified:
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/AMQType.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/AMQType.java?view=diff&rev=500303&r1=500302&r2=500303
==============================================================================
---
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/AMQType.java
(original)
+++
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/AMQType.java
Fri Jan 26 10:45:39 2007
@@ -65,27 +65,46 @@
public int getEncodingSize(Object value)
{
- // TODO : fixme
- throw new UnsupportedOperationException();
+ return EncodingUtils.unsignedIntegerLength();
}
-
- public Object toNativeValue(Object value)
+ public Long toNativeValue(Object value)
{
- // TODO : fixme
- throw new UnsupportedOperationException();
+ if (value instanceof Long)
+ {
+ return (Long) value;
+ }
+ else if (value instanceof Integer)
+ {
+ return ((Integer) value).longValue();
+ }
+ else if (value instanceof Short)
+ {
+ return ((Short) value).longValue();
+ }
+ else if (value instanceof Byte)
+ {
+ return ((Byte) value).longValue();
+ }
+ else if ((value instanceof String) || (value == null))
+ {
+ return Long.valueOf((String)value);
+ }
+ else
+ {
+ throw new NumberFormatException("Cannot convert: " + value +
"(" +
+ value.getClass().getName() +
") to int.");
+ }
}
public void writeValueImpl(Object value, ByteBuffer buffer)
{
- // TODO : fixme
- throw new UnsupportedOperationException();
+ EncodingUtils.writeUnsignedInteger(buffer, (Long) value);
}
public Object readValueFromBuffer(ByteBuffer buffer)
{
- // TODO : fixme
- throw new UnsupportedOperationException();
+ return EncodingUtils.readUnsignedInteger(buffer);
}
},
Modified:
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java?view=diff&rev=500303&r1=500302&r2=500303
==============================================================================
---
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java
(original)
+++
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java
Fri Jan 26 10:45:39 2007
@@ -199,7 +199,7 @@
}
- public static long unsignedIntegerLength()
+ public static int unsignedIntegerLength()
{
return 4;
}
@@ -627,4 +627,16 @@
writeByte(buffer, (byte) character);
}
+ public static long readUnsignedInteger(ByteBuffer buffer)
+ {
+ long l = 0xFF & buffer.get();
+ l <<=8;
+ l = l | (0xFF & buffer.get());
+ l <<=8;
+ l = l | (0xFF & buffer.get());
+ l <<=8;
+ l = l | (0xFF & buffer.get());
+
+ return l;
+ }
}