Author: rhs
Date: Wed Jan 17 17:28:44 2007
New Revision: 497278

URL: http://svn.apache.org/viewvc?view=rev&rev=497278
Log:
filled out consume and transfer handlers

Modified:
    incubator/qpid/branches/qpid.0-9/gentools/templ.java/MethodBodyClass.tmpl
    incubator/qpid/branches/qpid.0-9/java/broker/etc/log4j.xml
    
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
    
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java
    
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java
    
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
    
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
    
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
    
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java

Modified: 
incubator/qpid/branches/qpid.0-9/gentools/templ.java/MethodBodyClass.tmpl
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/gentools/templ.java/MethodBodyClass.tmpl?view=diff&rev=497278&r1=497277&r2=497278
==============================================================================
--- incubator/qpid/branches/qpid.0-9/gentools/templ.java/MethodBodyClass.tmpl 
(original)
+++ incubator/qpid/branches/qpid.0-9/gentools/templ.java/MethodBodyClass.tmpl 
Wed Jan 17 17:28:44 2007
@@ -60,7 +60,7 @@
     // Field methods           
 %{FLIST}    ${mb_field_get_method}
 
-    protected int getBodySize()
+    public int getBodySize()
     {      
         int size = 0;
 %{FLIST}    ${mb_field_size}

Modified: incubator/qpid/branches/qpid.0-9/java/broker/etc/log4j.xml
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/etc/log4j.xml?view=diff&rev=497278&r1=497277&r2=497278
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/etc/log4j.xml (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/etc/log4j.xml Wed Jan 17 
17:28:44 2007
@@ -41,8 +41,12 @@
         <priority value="debug"/>
     </category>
 
+    <category name="org.apache.qpid.server.handler">
+        <priority value="debug"/>
+    </category>
+
     <root>
-        <priority value="info"/>
+        <priority value="debug"/>
         <appender-ref ref="STDOUT"/>
         <appender-ref ref="FileAppender"/>
     </root>

Modified: 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?view=diff&rev=497278&r1=497277&r2=497278
==============================================================================
--- 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
 (original)
+++ 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
 Wed Jan 17 17:28:44 2007
@@ -20,15 +20,20 @@
  */
 package org.apache.qpid.server;
 
+import org.apache.qpid.framing.Content;
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQDataBlock;
-import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.framing.Content;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.MessageAppendBody;
+import org.apache.qpid.framing.MessageCloseBody;
+import org.apache.qpid.framing.MessageOpenBody;
+import org.apache.qpid.framing.MessageTransferBody;
 import org.apache.qpid.framing.RequestManager;
 import org.apache.qpid.framing.ResponseManager;
-import org.apache.qpid.protocol.AMQProtocolWriter;
 import org.apache.qpid.protocol.AMQMethodListener;
+import org.apache.qpid.protocol.AMQProtocolWriter;
 import org.apache.qpid.server.ack.TxAck;
 import org.apache.qpid.server.ack.UnacknowledgedMessage;
 import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
@@ -66,7 +71,7 @@
     private long _prefetch_HighWaterMark;
 
     private long _prefetch_LowWaterMark;
-    
+
     private RequestManager _requestManager;
     private ResponseManager _responseManager;
 
@@ -89,11 +94,16 @@
     private int _consumerTag;
 
     /**
-     * The current message - which may be partial in the sense that not all 
frames have been received yet -
-     * which has been received by this channel. As the frames are received the 
message gets updated and once all
+     * The set of current messages - which may be partial in the sense that 
not all frames have been received yet -
+     * which have been received by this channel. As the frames are received the 
references get updated and once all
      * frames have been received the message can then be routed.
      */
-    private AMQMessage _currentMessage;
+    private Map<String, List<AMQMessage>> _messages = new LinkedHashMap();
+
+    /**
+     * The set of open references on this channel.
+     */
+    private Map<String, List<MessageAppendBody>> _references = new 
LinkedHashMap();
 
     /**
      * Maps from consumer tag to queue instance. Allows us to unsubscribe from 
a queue.
@@ -177,55 +187,80 @@
         _prefetch_HighWaterMark = prefetchCount;
     }
 
-//     public void setPublishFrame(BasicPublishBody publishBody, 
AMQProtocolSession publisher) throws AMQException
-//     {
-//         _currentMessage = new AMQMessage(_messageStore, publishBody);
-//         _currentMessage.setPublisher(publisher);
-//     }
-
-//     public void publishContentHeader(ContentHeaderBody contentHeaderBody)
-//             throws AMQException
-//     {
-//         if (_currentMessage == null)
-//         {
-//             throw new AMQException("Received content header without 
previously receiving a BasicDeliver frame");
-//         }
-//         else
-//         {
-//             _currentMessage.setContentHeaderBody(contentHeaderBody);
-//             // check and route if header says body length is zero
-//             if (contentHeaderBody.bodySize == 0)
-//             {
-//                 routeCurrentMessage();
-//             }
-//         }
-//     }
-// 
-//     public void publishContentBody(ContentBody contentBody)
-//             throws AMQException
-//     {
-//         if (_currentMessage == null)
-//         {
-//             throw new AMQException("Received content body without 
previously receiving a JmsPublishBody");
-//         }
-//         if (_currentMessage.getContentHeaderBody() == null)
-//         {
-//             throw new AMQException("Received content body without 
previously receiving a content header");
-//         }
-// 
-//         _currentMessage.addContentBodyFrame(contentBody);
-//         if (_currentMessage.isAllContentReceived())
-//         {
-//             routeCurrentMessage();
-//         }
-//     }
+    public void addMessageTransfer(MessageTransferBody transferBody, 
AMQProtocolSession publisher) throws AMQException
+    {
+        AMQMessage message = new AMQMessage(_messageStore, transferBody);
+        message.setPublisher(publisher);
+        Content body = transferBody.getBody();
+        switch (body.getContentType()) {
+        case CONTENT_TYPE_INLINE:
+            route(message);
+            break;
+        case CONTENT_TYPE_REFERENCE:
+            getMessages(body.getContent()).add(message);
+            break;
+        }
+    }
+
+    private List<AMQMessage> getMessages(byte[] reference) {
+        String key = new String(reference);
+        List<AMQMessage> result = _messages.get(key);
+        if (result == null) {
+            throw new IllegalArgumentException(key);
+        }
+        return result;
+    }
+
+    private List<MessageAppendBody> getReference(byte[] reference) {
+        String key = new String(reference);
+        List<MessageAppendBody> result = _references.get(key);
+        if (result == null) {
+            throw new IllegalArgumentException(key);
+        }
+        return result;
+    }
+
+    private void createReference(byte[] reference) {
+        String key = new String(reference);
+        if (_references.containsKey(key)) {
+            throw new IllegalArgumentException(key);
+        } else {
+            _references.put(key, new LinkedList());
+            _messages.put(key, new LinkedList());
+        }
+    }
+
+    private void clearReference(byte[] reference) {
+        String key = new String(reference);
+        _references.remove(key);
+        _messages.remove(key);
+    }
+
+    public void addMessageOpen(MessageOpenBody open) {
+        createReference(open.reference);
+    }
+
+    public void addMessageAppend(MessageAppendBody append) {
+        getReference(append.reference).add(append);
+    }
+
+    public void addMessageClose(MessageCloseBody close) throws AMQException {
+        List<AMQMessage> messages = getMessages(close.reference);
+        try {
+            for (AMQMessage msg : messages) {
+                route(msg);
+            }
+        } finally {
+            clearReference(close.reference);
+        }
+    }
 
-    protected void routeCurrentMessage() throws AMQException
+    protected void route(AMQMessage msg) throws AMQException
     {
         if (_transactional)
         {
             //don't create a transaction unless needed
-            if (_currentMessage.isPersistent())
+            if (msg.isPersistent())
             {
                 _txnBuffer.containsPersistentChanges();
             }
@@ -236,13 +271,13 @@
             //be added for every queue onto which the message is
             //enqueued. Finally a cleanup op will be added to decrement
             //the reference associated with the routing.
-            Store storeOp = new Store(_currentMessage);
+            Store storeOp = new Store(msg);
             _txnBuffer.enlist(storeOp);
-            _currentMessage.setTxnBuffer(_txnBuffer);
+            msg.setTxnBuffer(_txnBuffer);
             try
             {
-                _exchanges.routeContent(_currentMessage);
-                _txnBuffer.enlist(new Cleanup(_currentMessage));
+                _exchanges.routeContent(msg);
+                _txnBuffer.enlist(new Cleanup(msg));
             }
             catch (RequiredDeliveryException e)
             {
@@ -255,33 +290,28 @@
                 _txnBuffer.cancel(storeOp);
                 throw e;
             }
-            finally
-            {
-                _currentMessage = null;
-            }
         }
         else
         {
             try
             {
-                _exchanges.routeContent(_currentMessage);
+                _exchanges.routeContent(msg);
                 //following check implements the functionality
                 //required by the 'immediate' flag:
-                _currentMessage.checkDeliveredToConsumer();
+                msg.checkDeliveredToConsumer();
             }
             finally
             {
-                _currentMessage.decrementReference();
-                _currentMessage = null;
+                msg.decrementReference();
             }
         }
     }
-    
+
     public RequestManager getRequestManager()
     {
         return _requestManager;
     }
-    
+
     public ResponseManager getResponseManager()
     {
         return _responseManager;

Modified: 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java?view=diff&rev=497278&r1=497277&r2=497278
==============================================================================
--- 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java
 (original)
+++ 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java
 Wed Jan 17 17:28:44 2007
@@ -20,17 +20,29 @@
  */
 package org.apache.qpid.server.handler;
 
+import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQInvalidSelectorException;
+import org.apache.qpid.framing.ChannelCloseBody;
+import org.apache.qpid.framing.ConnectionCloseBody;
 import org.apache.qpid.framing.MessageConsumeBody;
+import org.apache.qpid.framing.MessageOkBody;
+import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.protocol.AMQMethodListener;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.ConsumerTagNotUniqueException;
 import org.apache.qpid.server.exchange.ExchangeRegistry;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.QueueRegistry;
 import org.apache.qpid.server.state.AMQStateManager;
 import org.apache.qpid.server.state.StateAwareMethodListener;
 
 public class MessageConsumeHandler implements 
StateAwareMethodListener<MessageConsumeBody>
 {
+    private static final Logger _log = 
Logger.getLogger(MessageConsumeHandler.class);
+
     private static MessageConsumeHandler _instance = new 
MessageConsumeHandler();
 
     public static MessageConsumeHandler getInstance()
@@ -39,16 +51,105 @@
     }
 
     private MessageConsumeHandler() {}
-    
-    
+
+
     public void methodReceived (AMQStateManager stateManager,
-                                                       QueueRegistry 
queueRegistry,
+                                QueueRegistry queueRegistry,
                                ExchangeRegistry exchangeRegistry,
-                                AMQProtocolSession protocolSession,
+                                AMQProtocolSession session,
                                        AMQMethodEvent<MessageConsumeBody> evt)
                                 throws AMQException
     {
-               // TODO
+        MessageConsumeBody body = evt.getMethod();
+        final int channelId = evt.getChannelId();
+
+        AMQChannel channel = session.getChannel(channelId);
+        if (channel == null)
+        {
+            _log.error("Channel " + channelId + " not found");
+            // TODO: either alert or raise an error that the channel does not exist
+        }
+        else
+        {
+            AMQQueue queue = body.queue == null ? channel.getDefaultQueue() : 
queueRegistry.getQueue(body.queue);
+
+            if (queue == null)
+            {
+                _log.info("No queue for '" + body.queue + "'");
+                if(body.queue!=null)
+                {
+                    channelClose(session, channelId, stateManager,
+                                 "No such queue, '" + body.queue + "'", 
AMQConstant.NOT_FOUND);
+                }
+                else
+                {
+                    connectionClose(session, channelId, stateManager,
+                                    "No queue name provided, no default queue 
defined.",
+                                    AMQConstant.NOT_ALLOWED);
+                }
+            }
+            else
+            {
+                try
+                {
+                    /*AMQShort*/String destination = channel.subscribeToQueue
+                        (body.destination, queue, session, !body.noAck, 
/*XXX*/null, body.noLocal);
+                    // AMQP version change: Hardwire the version to 0-9 
(major=0, minor=9)
+                    // TODO: Connect this to the session version obtained from 
ProtocolInitiation for this session.
+                    // Be aware of possible changes to parameter order as 
versions change.
+                    session.writeResponse(evt, 
MessageOkBody.createMethodBody((byte)0, (byte)9));
+
+                    //now allow queue to start async processing of any backlog 
of messages
+                    queue.deliverAsync();
+                }
+                catch (AMQInvalidSelectorException ise)
+                {
+                    _log.info("Closing connection due to invalid selector");
+                    channelClose(session, channelId, stateManager, 
ise.getMessage(), AMQConstant.INVALID_SELECTOR);
+                }
+                catch (ConsumerTagNotUniqueException e)
+                {
+                    connectionClose(session, channelId, stateManager,
+                                    "Non-unique consumer tag, '" + 
body.destination + "'",
+                                    AMQConstant.NOT_ALLOWED);
+                }
+            }
+        }
     }
+
+    private void channelClose(AMQProtocolSession session, int channelId, 
AMQMethodListener listener,
+                              String message, AMQConstant code)
+        throws AMQException
+    {
+        /*AMQShort*/String msg = new /*AMQShort*/String(message);
+        // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
+        // TODO: Connect this to the session version obtained from 
ProtocolInitiation for this session.
+        // Be aware of possible changes to parameter order as versions change.
+        session.writeRequest(channelId, ChannelCloseBody.createMethodBody
+                             ((byte)0, (byte)9,        // AMQP version (major, 
minor)
+                              MessageConsumeBody.getClazz((byte)0, (byte)9),   
// classId
+                              MessageConsumeBody.getMethod((byte)0, (byte)9),  
// methodId
+                              code.getCode(),  // replyCode
+                              msg),    // replyText
+                             listener);
+    }
+
+    private void connectionClose(AMQProtocolSession session, int channelId, 
AMQMethodListener listener,
+                                 String message, AMQConstant code)
+        throws AMQException
+    {
+        /*AMQShort*/String msg = new /*AMQShort*/String(message);
+        // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
+        // TODO: Connect this to the session version obtained from 
ProtocolInitiation for this session.
+        // Be aware of possible changes to parameter order as versions change.
+        session.writeRequest(channelId, ConnectionCloseBody.createMethodBody
+                             ((byte)0, (byte)9,        // AMQP version (major, 
minor)
+                              MessageConsumeBody.getClazz((byte)0, (byte)9),   
// classId
+                              MessageConsumeBody.getMethod((byte)0, (byte)9),  
// methodId
+                              code.getCode(),  // replyCode
+                              msg),    // replyText
+                             listener);
+    }
+
 }
 

Modified: 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java?view=diff&rev=497278&r1=497277&r2=497278
==============================================================================
--- 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java
 (original)
+++ 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java
 Wed Jan 17 17:28:44 2007
@@ -20,9 +20,15 @@
  */
 package org.apache.qpid.server.handler;
 
+import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.ChannelCloseBody;
 import org.apache.qpid.framing.MessageTransferBody;
 import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.exchange.ExchangeRegistry;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.queue.QueueRegistry;
@@ -31,6 +37,8 @@
 
 public class MessageTransferHandler implements 
StateAwareMethodListener<MessageTransferBody>
 {
+    private static final Logger _log = 
Logger.getLogger(MessageTransferHandler.class);
+
     private static MessageTransferHandler _instance = new 
MessageTransferHandler();
 
     public static MessageTransferHandler getInstance()
@@ -38,9 +46,10 @@
         return _instance;
     }
 
+    private static final String UNKNOWN_EXCHANGE_NAME = "Unknown exchange 
name";
+
     private MessageTransferHandler() {}
-    
-    
+
     public void methodReceived (AMQStateManager stateManager,
                                                        QueueRegistry 
queueRegistry,
                                ExchangeRegistry exchangeRegistry,
@@ -48,7 +57,39 @@
                                        AMQMethodEvent<MessageTransferBody> evt)
                                 throws AMQException
     {
-               // TODO
+        final MessageTransferBody body = evt.getMethod();
+
+        if (_log.isDebugEnabled()) {
+            _log.debug("Publish received on channel " + evt.getChannelId());
+        }
+
+        // TODO: check the delivery tag field details - is it unique across 
the broker or per subscriber?
+        if (body.exchange == null) {
+            body.exchange = ExchangeDefaults.DIRECT_EXCHANGE_NAME;
+        }
+        Exchange e = exchangeRegistry.getExchange(body.exchange);
+        // if the exchange does not exist we raise a channel exception
+        if (e == null) {
+            protocolSession.closeChannel(evt.getChannelId());
+            // TODO: modify code gen to make getClazz and getMethod public 
methods rather than protected
+            // then we can remove the hardcoded 0,0
+            // AMQP version change: Hardwire the version to 0-8 (major=8, 
minor=0)
+            // TODO: Connect this to the session version obtained from 
ProtocolInitiation for this session.
+            // Be aware of possible changes to parameter order as versions 
change.
+            AMQMethodBody cf = ChannelCloseBody.createMethodBody
+                ((byte)8, (byte)0,     // AMQP version (major, minor)
+                 MessageTransferBody.getClazz((byte)0, (byte)9),       // 
classId
+                 MessageTransferBody.getMethod((byte)0, (byte)9),      // 
methodId
+                 500,  // replyCode
+                 UNKNOWN_EXCHANGE_NAME);       // replyText
+            protocolSession.writeRequest(evt.getChannelId(), cf, stateManager);
+        } else {
+            // The partially populated BasicDeliver frame plus the received 
route body
+            // is stored in the channel. Once the final body frame has been 
received
+            // it is routed to the exchange.
+            AMQChannel channel = 
protocolSession.getChannel(evt.getChannelId());
+            channel.addMessageTransfer(body, protocolSession);
+        }
     }
 }
 

Modified: 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?view=diff&rev=497278&r1=497277&r2=497278
==============================================================================
--- 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
 (original)
+++ 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
 Wed Jan 17 17:28:44 2007
@@ -132,7 +132,38 @@
     }
 
     public long getSize() {
-        throw new Error("XXX");
+        return getHeaderSize() + getBodySize();
+    }
+
+    public int getHeaderSize() {
+        int size = _transferBody.getBodySize();
+        Content body = _transferBody.getBody();
+        switch (body.getContentType()) {
+        case CONTENT_TYPE_INLINE:
+            size -= _transferBody.getBody().getEncodedSize();
+            break;
+        }
+        return size;
+    }
+
+    public long getBodySize() {
+        Content body = _transferBody.getBody();
+        switch (body.getContentType()) {
+        case CONTENT_TYPE_INLINE:
+            return _transferBody.getBody().getContent().length;
+        case CONTENT_TYPE_REFERENCE:
+            return getReferenceSize();
+        default:
+            throw new IllegalStateException("unrecognized type: " + 
body.getContentType());
+        }
+    }
+
+    public long getReferenceSize() {
+        long size = 0;
+        for (MessageAppendBody mab : _contentBodies) {
+            size += mab.getBytes().length;
+        }
+        return size;
     }
 
     public FieldTable getHeadersTable() {
@@ -164,7 +195,7 @@
     }
 
     public byte getDeliveryMode() {
-        throw new Error("XXX");
+        return (byte) _transferBody.deliveryMode;
     }
 
     public void setReplyTo(String replyTo) {

Modified: 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java?view=diff&rev=497278&r1=497277&r2=497278
==============================================================================
--- 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
 (original)
+++ 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
 Wed Jan 17 17:28:44 2007
@@ -190,7 +190,7 @@
             return 0l;
         }
 
-        return msg.getSize();
+        return msg.getBodySize();
     }
 
     /**

Modified: 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java?view=diff&rev=497278&r1=497277&r2=497278
==============================================================================
--- 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
 (original)
+++ 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
 Wed Jan 17 17:28:44 2007
@@ -96,7 +96,7 @@
     private boolean addMessageToQueue(AMQMessage msg)
     {
         // Shrink the ContentBodies to their actual size to save memory.
-        if (true) throw new Error("XXX");
+        // XXX
         /*if (compressBufferOnQueue)
         {
             Iterator it = msg.getContentBodies().iterator();

Modified: 
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java?view=diff&rev=497278&r1=497277&r2=497278
==============================================================================
--- 
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java
 (original)
+++ 
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java
 Wed Jan 17 17:28:44 2007
@@ -42,7 +42,7 @@
     }
 
     /** unsigned short */
-    protected abstract int getBodySize();
+    public abstract int getBodySize();
 
     /**
      * @return unsigned short


Reply via email to