Author: rhs
Date: Thu Jan 18 09:56:44 2007
New Revision: 497515

URL: http://svn.apache.org/viewvc?view=rev&rev=497515
Log:
made message-transfer return a result, switched over message delivery to use 
message-transfer, added a generated .copy() to method bodies, and made 
hello-world acknowledge the message it sends to itself

Modified:
    incubator/qpid/branches/qpid.0-9/gentools/templ.java/MethodBodyClass.tmpl
    
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java
    
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
    incubator/qpid/branches/qpid.0-9/python/hello-world
    incubator/qpid/branches/qpid.0-9/python/qpid/client.py

Modified: 
incubator/qpid/branches/qpid.0-9/gentools/templ.java/MethodBodyClass.tmpl
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/gentools/templ.java/MethodBodyClass.tmpl?view=diff&rev=497515&r1=497514&r2=497515
==============================================================================
--- incubator/qpid/branches/qpid.0-9/gentools/templ.java/MethodBodyClass.tmpl 
(original)
+++ incubator/qpid/branches/qpid.0-9/gentools/templ.java/MethodBodyClass.tmpl 
Thu Jan 18 09:56:44 2007
@@ -8,9 +8,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -25,7 +25,7 @@
  * Supported AMQP versions:
 %{VLIST} *   ${major}-${minor}
  */
- 
+
 package org.apache.qpid.framing;
 
 import java.util.TreeMap;
@@ -36,13 +36,13 @@
 {
     public static final TreeMap<String, Integer> classIdMap = new 
TreeMap<String, Integer>();
     public static final TreeMap<String, Integer> methodIdMap = new 
TreeMap<String, Integer>();
-    
+
     static
     {
 ${CLASS_ID_INIT}
 ${METHOD_ID_INIT}
     }
-    
+
     // Fields declared in specification
 %{FLIST}    ${field_declaration}
 
@@ -56,15 +56,15 @@
     public int getMethod() { return methodIdMap.get(major + "-" + minor); }
     public static int getClazz(byte major, byte minor) { return 
classIdMap.get(major + "-" + minor); }
     public static int getMethod(byte major, byte minor) { return 
methodIdMap.get(major + "-" + minor); }
-    
-    // Field methods           
+
+    // Field methods
 %{FLIST}    ${mb_field_get_method}
 
     public int getBodySize()
-    {      
+    {
         int size = 0;
 %{FLIST}    ${mb_field_size}
-        return size;        
+        return size;
     }
 
     protected void writeMethodPayload(ByteBuffer buffer)
@@ -90,7 +90,15 @@
     {
         ${CLASS}${METHOD}Body bodyFrame = new ${CLASS}${METHOD}Body(major, 
minor);
 %{FLIST}    ${mb_field_body_initialize}
-                        
+
+        return bodyFrame;
+    }
+
+    public ${CLASS}${METHOD}Body copy()
+    {
+        ${CLASS}${METHOD}Body bodyFrame = new ${CLASS}${METHOD}Body(major, 
minor);
+%{FLIST}    ${mb_field_body_initialize}
+
         return bodyFrame;
     }
 }

Modified: 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java?view=diff&rev=497515&r1=497514&r2=497515
==============================================================================
--- 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java
 (original)
+++ 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java
 Thu Jan 18 09:56:44 2007
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.server.handler;
 
+import org.apache.qpid.framing.MessageOkBody;
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.exchange.ExchangeDefaults;
@@ -89,6 +90,7 @@
             // it is routed to the exchange.
             AMQChannel channel = 
protocolSession.getChannel(evt.getChannelId());
             channel.addMessageTransfer(body, protocolSession);
+            protocolSession.writeResponse(evt, 
MessageOkBody.createMethodBody((byte)0, (byte)9));
         }
     }
 }

Modified: 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java?view=diff&rev=497515&r1=497514&r2=497515
==============================================================================
--- 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
 (original)
+++ 
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
 Thu Jan 18 09:56:44 2007
@@ -23,17 +23,21 @@
 import org.apache.log4j.Logger;
 import org.apache.mina.common.ByteBuffer;
 import org.apache.qpid.AMQException;
-import org.apache.qpid.common.ClientProperties;
 import org.apache.qpid.common.AMQPFilterTypes;
-import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize;
+import org.apache.qpid.common.ClientProperties;
 import org.apache.qpid.framing.AMQDataBlock;
 import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.Content;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.framing.MessageOkBody;
+import org.apache.qpid.framing.MessageTransferBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.protocol.AMQMethodListener;
 import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.filter.FilterManager;
 import org.apache.qpid.server.filter.FilterManagerFactory;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize;
 
 import java.util.Queue;
 
@@ -262,19 +266,33 @@
             }
             synchronized(channel)
             {
-                long deliveryTag = channel.getNextDeliveryTag();
+                final long deliveryTag = channel.getNextDeliveryTag();
 
                 if (_acks)
                 {
                     channel.addUnacknowledgedMessage(msg, deliveryTag, 
consumerTag, queue);
                 }
 
-                ByteBuffer deliver = null;
-                if (true) throw new Error("XXX");
-                //createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(), 
msg.getExchangeName());
-                AMQDataBlock frame = msg.getDataBlock(deliver, 
channel.getChannelId());
-
-                protocolSession.writeFrame(frame);
+                // XXX: references
+                MessageTransferBody mtb = msg.getTransferBody().copy();
+                mtb.destination = consumerTag;
+                try {
+                    protocolSession.writeRequest
+                        (channel.getChannelId(),
+                         mtb, new AMQMethodListener() {
+                             public boolean methodReceived(AMQMethodEvent evt) 
throws AMQException {
+                                 if (_logger.isDebugEnabled()) {
+                                     _logger.debug("Ack received on channel " 
+ evt.getChannelId());
+                                 }
+                                 // XXX: multiple
+                                 channel.acknowledgeMessage(deliveryTag, 
false);
+                                 return true;
+                             }
+                             public void error(Exception e) {}
+                         });
+                } catch (AMQException e) {
+                    throw new RuntimeException(e);
+                }
             }
         }
         finally
@@ -398,24 +416,4 @@
     {
         return _isBrowser;
     }
-
-
-    /*    private ByteBuffer createEncodedDeliverFrame(long deliveryTag, 
String routingKey, String exchange)
-    {
-        // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
-        // TODO: Connect this to the session version obtained from 
ProtocolInitiation for this session.
-        // Be aware of possible changes to parameter order as versions change.
-        AMQFrame deliverFrame = 
MessageTransferBody.createAMQFrame(channel.getChannelId(),
-               (byte)0, (byte)9,       // AMQP version (major, minor)
-            consumerTag,       // consumerTag
-               deliveryTag,    // deliveryTag
-            exchange,  // exchange
-            false,     // redelivered
-            routingKey // routingKey
-            );
-        ByteBuffer buf = ByteBuffer.allocate((int) deliverFrame.getSize()); // 
XXX: Could cast be a problem?
-        deliverFrame.writePayload(buf);
-        buf.flip();
-        return buf;
-        }*/
 }

Modified: incubator/qpid/branches/qpid.0-9/python/hello-world
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/python/hello-world?view=diff&rev=497515&r1=497514&r2=497515
==============================================================================
--- incubator/qpid/branches/qpid.0-9/python/hello-world (original)
+++ incubator/qpid/branches/qpid.0-9/python/hello-world Thu Jan 18 09:56:44 2007
@@ -12,3 +12,8 @@
 ch.message_consume(queue="test", destination="test")
 ch.message_transfer(destination="amq.direct", routing_key="test",
                     body="hello world")
+msg = client.queue("test").get()
+print msg
+msg.ok()
+import time
+time.sleep(3)

Modified: incubator/qpid/branches/qpid.0-9/python/qpid/client.py
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/python/qpid/client.py?view=diff&rev=497515&r1=497514&r2=497515
==============================================================================
--- incubator/qpid/branches/qpid.0-9/python/qpid/client.py (original)
+++ incubator/qpid/branches/qpid.0-9/python/qpid/client.py Thu Jan 18 09:56:44 
2007
@@ -100,6 +100,9 @@
     msg.tune_ok(*msg.frame.args)
     self.client.started.set()
 
+  def message_transfer(self, ch, msg):
+    self.client.queue(msg.destination).put(msg)
+
   def basic_deliver(self, ch, msg):
     self.client.queue(msg.consumer_tag).put(msg)
 


Reply via email to