Author: bhupendrab
Date: Wed Nov  1 09:48:23 2006
New Revision: 470012

URL: http://svn.apache.org/viewvc?view=rev&rev=470012
Log:
AMQQueueMBean updated.  The message content is sent with header attributes. New 
management operation added to view message content for a particular messageId.

Modified:
    
incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java
    
incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/ManagedQueue.java

Modified: 
incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java?view=diff&rev=470012&r1=470011&r2=470012
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/AMQQueue.java
 Wed Nov  1 09:48:23 2006
@@ -18,7 +18,9 @@
 package org.apache.qpid.server.queue;
 
 import org.apache.log4j.Logger;
+import org.apache.mina.common.ByteBuffer;
 import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
 import org.apache.qpid.framing.ContentBody;
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.management.AMQManagedObject;
@@ -97,7 +99,7 @@
     /**
      * max allowed size of a single message(in KBytes).
      */
-    private long _maxAllowedMessageSize = 10000;    // 10 MB
+    private long _maxAllowedMessageSize = 10000;  // 10 MB
 
     /**
      * max allowed number of messages on a queue.
@@ -107,7 +109,7 @@
     /**
      * max allowed size in  KBytes for all the messages combined together in a 
queue.
      */
-    private long _queueDepth = 10000000;  //   10 GB
+    private long _queueDepth = 10000000;          //   10 GB
 
     /**
      * total messages received by the queue since startup.
@@ -122,33 +124,39 @@
     private final class AMQQueueMBean extends AMQManagedObject implements 
ManagedQueue
     {
         private String _queueName = null;
-        //private MBeanInfo _mbeanInfo;
 
-        // AMQ message attribute names exposed.
+        // AMQ message attribute names
         private String[] _msgAttributeNames = {"MessageId",
-                                               "Redelivered",
-                                               "Content's size",
-                                               "Contents"};
+                                               "Header",
+                                               "Size",
+                                               "Redelivered"
+                                              };
         // AMQ Message attribute descriptions.
         private String[] _msgAttributeDescriptions = {"Message Id",
-                                                      "Redelivered",
-                                                      "Message content's size 
in bytes",
-                                                      "Message content 
bodies"};
-        // AMQ message attribute types.
-        private OpenType[] _msgAttributeTypes = new OpenType[4];
-        // Messages will be indexed according to the messageId.
-        private String[] _msgAttributeIndex = {"MessageId"};
-        // Composite type for representing AMQ Message data.
-        private CompositeType _messageDataType = null;
-        // Datatype for representing AMQ messages list.
-        private TabularType _messagelistDataType = null;
-
-        private String[] _contentNames = {"SerialNumber", "ContentBody"};
-        private String[] _contentDesc = {"Serial Number", "Content Body"};
-        private String[] _contentIndex = {"SerialNumber"};
-        private OpenType[] _contentType = new OpenType[2];
-        private CompositeType _contentBodyType = null;
-        private TabularType _contentBodyListType = null;
+                                                      "Header",
+                                                      "Message size in bytes",
+                                                      "Redelivered"
+                                                     };
+
+        private OpenType[] _msgAttributeTypes = new OpenType[4];  // AMQ 
message attribute types.
+        private String[]   _msgAttributeIndex = {"MessageId"};    // Messages 
will be indexed according to the messageId.
+        private CompositeType _messageDataType = null;            // Composite 
type for representing AMQ Message data.
+        private TabularType   _messagelistDataType = null;        // Datatype 
for representing AMQ messages list.
+
+
+        private CompositeType _msgContentType = null;    // For message content
+        private String[]      _msgContentAttributes = {"MessageId",
+                                                       "MimeType",
+                                                       "Encoding",
+                                                       "Content"
+                                                      };
+        private String[]    _msgContentDescriptions = {"Message Id",
+                                                       "MimeType",
+                                                       "Encoding",
+                                                       "Message content"
+                                                      };
+        private OpenType[]  _msgContentAttributeTypes = new OpenType[4];
+
 
         @MBeanConstructor("Creates an MBean exposing an AMQQueue.")
         public AMQQueueMBean() throws NotCompliantMBeanException
@@ -162,22 +170,21 @@
             _queueName = jmxEncode(new StringBuffer(_name), 0).toString();
             try
             {
-                _contentType[0] = SimpleType.INTEGER;
-                _contentType[1] = new ArrayType(1, SimpleType.BYTE);
-                _contentBodyType = new CompositeType("Content",
-                                                     "Content",
-                                                     _contentNames,
-                                                     _contentDesc,
-                                                     _contentType);
-                _contentBodyListType = new TabularType("MessageContents",
-                                                       "Message Contents",
-                                                       _contentBodyType,
-                                                       _contentIndex);
-
-                _msgAttributeTypes[0] = SimpleType.LONG;
-                _msgAttributeTypes[1] = SimpleType.BOOLEAN;
-                _msgAttributeTypes[2] = SimpleType.LONG;
-                _msgAttributeTypes[3] = _contentBodyListType;
+                _msgContentAttributeTypes[0] = SimpleType.LONG;                
    // For message id
+                _msgContentAttributeTypes[1] = SimpleType.STRING;              
    // For MimeType
+                _msgContentAttributeTypes[2] = SimpleType.STRING;              
    // For Encoding
+                _msgContentAttributeTypes[3] = new ArrayType(1, 
SimpleType.BYTE);  // For message content
+                _msgContentType = new CompositeType("MessageContent",
+                                                     "AMQ Message Content",
+                                                     _msgContentAttributes,
+                                                     _msgContentDescriptions,
+                                                     
_msgContentAttributeTypes);
+
+
+                _msgAttributeTypes[0] = SimpleType.LONG;                      
// For message id
+                _msgAttributeTypes[1] = new ArrayType(1, SimpleType.STRING);  
// For header attributes
+                _msgAttributeTypes[2] = SimpleType.LONG;                      
// For size
+                _msgAttributeTypes[3] = SimpleType.BOOLEAN;                   
// For redelivered
 
                 _messageDataType = new CompositeType("Message",
                                                      "AMQ Message",
@@ -288,10 +295,8 @@
             }
             return new Long(Math.round(queueSize / 100));
         }
-        // Operations
 
         // calculates the size of an AMQMessage
-
         private long getMessageSize(AMQMessage msg)
         {
             if (msg == null)
@@ -374,6 +379,53 @@
             }
         }
 
+        public CompositeData viewMessageContent(long msgId) throws JMException
+        {
+            List<AMQMessage> list = _deliveryMgr.getMessages();
+            CompositeData messageContent = null;
+            AMQMessage msg = null;
+            for (AMQMessage message : list)
+            {
+                if (message.getMessageId() == msgId)
+                {
+                    msg = message;
+                    break;
+                }
+            }
+
+            if (msg != null)
+            {
+                // get message content
+                List<ContentBody> cBodies = msg.getContentBodies();
+                List<Byte> msgContent = new ArrayList<Byte>();
+                for (ContentBody body : cBodies)
+                {
+                    if (body.getSize() != 0)
+                    {
+                        ByteBuffer slice = body.payload.slice();
+                        for (int j = 0; j < slice.limit(); j++)
+                        {
+                            msgContent.add(slice.get());
+                        }
+                    }
+                }
+
+                // Create header attributes list
+                BasicContentHeaderProperties headerProperties = 
(BasicContentHeaderProperties)msg.getContentHeaderBody().properties;
+                String mimeType = headerProperties.getContentType();
+                String encoding = headerProperties.getEncoding() == null ? "" 
: headerProperties.getEncoding();
+
+                Object[] itemValues = {msgId, mimeType, encoding, 
msgContent.toArray(new Byte[0])};
+                messageContent = new CompositeDataSupport(_msgContentType, 
_msgContentAttributes, itemValues);
+            }
+            else
+            {
+                throw new JMException("AMQMessage with message id = " + msgId 
+ " is not in the " + _queueName );
+            }
+
+            return messageContent;
+        }
+
         /**
          * Returns the messages stored in this queue in tabular form.
          *
@@ -391,43 +443,38 @@
             }
 
             List<AMQMessage> list = _deliveryMgr.getMessages();
+            TabularDataSupport _messageList = new 
TabularDataSupport(_messagelistDataType);
 
             if (beginIndex > list.size())
             {
-                throw new JMException("FromIndex = " + beginIndex + ". There 
are only " + list.size() + " messages in the queue");
+                return _messageList;
             }
-
             endIndex = endIndex < list.size() ? endIndex : list.size();
-            TabularDataSupport _messageList = new 
TabularDataSupport(_messagelistDataType);
 
             for (int i = beginIndex; i <= endIndex; i++)
             {
                 AMQMessage msg = list.get(i - 1);
-                long msgId = msg.getMessageId();
-
-                List<ContentBody> cBodies = msg.getContentBodies();
-
-                TabularDataSupport _contentList = new 
TabularDataSupport(_contentBodyListType);
-                int contentSerialNo = 1;
                 long size = 0;
-
+                // get message content
+                List<ContentBody> cBodies = msg.getContentBodies();
                 for (ContentBody body : cBodies)
                 {
-                    if (body.getSize() != 0)
-                    {
-                        Byte[] byteArray = 
getByteArray(body.payload.slice().array());
-                        size = size + byteArray.length;
-
-                        Object[] contentValues = {contentSerialNo, byteArray};
-                        CompositeData contentData = new 
CompositeDataSupport(_contentBodyType,
-                                                                             
_contentNames,
-                                                                             
contentValues);
-
-                        _contentList.put(contentData);
-                    }
+                    size = size + body.getSize();
                 }
 
-                Object[] itemValues = {msgId, true, size, _contentList};
+                // Create header attributes list
+                BasicContentHeaderProperties headerProperties = 
(BasicContentHeaderProperties)msg.getContentHeaderBody().properties;
+                List<String> headerAttribsList = new ArrayList<String>();
+                headerAttribsList.add("App Id=" + headerProperties.getAppId());
+                headerAttribsList.add("MimeType=" + 
headerProperties.getContentType());
+                headerAttribsList.add("Correlation Id=" + 
headerProperties.getCorrelationId());
+                headerAttribsList.add("Encoding=" + 
headerProperties.getEncoding());
+                headerAttribsList.add(headerProperties.toString());
+
+                Object[] itemValues = {msg.getMessageId(),
+                                       headerAttribsList.toArray(new 
String[0]),
+                                       size, msg.isRedelivered()};
+
                 CompositeData messageData = new 
CompositeDataSupport(_messageDataType,
                                                                      
_msgAttributeNames,
                                                                      
itemValues);
@@ -435,26 +482,6 @@
             }
 
             return _messageList;
-        }
-
-        /**
-         * A utility to convert byte[] to Byte[]. Required to create composite
-         * type for message contents.
-         *
-         * @param byteArray message content as byte[]
-         * @return Byte[]
-         */
-        private Byte[] getByteArray(byte[] byteArray)
-        {
-            int size = byteArray.length;
-            List<Byte> list = new ArrayList<Byte>();
-
-            for (int i = 0; i < size; i++)
-            {
-                list.add(byteArray[i]);
-            }
-
-            return list.toArray(new Byte[0]);
         }
 
         /**

Modified: 
incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/ManagedQueue.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/ManagedQueue.java?view=diff&rev=470012&r1=470011&r2=470012
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/ManagedQueue.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/queue/ManagedQueue.java
 Wed Nov  1 09:48:23 2006
@@ -24,6 +24,7 @@
 import javax.management.JMException;
 import javax.management.MBeanOperationInfo;
 import javax.management.openmbean.TabularData;
+import javax.management.openmbean.CompositeData;
 import java.io.IOException;
 
 /**
@@ -209,4 +210,8 @@
                          impact= MBeanOperationInfo.ACTION)
     void clearQueue() throws IOException, JMException;
 
+    @MBeanOperation(name="viewMessageContent",
+                         description="Returns the message content along with 
MimeType and Encoding")
+    CompositeData viewMessageContent(@MBeanOperationParameter(name="Message 
Id", description="Message Id")long messageId) 
+        throws IOException, JMException;
 }


Reply via email to