Author: rgodfrey
Date: Wed Apr 16 04:43:37 2008
New Revision: 648672

URL: http://svn.apache.org/viewvc?rev=648672&view=rev
Log:
QPID-933 : performance tweaks

Modified:
    incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java
    incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
    incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java
    incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
    incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java
    incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
    incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/AllowAll.java
    incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
    incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java
    incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java
    incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java

Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java?rev=648672&r1=648671&r2=648672&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java Wed Apr 16 04:43:37 2008
@@ -47,7 +47,7 @@
 
     void add(long deliveryTag, UnacknowledgedMessage message);
 
-    void collect(long deliveryTag, boolean multiple, List<UnacknowledgedMessage> msgs);
+    void collect(Long deliveryTag, boolean multiple, List<UnacknowledgedMessage> msgs);
 
     boolean contains(long deliveryTag) throws AMQException;
 

Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java?rev=648672&r1=648671&r2=648672&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java Wed Apr 16 04:43:37 2008
@@ -28,9 +28,6 @@
 import java.util.Set;
 
 import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.AMQMessage;
 import org.apache.qpid.server.txn.TransactionalContext;
 
 public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
@@ -51,13 +48,7 @@
         _map = new LinkedHashMap<Long, UnacknowledgedMessage>(prefetchLimit);
     }
 
-    /*public UnacknowledgedMessageMapImpl(Object lock, Map<Long, UnacknowledgedMessage> map)
-    {
-        _lock = lock;
-        _map = map;
-    } */
-
-    public void collect(long deliveryTag, boolean multiple, List<UnacknowledgedMessage> msgs)
+    public void collect(Long deliveryTag, boolean multiple, List<UnacknowledgedMessage> msgs)
     {
         if (multiple)
         {
@@ -213,14 +204,14 @@
         }
     }
 
-    private void collect(long key, List<UnacknowledgedMessage> msgs)
+    private void collect(Long key, List<UnacknowledgedMessage> msgs)
     {
         synchronized (_lock)
         {
             for (Map.Entry<Long, UnacknowledgedMessage> entry : _map.entrySet())
             {
                 msgs.add(entry.getValue());
-                if (entry.getKey() == key)
+                if (entry.getKey().equals(key))
                 {
                     break;
                 }

Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java?rev=648672&r1=648671&r2=648672&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java Wed Apr 16 04:43:37 2008
@@ -179,26 +179,31 @@
     private AMQBody createEncodedDeliverFrame(AMQMessage message, final int channelId, final long deliveryTag, final AMQShortString consumerTag)
             throws AMQException
     {
+
+
         final MessagePublishInfo pb = message.getMessagePublishInfo();
         final AMQMessageHandle messageHandle = message.getMessageHandle();
 
 
-        final boolean isRedelivered = messageHandle.isRedelivered();
-        final AMQShortString exchangeName = pb.getExchange();
-        final AMQShortString routingKey = pb.getRoutingKey();
-
         final AMQBody returnBlock = new AMQBody()
         {
 
+
+
+            private final boolean _isRedelivered = messageHandle.isRedelivered();
+            private final AMQShortString _exchangeName = pb.getExchange();
+            private final AMQShortString _routingKey = pb.getRoutingKey();
+
+
             public AMQBody _underlyingBody;
 
             public AMQBody createAMQBody()
             {
                 return METHOD_REGISTRY.createBasicDeliverBody(consumerTag,
                                                               deliveryTag,
-                                                              isRedelivered,
-                                                              exchangeName,
-                                                              routingKey);
+                                                              _isRedelivered,
+                                                              _exchangeName,
+                                                              _routingKey);
 
 
 

Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java?rev=648672&r1=648671&r2=648672&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java Wed Apr 16 04:43:37 2008
@@ -265,10 +265,6 @@
      */
     public void messageSent(IoSession protocolSession, Object object) throws Exception
     {
-        if (_logger.isDebugEnabled())
-        {
-            _logger.debug("Message sent: " + object);
-        }
     }
 
     protected boolean isSSLClient(ConnectorConfiguration connectionConfig,

Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java?rev=648672&r1=648671&r2=648672&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java Wed Apr 16 04:43:37 2008
@@ -22,6 +22,7 @@
 
 import java.util.LinkedList;
 import java.util.List;
+import java.util.ArrayList;
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.BasicContentHeaderProperties;
@@ -40,7 +41,7 @@
 
     private MessagePublishInfo _messagePublishInfo;
 
-    private List<ContentChunk> _contentBodies = new LinkedList<ContentChunk>();
+    private List<ContentChunk> _contentBodies = new ArrayList<ContentChunk>();
 
     private boolean _redelivered;
 

Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java?rev=648672&r1=648671&r2=648672&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java Wed Apr 16 04:43:37 2008
@@ -254,13 +254,6 @@
         {
             long deliveryTag = channel.getNextDeliveryTag();
 
-            // We don't need to add the message to the unacknowledgedMap as we don't need to know if the client
-            // received the message. If it is lost in transit that is not important.
-//            if (_acks)
-//            {
-//                channel.addUnacknowledgedBrowsedMessage(msg, deliveryTag, consumerTag, queue);
-//            }
-
             if (_sendLock.get())
             {
                 _logger.error("Sending " + msg + " when subscriber(" + this + ") is closed!");
@@ -283,25 +276,23 @@
 
             // The send may of course still fail, in which case, as
             // the message is unacked, it will be lost.
+            final AMQMessage message = entry.getMessage();
+
             if (!_acks)
             {
                 if (_logger.isDebugEnabled())
                 {
-                    _logger.debug("No ack mode so dequeuing message immediately: " + entry.getMessage().getMessageId());
+                    _logger.debug("No ack mode so dequeuing message immediately: " + message.getMessageId());
                 }
                 queue.dequeue(storeContext, entry);
             }
 
-/*
-            if (_sendLock.get())
-            {
-                _logger.error("Sending " + entry + " when subscriber(" + this + ") is closed!");
-            }
-*/
+            final ProtocolOutputConverter outputConverter = protocolSession.getProtocolOutputConverter();
+            final int channelId = channel.getChannelId();
 
             synchronized (channel)
             {
-                long deliveryTag = channel.getNextDeliveryTag();
+                final long deliveryTag = channel.getNextDeliveryTag();
 
 
                 if (_acks)
@@ -309,13 +300,13 @@
                     channel.addUnacknowledgedMessage(entry, deliveryTag, consumerTag);
                 }
 
-                protocolSession.getProtocolOutputConverter().writeDeliver(entry.getMessage(), channel.getChannelId(), deliveryTag, consumerTag);
+                outputConverter.writeDeliver(message, channelId, deliveryTag, consumerTag);
 
 
             }
             if (!_acks)
             {
-                entry.getMessage().decrementReference(storeContext);
+                message.decrementReference(storeContext);
             }
         }
         finally

Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/AllowAll.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/AllowAll.java?rev=648672&r1=648671&r2=648672&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/AllowAll.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/AllowAll.java Wed Apr 16 04:43:37 2008
@@ -28,14 +28,18 @@
 import org.apache.qpid.server.security.access.Accessable;
 import org.apache.qpid.server.security.access.Permission;
 import org.apache.commons.configuration.Configuration;
+import org.apache.log4j.Logger;
 
 public class AllowAll implements ACLPlugin
 {
+
+    private static final Logger _logger = ACLManager.getLogger();
+
     public AccessResult authorise(AMQProtocolSession session, Permission permission, AMQMethodBody body, Object... parameters)
     {
-        if (ACLManager.getLogger().isDebugEnabled())
+        if (_logger.isDebugEnabled())
         {
-            ACLManager.getLogger().debug("Allowing user:" + session.getAuthorizedID() + " for :" + permission.toString()
+            _logger.debug("Allowing user:" + session.getAuthorizedID() + " for :" + permission.toString()
                                         + " on " + body.getClass().getSimpleName()
                                         + (parameters == null || parameters.length == 0 ? "" : "-" + accessablesToString(parameters)));
         }

Modified: incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=648672&r1=648671&r2=648672&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Wed Apr 16 04:43:37 2008
@@ -182,6 +182,46 @@
                 _fastAccessConsumers[i] = null;
             }
         }
+
+
+        public void acknowledgeDelivered()
+        {
+
+            for(int i = 0; i<16; i++)
+            {
+                final BasicMessageConsumer c = _fastAccessConsumers[i];
+                if(c != null)
+                {
+                    c.acknowledgeDelivered();
+                }
+            }
+            if(!_slowAccessConsumers.isEmpty())
+            {
+                for (Iterator<BasicMessageConsumer> i = _slowAccessConsumers.values().iterator(); i.hasNext();)
+                {
+                    i.next().acknowledgeDelivered();
+                }
+            }
+        }
+
+        public void acknowledge() throws JMSException
+        {
+            for(int i = 0; i<16; i++)
+            {
+                final BasicMessageConsumer c = _fastAccessConsumers[i];
+                if(c != null)
+                {
+                    c.acknowledge();
+                }
+            }
+            if(!_slowAccessConsumers.isEmpty())
+            {
+                for (Iterator<BasicMessageConsumer> i = _slowAccessConsumers.values().iterator(); i.hasNext();)
+                {
+                    i.next().acknowledge();
+                }
+            }
+        }
     }
 
 
@@ -500,10 +540,7 @@
             throw new IllegalStateException("Session is already closed");
         }
 
-        for (BasicMessageConsumer consumer : _consumers.values())
-        {
-            consumer.acknowledge();
-        }
+        _consumers.acknowledge();
     }
 
     /**
@@ -725,12 +762,9 @@
                     // We only need to find the highest value and ack that as commit is session level.
                     Long lastTag = -1L;
 
-                    for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();)
-                    {
-                        i.next().acknowledgeDelivered();
-                    }
+                    _consumers.acknowledgeDelivered();
 
-                    if (_transacted)
+                    if (_transacted && !_removedConsumers.isEmpty())
                     {
                         // Do the above, but for consumers which have been de-registered since the
                         // last commit

Modified: incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java?rev=648672&r1=648671&r2=648672&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java (original)
+++ incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java Wed Apr 16 04:43:37 2008
@@ -71,8 +71,17 @@
             while (it.hasNext())
             {
                 ContentBody cb = (ContentBody) it.next();
-                data.put(cb.payload);
-                cb.payload.release();
+                final ByteBuffer payload = cb.payload;
+                if(payload.isDirect() || payload.isReadOnly())
+                {
+                    data.put(payload);
+                }
+                else
+                {
+                    data.put(payload.array(), payload.arrayOffset(), payload.limit());
+                }
+
+                payload.release();
             }
 
             data.flip();

Modified: incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java?rev=648672&r1=648671&r2=648672&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java (original)
+++ incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java Wed Apr 16 04:43:37 2008
@@ -65,8 +65,15 @@
     {
         if (payload != null)
         {
-            ByteBuffer copy = payload.duplicate();
-            buffer.put(copy.rewind());
+            if(payload.isDirect() || payload.isReadOnly())
+            {            
+                ByteBuffer copy = payload.duplicate();
+                buffer.put(copy.rewind());
+            }
+            else
+            {
+                buffer.put(payload.array(),payload.arrayOffset(),payload.limit());
+            }
         }
     }
 

Modified: incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java?rev=648672&r1=648671&r2=648672&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java (original)
+++ incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java Wed Apr 16 04:43:37 2008
@@ -957,16 +957,21 @@
 
         if (_encodedForm != null)
         {
+            if(buffer.isDirect() || buffer.isReadOnly())
+            {
+                ByteBuffer encodedForm = _encodedForm.duplicate();
 
-            ByteBuffer encodedForm = _encodedForm.duplicate();
+                if (encodedForm.position() != 0)
+                {
+                    encodedForm.flip();
+                }
 
-            if (encodedForm.position() != 0)
+                buffer.put(encodedForm);
+            }
+            else
             {
-                encodedForm.flip();
+                buffer.put(_encodedForm.array(),_encodedForm.arrayOffset(),(int)_encodedSize);
             }
-            // _encodedForm.limit((int)getEncodedSize());
-
-            buffer.put(encodedForm);
         }
         else if (_properties != null)
         {

Modified: incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java?rev=648672&r1=648671&r2=648672&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java (original)
+++ incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java Wed Apr 16 04:43:37 2008
@@ -45,31 +45,21 @@
 
     public AMQBody convertToBody(ContentChunk contentChunk)
     {
-        return new ContentBody(contentChunk.getData());
+        if(contentChunk instanceof ContentChunk_0_9)
+        {
+            return ((ContentChunk_0_9)contentChunk).toBody();
+        }
+        else
+        {
+            return new ContentBody(contentChunk.getData());
+        }
     }
 
     public ContentChunk convertToContentChunk(AMQBody body)
     {
         final ContentBody contentBodyChunk = (ContentBody) body;
 
-        return new ContentChunk()
-        {
-
-            public int getSize()
-            {
-                return contentBodyChunk.getSize();
-            }
-
-            public ByteBuffer getData()
-            {
-                return contentBodyChunk.payload;
-            }
-
-            public void reduceToFit()
-            {
-                contentBodyChunk.reduceBufferToFit();
-            }
-        };
+        return new ContentChunk_0_9(contentBodyChunk);
 
     }
 
@@ -147,6 +137,36 @@
         public AMQShortString getRoutingKey()
         {
             return _routingKey;
+        }
+    }
+
+    private static class ContentChunk_0_9 implements ContentChunk
+    {
+        private final ContentBody _contentBodyChunk;
+
+        public ContentChunk_0_9(final ContentBody contentBodyChunk)
+        {
+            _contentBodyChunk = contentBodyChunk;
+        }
+
+        public int getSize()
+        {
+            return _contentBodyChunk.getSize();
+        }
+
+        public ByteBuffer getData()
+        {
+            return _contentBodyChunk.payload;
+        }
+
+        public void reduceToFit()
+        {
+            _contentBodyChunk.reduceBufferToFit();
+        }
+
+        public AMQBody toBody()
+        {
+            return _contentBodyChunk;
         }
     }
 }


Reply via email to