Author: kpvdr
Date: Thu Jan 18 13:39:29 2007
New Revision: 497585

URL: http://svn.apache.org/viewvc?view=rev&rev=497585
Log:
Changed Content to use ByteBuffer; added Message.Transfer and Message.Cancel handlers.

Modified:
    
incubator/qpid/branches/qpid.0-9/gentools/src/org/apache/qpid/gentools/JavaGenerator.java
    
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
    
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCancelHandler.java
    
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java
    
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
    
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
    
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
    
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/Content.java

Modified: 
incubator/qpid/branches/qpid.0-9/gentools/src/org/apache/qpid/gentools/JavaGenerator.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/gentools/src/org/apache/qpid/gentools/JavaGenerator.java?view=diff&rev=497585&r1=497584&r2=497585
==============================================================================
--- 
incubator/qpid/branches/qpid.0-9/gentools/src/org/apache/qpid/gentools/JavaGenerator.java
 (original)
+++ 
incubator/qpid/branches/qpid.0-9/gentools/src/org/apache/qpid/gentools/JavaGenerator.java
 Thu Jan 18 13:39:29 2007
@@ -1054,8 +1054,16 @@
                int ordinal, int indentSize, int tabSize)
        {
                StringBuffer sb = new StringBuffer();
-               sb.append(Utils.createSpaces(indentSize) +
-                       "buf.append(\"  " + fieldName + ": \" + " + fieldName + 
");" + cr);             
+        if (domain.compareTo("longstr") == 0)
+        {
+                   sb.append(Utils.createSpaces(indentSize) +
+                           "buf.append(\"  " + fieldName + ": \" + new 
String(" + fieldName + "));" + cr);             
+        }
+        else
+        {
+                   sb.append(Utils.createSpaces(indentSize) +
+                           "buf.append(\"  " + fieldName + ": \" + " + 
fieldName + ");" + cr);         
+        }
                return sb.toString();
        }
        

Modified: 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?view=diff&rev=497585&r1=497584&r2=497585
==============================================================================
--- 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
 (original)
+++ 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
 Thu Jan 18 13:39:29 2007
@@ -197,7 +197,7 @@
             route(message);
             break;
         case CONTENT_TYPE_REFERENCE:
-            getMessages(body.getContent()).add(message);
+            getMessages(body.getContentAsByteArray()).add(message);
             break;
         }
     }

Modified: 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCancelHandler.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCancelHandler.java?view=diff&rev=497585&r1=497584&r2=497585
==============================================================================
--- 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCancelHandler.java
 (original)
+++ 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCancelHandler.java
 Thu Jan 18 13:39:29 2007
@@ -21,8 +21,11 @@
 package org.apache.qpid.server.handler;
 
 import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQMethodBody;
 import org.apache.qpid.framing.MessageCancelBody;
+import org.apache.qpid.framing.MessageOkBody;
 import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.exchange.ExchangeRegistry;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.queue.QueueRegistry;
@@ -48,7 +51,15 @@
                                        AMQMethodEvent<MessageCancelBody> evt)
                                 throws AMQException
     {
-               // TODO
+        final AMQChannel channel = 
protocolSession.getChannel(evt.getChannelId());
+        final MessageCancelBody body = evt.getMethod();
+        channel.unsubscribeConsumer(protocolSession, body.destination);
+        
+        // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
+        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+        // Be aware of possible changes to parameter order as versions change.
+        final AMQMethodBody methodBody = 
MessageOkBody.createMethodBody((byte)0, (byte)9);
+        protocolSession.writeResponse(evt, methodBody);
     }
 }
 

Modified: 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java?view=diff&rev=497585&r1=497584&r2=497585
==============================================================================
--- 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java
 (original)
+++ 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java
 Thu Jan 18 13:39:29 2007
@@ -74,11 +74,11 @@
             protocolSession.closeChannel(evt.getChannelId());
             // TODO: modify code gen to make getClazz and getMethod public methods rather than protected
             // then we can remove the hardcoded 0,0
-            // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+            // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
             // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
             // Be aware of possible changes to parameter order as versions change.
             AMQMethodBody cf = ChannelCloseBody.createMethodBody
-                ((byte)8, (byte)0,     // AMQP version (major, minor)
+                ((byte)0, (byte)9,     // AMQP version (major, minor)
                  MessageTransferBody.getClazz((byte)0, (byte)9),       // 
classId
                  MessageTransferBody.getMethod((byte)0, (byte)9),      // 
methodId
                  500,  // replyCode

Modified: 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?view=diff&rev=497585&r1=497584&r2=497585
==============================================================================
--- 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
 (original)
+++ 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
 Thu Jan 18 13:39:29 2007
@@ -150,7 +150,7 @@
         Content body = _transferBody.getBody();
         switch (body.getContentType()) {
         case CONTENT_TYPE_INLINE:
-            return _transferBody.getBody().getContent().length;
+            return _transferBody.getBody().getContent().limit();
         case CONTENT_TYPE_REFERENCE:
             return getReferenceSize();
         default:

Modified: 
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java?view=diff&rev=497585&r1=497584&r2=497585
==============================================================================
--- 
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
 (original)
+++ 
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
 Thu Jan 18 13:39:29 2007
@@ -557,8 +557,6 @@
                 messageHeaders.setExpiration(0);
             }
         }
-//        messageHeaders.setDeliveryMode((byte) deliveryMode);
-//        messageHeaders.setPriority((byte) priority);
 
         int size = (payload != null) ? payload.limit() : 0;
         Content[] content = createContent(payload);
@@ -656,7 +654,7 @@
 
         if (frameCount == 1)
         {
-            bodies[0] = new 
Content(Content.ContentTypeEnum.CONTENT_TYPE_INLINE, payload.array());
+            bodies[0] = new 
Content(Content.ContentTypeEnum.CONTENT_TYPE_INLINE, payload);
         }
         else
         {
@@ -666,7 +664,7 @@
                 payload.position((int) framePayloadMax * i);
                 int length = (remaining >= framePayloadMax) ? (int) 
framePayloadMax : (int) remaining;
                 payload.limit(payload.position() + length);
-                bodies[i] = new 
Content(Content.ContentTypeEnum.CONTENT_TYPE_INLINE, payload.slice().array());
+                bodies[i] = new 
Content(Content.ContentTypeEnum.CONTENT_TYPE_INLINE, payload.slice());
                 remaining -= length;
             }
         }

Modified: 
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java?view=diff&rev=497585&r1=497584&r2=497585
==============================================================================
--- 
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
 (original)
+++ 
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
 Thu Jan 18 13:39:29 2007
@@ -41,7 +41,7 @@
                        MessageHeaders contentHeader, Content body) throws 
AMQException {
         ByteBuffer data;
 
-        data = ByteBuffer.allocate(body.content.length);
+        data = ByteBuffer.allocate(body.content.remaining());
         data.put(body.content);
         data.flip();
         

Modified: 
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/Content.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/Content.java?view=diff&rev=497585&r1=497584&r2=497585
==============================================================================
--- 
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/Content.java
 (original)
+++ 
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/Content.java
 Thu Jan 18 13:39:29 2007
@@ -43,7 +43,7 @@
     }
     
     public ContentTypeEnum contentType;
-    public byte[] content;
+    public ByteBuffer content;
     
     // Constructors
     
@@ -63,20 +63,13 @@
                throw new IllegalArgumentException("Content cannot be empty for 
a ref type.");
         }
        this.contentType = contentType;
-        this.content = content;
+        this.content = ByteBuffer.allocate(content.length);
+        this.content.put(content);
     }
     
-    public Content(ContentTypeEnum contentType, String content)
+    public Content(ContentTypeEnum contentType, String contentStr)
     {
-       if (contentType == ContentTypeEnum.CONTENT_TYPE_REFERENCE)
-        {
-               if (content == null)
-               throw new IllegalArgumentException("Content cannot be null for 
a ref type.");
-               if (content.length() == 0)
-               throw new IllegalArgumentException("Content cannot be empty for 
a ref type.");
-        }
-       this.contentType = contentType;
-        this.content = content.getBytes();
+        this(contentType, contentStr.getBytes());
     }
     
     public Content(ContentTypeEnum contentType, ByteBuffer content)
@@ -89,18 +82,26 @@
                throw new IllegalArgumentException("Content cannot be empty for 
a ref type.");
         }
        this.contentType = contentType;
-        this.content = content.array();
+        this.content = content;
     }
     
     // Get functions
     
     public ContentTypeEnum getContentType() { return contentType; }
-    public byte[] getContent() { return content; }
+    public ByteBuffer getContent() { return content; }
+    
+    public byte[] getContentAsByteArray()
+    {
+        byte[] ba = new byte[content.remaining()];
+        content.get(ba);
+        return ba;
+    }
+    
     public String getContentAsString()
     {
        if (content == null)
                return null;
-        return new String(content);
+        return new String(getContentAsByteArray());
     }
     
     // Wire functions
@@ -109,18 +110,22 @@
     {
        if (content == null)
                return 1 + 4;
-       return 1 + 4 + content.length;   
+       return 1 + 4 + content.remaining();   
     }
     
     public void writePayload(ByteBuffer buffer)
     {
        EncodingUtils.writeUnsignedByte(buffer, contentType.toByte());
-        EncodingUtils.writeLongStringBytes(buffer, content);
+       EncodingUtils.writeUnsignedInteger(buffer, content.remaining());
+        buffer.put(content);
     }
     
     public void populateFromBuffer(ByteBuffer buffer) throws 
AMQFrameDecodingException
     {
         contentType = ContentTypeEnum.toContentEnum(buffer.get());
-        content = EncodingUtils.readLongstr(buffer);
+        int length = buffer.getInt();
+        content = buffer.slice();
+        buffer.skip(length);
+        content.limit(length);
     }
 }


Reply via email to