Author: rhs
Date: Thu Jan 18 09:56:44 2007
New Revision: 497515
URL: http://svn.apache.org/viewvc?view=rev&rev=497515
Log:
made message-transfer return a result, switched over message delivery to use
message-transfer, added a generated .copy() to method bodies, and made
hello-world acknowledge the message it sends to itself
Modified:
incubator/qpid/branches/qpid.0-9/gentools/templ.java/MethodBodyClass.tmpl
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
incubator/qpid/branches/qpid.0-9/python/hello-world
incubator/qpid/branches/qpid.0-9/python/qpid/client.py
Modified:
incubator/qpid/branches/qpid.0-9/gentools/templ.java/MethodBodyClass.tmpl
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/gentools/templ.java/MethodBodyClass.tmpl?view=diff&rev=497515&r1=497514&r2=497515
==============================================================================
--- incubator/qpid/branches/qpid.0-9/gentools/templ.java/MethodBodyClass.tmpl
(original)
+++ incubator/qpid/branches/qpid.0-9/gentools/templ.java/MethodBodyClass.tmpl
Thu Jan 18 09:56:44 2007
@@ -8,9 +8,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -25,7 +25,7 @@
* Supported AMQP versions:
%{VLIST} * ${major}-${minor}
*/
-
+
package org.apache.qpid.framing;
import java.util.TreeMap;
@@ -36,13 +36,13 @@
{
public static final TreeMap<String, Integer> classIdMap = new
TreeMap<String, Integer>();
public static final TreeMap<String, Integer> methodIdMap = new
TreeMap<String, Integer>();
-
+
static
{
${CLASS_ID_INIT}
${METHOD_ID_INIT}
}
-
+
// Fields declared in specification
%{FLIST} ${field_declaration}
@@ -56,15 +56,15 @@
public int getMethod() { return methodIdMap.get(major + "-" + minor); }
public static int getClazz(byte major, byte minor) { return
classIdMap.get(major + "-" + minor); }
public static int getMethod(byte major, byte minor) { return
methodIdMap.get(major + "-" + minor); }
-
- // Field methods
+
+ // Field methods
%{FLIST} ${mb_field_get_method}
public int getBodySize()
- {
+ {
int size = 0;
%{FLIST} ${mb_field_size}
- return size;
+ return size;
}
protected void writeMethodPayload(ByteBuffer buffer)
@@ -90,7 +90,15 @@
{
${CLASS}${METHOD}Body bodyFrame = new ${CLASS}${METHOD}Body(major,
minor);
%{FLIST} ${mb_field_body_initialize}
-
+
+ return bodyFrame;
+ }
+
+ public ${CLASS}${METHOD}Body copy()
+ {
+ ${CLASS}${METHOD}Body bodyFrame = new ${CLASS}${METHOD}Body(major,
minor);
+%{FLIST} ${mb_field_body_initialize}
+
return bodyFrame;
}
}
Modified:
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java?view=diff&rev=497515&r1=497514&r2=497515
==============================================================================
---
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java
(original)
+++
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java
Thu Jan 18 09:56:44 2007
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.handler;
+import org.apache.qpid.framing.MessageOkBody;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.exchange.ExchangeDefaults;
@@ -89,6 +90,7 @@
// it is routed to the exchange.
AMQChannel channel =
protocolSession.getChannel(evt.getChannelId());
channel.addMessageTransfer(body, protocolSession);
+ protocolSession.writeResponse(evt,
MessageOkBody.createMethodBody((byte)0, (byte)9));
}
}
}
Modified:
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java?view=diff&rev=497515&r1=497514&r2=497515
==============================================================================
---
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
(original)
+++
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
Thu Jan 18 09:56:44 2007
@@ -23,17 +23,21 @@
import org.apache.log4j.Logger;
import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
-import org.apache.qpid.common.ClientProperties;
import org.apache.qpid.common.AMQPFilterTypes;
-import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize;
+import org.apache.qpid.common.ClientProperties;
import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.Content;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.MessageOkBody;
+import org.apache.qpid.framing.MessageTransferBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.protocol.AMQMethodListener;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.filter.FilterManagerFactory;
import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize;
import java.util.Queue;
@@ -262,19 +266,33 @@
}
synchronized(channel)
{
- long deliveryTag = channel.getNextDeliveryTag();
+ final long deliveryTag = channel.getNextDeliveryTag();
if (_acks)
{
channel.addUnacknowledgedMessage(msg, deliveryTag,
consumerTag, queue);
}
- ByteBuffer deliver = null;
- if (true) throw new Error("XXX");
- //createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(),
msg.getExchangeName());
- AMQDataBlock frame = msg.getDataBlock(deliver,
channel.getChannelId());
-
- protocolSession.writeFrame(frame);
+ // XXX: references
+ MessageTransferBody mtb = msg.getTransferBody().copy();
+ mtb.destination = consumerTag;
+ try {
+ protocolSession.writeRequest
+ (channel.getChannelId(),
+ mtb, new AMQMethodListener() {
+ public boolean methodReceived(AMQMethodEvent evt)
throws AMQException {
+ if (_logger.isDebugEnabled()) {
+ _logger.debug("Ack received on channel "
+ evt.getChannelId());
+ }
+ // XXX: multiple
+ channel.acknowledgeMessage(deliveryTag,
false);
+ return true;
+ }
+ public void error(Exception e) {}
+ });
+ } catch (AMQException e) {
+ throw new RuntimeException(e);
+ }
}
}
finally
@@ -398,24 +416,4 @@
{
return _isBrowser;
}
-
-
- /* private ByteBuffer createEncodedDeliverFrame(long deliveryTag,
String routingKey, String exchange)
- {
- // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
- // TODO: Connect this to the session version obtained from
ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
- AMQFrame deliverFrame =
MessageTransferBody.createAMQFrame(channel.getChannelId(),
- (byte)0, (byte)9, // AMQP version (major, minor)
- consumerTag, // consumerTag
- deliveryTag, // deliveryTag
- exchange, // exchange
- false, // redelivered
- routingKey // routingKey
- );
- ByteBuffer buf = ByteBuffer.allocate((int) deliverFrame.getSize()); //
XXX: Could cast be a problem?
- deliverFrame.writePayload(buf);
- buf.flip();
- return buf;
- }*/
}
Modified: incubator/qpid/branches/qpid.0-9/python/hello-world
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/python/hello-world?view=diff&rev=497515&r1=497514&r2=497515
==============================================================================
--- incubator/qpid/branches/qpid.0-9/python/hello-world (original)
+++ incubator/qpid/branches/qpid.0-9/python/hello-world Thu Jan 18 09:56:44 2007
@@ -12,3 +12,8 @@
ch.message_consume(queue="test", destination="test")
ch.message_transfer(destination="amq.direct", routing_key="test",
body="hello world")
+msg = client.queue("test").get()
+print msg
+msg.ok()
+import time
+time.sleep(3)
Modified: incubator/qpid/branches/qpid.0-9/python/qpid/client.py
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/python/qpid/client.py?view=diff&rev=497515&r1=497514&r2=497515
==============================================================================
--- incubator/qpid/branches/qpid.0-9/python/qpid/client.py (original)
+++ incubator/qpid/branches/qpid.0-9/python/qpid/client.py Thu Jan 18 09:56:44
2007
@@ -100,6 +100,9 @@
msg.tune_ok(*msg.frame.args)
self.client.started.set()
+ def message_transfer(self, ch, msg):
+ self.client.queue(msg.destination).put(msg)
+
def basic_deliver(self, ch, msg):
self.client.queue(msg.consumer_tag).put(msg)