Author: astitcher
Date: Tue Feb 6 07:53:33 2007
New Revision: 504182
URL: http://svn.apache.org/viewvc?view=rev&rev=504182
Log:
Inline transferred messages delivered
Modified:
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.cpp
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.cpp
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.cpp
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.h
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.cpp
incubator/qpid/branches/qpid.0-9/cpp/tests/Makefile.am
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.cpp?view=diff&rev=504182&r1=504181&r2=504182
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerAdapter.cpp Tue Feb
6 07:53:33 2007
@@ -334,12 +334,17 @@
//no specific action required, generic response handling should be
sufficient
}
+
+//
+// Message class method handlers
+//
void
BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::ping( const MethodContext&
context)
{
connection.client->getChannel().ok(context);
connection.client->getChannel().pong(context);
}
+
void
BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::pong( const MethodContext&
context)
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.cpp?view=diff&rev=504182&r1=504181&r2=504182
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.cpp Tue Feb
6 07:53:33 2007
@@ -187,6 +187,17 @@
if(blocked) queue->dispatch();
}
+void Channel::handleInlineTransfer(Message::shared_ptr& msg,
Exchange::shared_ptr& exch){
+ if(transactional){
+ TxPublish* deliverable = new TxPublish(msg);
+ exch->route(*deliverable, msg->getRoutingKey(),
&(msg->getHeaderProperties()->getHeaders()));
+ txBuffer.enlist(new DeletingTxOp(deliverable));
+ }else{
+ DeliverableMessage deliverable(msg);
+ exch->route(deliverable, msg->getRoutingKey(),
&(msg->getHeaderProperties()->getHeaders()));
+ }
+}
+
// FIXME aconway 2007-02-05: Drop exchange member, calculate from
// message in ::complete().
void Channel::handlePublish(Message* _message, Exchange::shared_ptr _exchange){
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h?view=diff&rev=504182&r1=504181&r2=504182
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerChannel.h Tue Feb 6
07:53:33 2007
@@ -96,7 +96,9 @@
boost::scoped_ptr<BrokerAdapter> adapter;
- virtual void complete(Message::shared_ptr msg);
+ // completion handler for MessageBuilder
+ void complete(Message::shared_ptr msg);
+
void deliver(Message::shared_ptr& msg, const string& tag,
Queue::shared_ptr& queue, bool ackExpected);
void cancel(consumer_iterator consumer);
bool checkPrefetch(Message::shared_ptr& msg);
@@ -110,7 +112,9 @@
~Channel();
+ // For ChannelAdapter
bool isOpen() const { return opened; }
+
void open() { opened = true; }
void setDefaultQueue(Queue::shared_ptr queue){ defaultQueue = queue; }
Queue::shared_ptr getDefaultQueue() const { return defaultQueue; }
@@ -134,6 +138,10 @@
void handleHeader(boost::shared_ptr<framing::AMQHeaderBody>);
void handleContent(boost::shared_ptr<framing::AMQContentBody>);
void handleHeartbeat(boost::shared_ptr<framing::AMQHeartbeatBody>);
+
+ void handleInlineTransfer(Message::shared_ptr& msg, Exchange::shared_ptr&
exchange);
+
+ // For ChannelAdapter
void handleMethodInContext(
boost::shared_ptr<framing::AMQMethodBody> method,
const framing::MethodContext& context);
Modified:
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.cpp?view=diff&rev=504182&r1=504181&r2=504182
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.cpp
(original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.cpp
Tue Feb 6 07:53:33 2007
@@ -18,15 +18,27 @@
* under the License.
*
*/
-#include <iostream>
#include "BrokerMessageMessage.h"
+#include "ChannelAdapter.h"
#include "MessageTransferBody.h"
#include "MessageAppendBody.h"
#include "Reference.h"
+#include <iostream>
+
using namespace std;
using namespace qpid::broker;
+using namespace qpid::framing;
+MessageMessage::MessageMessage(
+ const boost::shared_ptr<MessageTransferBody> _methodBody,
+ const std::string& _exchange, const std::string& _routingKey,
+ bool _mandatory, bool _immediate) :
+ Message(_exchange, _routingKey, _mandatory, _immediate, _methodBody),
+ methodBody(_methodBody)
+{
+}
+
MessageMessage::MessageMessage(TransferPtr transfer_)
: Message(transfer_->getExchange(), transfer_->getRoutingKey(),
transfer_->getMandatory(), transfer_->getImmediate(),
@@ -43,14 +55,36 @@
{}
void MessageMessage::deliver(
- framing::ChannelAdapter& /*channel*/,
- const std::string& /*consumerTag*/,
+ framing::ChannelAdapter& channel,
+ const std::string& consumerTag,
u_int64_t /*deliveryTag*/,
u_int32_t /*framesize*/)
{
- // FIXME aconway 2007-02-05:
- cout << "MessageMessage::deliver" << *transfer << " + " << appends.size()
- << " appends." << endl;
+ channel.send(
+ new MessageTransferBody(channel.getVersion(),
+ methodBody->getTicket(),
+ consumerTag,
+ getRedelivered(),
+ methodBody->getImmediate(),
+ methodBody->getTtl(),
+ methodBody->getPriority(),
+ methodBody->getTimestamp(),
+ methodBody->getDeliveryMode(),
+ methodBody->getExpiration(),
+ getExchange(),
+ getRoutingKey(),
+ methodBody->getMessageId(),
+ methodBody->getCorrelationId(),
+ methodBody->getReplyTo(),
+ methodBody->getContentType(),
+ methodBody->getContentEncoding(),
+ methodBody->getUserId(),
+ methodBody->getAppId(),
+ methodBody->getTransactionId(),
+ methodBody->getSecurityToken(),
+ methodBody->getApplicationHeaders(),
+ methodBody->getBody(),
+ methodBody->getMandatory()));
}
void MessageMessage::sendGetOk(
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.h?view=diff&rev=504182&r1=504181&r2=504182
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.h
(original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.h Tue
Feb 6 07:53:33 2007
@@ -21,10 +21,12 @@
* under the License.
*
*/
-#include <vector>
#include "BrokerMessageBase.h"
+#include "MessageTransferBody.h"
#include "Reference.h"
+#include <vector>
+
namespace qpid {
namespace framing {
@@ -36,11 +38,17 @@
class Reference;
class MessageMessage: public Message{
+ const boost::shared_ptr<framing::MessageTransferBody> methodBody;
+
public:
typedef Reference::TransferPtr TransferPtr;
typedef Reference::AppendPtr AppendPtr;
- typedef Reference::Appends Appends;
+ typedef Reference::Appends Appends;
+ MessageMessage(
+ const boost::shared_ptr<framing::MessageTransferBody> methodBody,
+ const std::string& exchange, const std::string& routingKey,
+ bool mandatory, bool immediate);
MessageMessage(TransferPtr transfer);
MessageMessage(TransferPtr transfer, const Reference&);
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.cpp?view=diff&rev=504182&r1=504181&r2=504182
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.cpp
(original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.cpp Tue
Feb 6 07:53:33 2007
@@ -80,8 +80,6 @@
bool exclusive,
const qpid::framing::FieldTable& filter )
{
- //assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
-
Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId());
if(!destination.empty() && channel.exists(destination)){
throw ConnectionException(530, "Consumer tags must be unique");
@@ -139,7 +137,7 @@
void
MessageHandlerImpl::ok( const MethodContext& )
{
- // TODO aconway 2007-02-05: For HA, we can drop acked messages here.
+ // TODO: Need to ack the transfers acknowledged so far for flow control
purp oses
}
void
@@ -156,8 +154,6 @@
u_int16_t prefetchCount,
bool /*global*/ )
{
- //assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
-
//TODO: handle global
channel.setPrefetchSize(prefetchSize);
channel.setPrefetchCount(prefetchCount);
@@ -196,14 +192,14 @@
u_int16_t /*ticket*/,
const string& /*destination*/,
bool /*redelivered*/,
- bool /* immediate */,
+ bool immediate,
u_int64_t /*ttl*/,
u_int8_t /*priority*/,
u_int64_t /*timestamp*/,
u_int8_t /*deliveryMode*/,
u_int64_t /*expiration*/,
const string& exchangeName,
- const string& /* routingKey */,
+ const string& routingKey,
const string& /*messageId*/,
const string& /*correlationId*/,
const string& /*replyTo*/,
@@ -215,22 +211,24 @@
const string& /*securityToken*/,
const qpid::framing::FieldTable&
/*applicationHeaders*/,
qpid::framing::Content body,
- bool /* mandatory */ )
+ bool mandatory)
{
- //assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
- MessageTransferBody::shared_ptr transfer =
- boost::shared_polymorphic_downcast<MessageTransferBody>(
- context.methodBody);
- // Verify the exchange exists, will throw if not.
- broker.getExchanges().get(exchangeName);
- if (body.isInline()) {
- MessageMessage* msg = new MessageMessage(transfer);
- // FIXME aconway 2007-02-05: Remove exchange parameter.
- // use shared_ptr for message.
- channel.handlePublish(msg, Exchange::shared_ptr());
- sendOk(context);
- } else {
- references.get(body.getValue()).transfer(transfer);
+ Exchange::shared_ptr exchange = exchangeName.empty() ?
+ broker.getExchanges().getDefault() :
broker.getExchanges().get(exchangeName);
+ boost::shared_ptr<MessageTransferBody>
transfer(boost::dynamic_pointer_cast<MessageTransferBody>(context.methodBody));
+ if(exchange){
+ if (body.isInline()) {
+ Message::shared_ptr msg(new MessageMessage(transfer, exchangeName,
+ routingKey, mandatory, immediate));
+
+ channel.handleInlineTransfer(msg, exchange);
+
+ connection.client->getMessageHandler()->ok(context);
+ } else {
+ references.get(body.getValue()).transfer(transfer);
+ }
+ }else{
+ throw ChannelException(404, "Exchange not found '" + exchangeName +
"'");
}
}
Modified: incubator/qpid/branches/qpid.0-9/cpp/tests/Makefile.am
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/tests/Makefile.am?view=diff&rev=504182&r1=504181&r2=504182
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/tests/Makefile.am (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/tests/Makefile.am Tue Feb 6 07:53:33
2007
@@ -24,7 +24,6 @@
InMemoryContentTest \
LazyLoadedContentTest \
MessageBuilderTest \
- MessageHandlerTest \
MessageTest \
ReferenceTest \
QueueRegistryTest \