Author: aconway
Date: Wed Jan 31 15:28:38 2007
New Revision: 502038
URL: http://svn.apache.org/viewvc?view=rev&rev=502038
Log:
From Andrew Stitcher <[EMAIL PROTECTED]>
[EMAIL PROTECTED]: andrew | 2007-01-12 00:35:16 +0000
Branch for my work on Qpid.0-9
[EMAIL PROTECTED]: andrew | 2007-01-12 00:59:28 +0000
Added in empty implementation of handler class for protocol Message class
[EMAIL PROTECTED]: andrew | 2007-01-17 01:25:16 +0000
* Added Test for new MessageHandlerImpl (but no actual tests yet)
* Filled in lots of the blanks in the MessageHandlerImpl with code
stolen from the BasicHandlerImpl
[EMAIL PROTECTED]: andrew | 2007-01-17 17:34:13 +0000
Updated to latest upstream changes
[EMAIL PROTECTED]: andrew | 2007-01-19 00:31:59 +0000
Fixed merge errors
[EMAIL PROTECTED]: andrew | 2007-01-19 00:47:29 +0000
Another merge problem fixed
[EMAIL PROTECTED]: andrew | 2007-01-24 11:27:48 +0000
Started work on the Message class handler implementation
[EMAIL PROTECTED]: andrew | 2007-01-30 17:05:05 +0000
Working again after broker Message refactor
[EMAIL PROTECTED]: andrew | 2007-01-30 18:39:18 +0000
Fix for extra parameter to transfer
[EMAIL PROTECTED]: andrew | 2007-01-31 18:29:57 +0000
Checkpoint of work on broker MessageMessage
[EMAIL PROTECTED]: andrew | 2007-01-31 22:02:27 +0000
MessageMessage work now compiles
Added:
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.cpp
(with props)
Modified:
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessage.cpp
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessage.h
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageBase.h
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.h
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Makefile.am
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.cpp
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessage.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessage.cpp?view=diff&rev=502038&r1=502037&r2=502038
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessage.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessage.cpp Wed Jan
31 15:28:38 2007
@@ -35,22 +35,23 @@
BasicMessage::BasicMessage(const ConnectionToken* const _publisher,
const string& _exchange, const string& _routingKey,
- bool _mandatory, bool _immediate) : publisher(_publisher),
- exchange(_exchange),
- routingKey(_routingKey),
- mandatory(_mandatory),
- immediate(_immediate),
- redelivered(false),
- size(0),
- persistenceId(0) {}
+ bool _mandatory, bool _immediate) :
+ Message(_exchange, _routingKey, _mandatory, _immediate),
+ publisher(_publisher),
+ size(0)
+{
+}
BasicMessage::BasicMessage(Buffer& buffer, bool headersOnly, u_int32_t
contentChunkSize) :
- publisher(0), mandatory(false), immediate(false), redelivered(false),
size(0), persistenceId(0){
+ publisher(0), size(0)
+{
decode(buffer, headersOnly, contentChunkSize);
}
-BasicMessage::BasicMessage() : publisher(0), mandatory(false),
immediate(false), redelivered(false), size(0), persistenceId(0){}
+BasicMessage::BasicMessage() : publisher(0), size(0)
+{
+}
BasicMessage::~BasicMessage(){
if (content.get()) content->destroy();
@@ -72,16 +73,13 @@
return header.get() && (header->getContentSize() == contentSize());
}
-void BasicMessage::redeliver(){
- redelivered = true;
-}
-
void BasicMessage::deliver(OutputHandler* out, int channel,
const string& consumerTag, u_int64_t deliveryTag,
u_int32_t framesize,
ProtocolVersion* version){
// CCT -- TODO - Update code generator to take pointer/ not instance to
avoid extra contruction
- out->send(new AMQFrame(*version, channel, new BasicDeliverBody(*version,
consumerTag, deliveryTag, redelivered, exchange, routingKey)));
+ out->send(new AMQFrame(*version, channel,
+ new BasicDeliverBody(*version, consumerTag, deliveryTag,
getRedelivered(), getExchange(), getRoutingKey())));
sendContent(out, channel, framesize, version);
}
@@ -90,9 +88,10 @@
u_int32_t messageCount,
u_int64_t deliveryTag,
u_int32_t framesize,
- ProtocolVersion* version){
- // CCT -- TODO - Update code generator to take pointer/ not instance to
avoid extra contruction
- out->send(new AMQFrame(*version, channel, new BasicGetOkBody(*version,
deliveryTag, redelivered, exchange, routingKey, messageCount)));
+ ProtocolVersion* version){
+ // CCT -- TODO - Update code generator to take pointer/ not instance to
avoid extra contruction
+ out->send(new AMQFrame(*version, channel,
+ new BasicGetOkBody(*version, deliveryTag, getRedelivered(),
getExchange(), getRoutingKey(), messageCount)));
sendContent(out, channel, framesize, version);
}
@@ -127,8 +126,12 @@
void BasicMessage::decodeHeader(Buffer& buffer)
{
+ string exchange;
+ string routingKey;
+
buffer.getShortString(exchange);
buffer.getShortString(routingKey);
+ setRouting(exchange, routingKey);
u_int32_t headerSize = buffer.getLong();
AMQHeaderBody::shared_ptr headerBody(new AMQHeaderBody());
@@ -166,8 +169,8 @@
void BasicMessage::encodeHeader(Buffer& buffer)
{
- buffer.putShortString(exchange);
- buffer.putShortString(routingKey);
+ buffer.putShortString(getExchange());
+ buffer.putShortString(getRoutingKey());
buffer.putLong(header->size());
header->encode(buffer);
}
@@ -191,8 +194,8 @@
u_int32_t BasicMessage::encodedHeaderSize()
{
- return exchange.size() + 1
- + routingKey.size() + 1
+ return getExchange().size() + 1
+ + getRoutingKey().size() + 1
+ header->size() + 4;//4 extra bytes for size
}
@@ -204,7 +207,7 @@
void BasicMessage::releaseContent(MessageStore* store)
{
Mutex::ScopedLock locker(contentLock);
- if (!isPersistent() && persistenceId == 0) {
+ if (!isPersistent() && getPersistenceId() == 0) {
store->stage(this);
}
if (!content.get() || content->size() > 0) {
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessage.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessage.h?view=diff&rev=502038&r1=502037&r2=502038
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessage.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessage.h Wed Jan 31
15:28:38 2007
@@ -48,16 +48,10 @@
*/
class BasicMessage : public Message{
const ConnectionToken* const publisher;
- string exchange;
- string routingKey;
- const bool mandatory;
- const bool immediate;
- bool redelivered;
qpid::framing::AMQHeaderBody::shared_ptr header;
std::auto_ptr<Content> content;
- u_int64_t size;
- u_int64_t persistenceId;
qpid::sys::Mutex contentLock;
+ u_int64_t size;
void sendContent(qpid::framing::OutputHandler* out,
int channel, u_int32_t framesize,
qpid::framing::ProtocolVersion* version);
@@ -88,15 +82,10 @@
u_int64_t deliveryTag,
u_int32_t framesize,
qpid::framing::ProtocolVersion* version);
- void redeliver();
qpid::framing::BasicHeaderProperties* getHeaderProperties();
bool isPersistent();
- const string& getRoutingKey() const { return routingKey; }
- const string& getExchange() const { return exchange; }
u_int64_t contentSize() const { return size; }
- u_int64_t getPersistenceId() const { return persistenceId; }
- void setPersistenceId(u_int64_t _persistenceId) { persistenceId =
_persistenceId; }
void decode(qpid::framing::Buffer& buffer, bool headersOnly =
false, u_int32_t contentChunkSize = 0);
void decodeHeader(qpid::framing::Buffer& buffer);
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageBase.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageBase.h?view=diff&rev=502038&r1=502037&r2=502038
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageBase.h
(original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageBase.h Wed Jan
31 15:28:38 2007
@@ -48,38 +48,73 @@
* TODO; AMS: for the moment this is mostly a placeholder
*/
class Message{
+ std::string exchange;
+ std::string routingKey;
+ const bool mandatory;
+ const bool immediate;
+ u_int64_t persistenceId;
+
+ bool redelivered;
public:
typedef boost::shared_ptr<Message> shared_ptr;
+ Message(const std::string& _exchange, const std::string&
_routingKey,
+ bool _mandatory, bool _immediate) :
+ exchange(_exchange),
+ routingKey(_routingKey),
+ mandatory(_mandatory),
+ immediate(_immediate),
+ persistenceId(0),
+ redelivered(false)
+ {}
+
+ Message() :
+ mandatory(false),
+ immediate(false),
+ persistenceId(0),
+ redelivered(false)
+ {}
+
virtual ~Message() {};
+ // Accessors
+ const std::string& getRoutingKey() const { return routingKey; }
+ const std::string& getExchange() const { return exchange; }
+ u_int64_t getPersistenceId() const { return persistenceId; }
+ bool getRedelivered() const { return redelivered; }
+
+ void setRouting(const std::string& _exchange, const std::string&
_routingKey)
+ { exchange = _exchange; routingKey = _routingKey; }
+ void setPersistenceId(u_int64_t _persistenceId) { persistenceId =
_persistenceId; } // XXXX: Only used in tests?
+ void redeliver() { redelivered = true; }
+
+ /**
+ * Used to deliver the message from the queue
+ */
virtual void deliver(qpid::framing::OutputHandler* out,
int channel,
const std::string& consumerTag,
u_int64_t deliveryTag,
u_int32_t framesize,
qpid::framing::ProtocolVersion* version) = 0;
+ /**
+ * Used to return a message in response to a get from a
queue
+ */
virtual void sendGetOk(qpid::framing::OutputHandler* out,
int channel,
u_int32_t messageCount,
u_int64_t deliveryTag,
u_int32_t framesize,
qpid::framing::ProtocolVersion* version) = 0;
- virtual void redeliver() = 0;
virtual bool isComplete() = 0;
virtual u_int64_t contentSize() const = 0;
virtual qpid::framing::BasicHeaderProperties*
getHeaderProperties() = 0;
virtual bool isPersistent() = 0;
- virtual const std::string& getRoutingKey() const = 0;
virtual const ConnectionToken* const getPublisher() = 0;
- virtual u_int64_t getPersistenceId() const = 0; // XXXX: Only used
in tests?
- virtual const std::string& getExchange() const = 0; // XXXX: Only
used in tests?
- virtual void setPersistenceId(u_int64_t /*persistenceId*/) {}; //
XXXX: Only used in tests?
-
virtual void encode(qpid::framing::Buffer& /*buffer*/) {}; //
XXXX: Only used in tests?
virtual void encodeHeader(qpid::framing::Buffer& /*buffer*/) {};
// XXXX: Only used in tests?
@@ -108,12 +143,6 @@
* content size else returns 0.
*/
virtual u_int64_t expectedContentSize() = 0;
- /**
- * Releases the in-memory content data held by this
- * message. Must pass in a store from which the data can
- * be reloaded.
- */
- virtual void releaseContent(MessageStore* /*store*/) {};
// TODO: AMS 29/1/2007 Don't think these are really part of base
class
@@ -125,140 +154,12 @@
virtual void setContent(std::auto_ptr<Content>& /*content*/) {};
virtual void setHeader(qpid::framing::AMQHeaderBody::shared_ptr
/*header*/) {};
virtual void addContent(qpid::framing::AMQContentBody::shared_ptr
/*data*/) {};
- };
-
- }
-}
-
-
-#endif /*!_broker_BrokerMessage_h*/
-#ifndef _broker_BrokerMessageBase_h
-#define _broker_BrokerMessageBase_h
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "AMQContentBody.h"
-#include "AMQHeaderBody.h"
-#include "Content.h"
-
-#include <string>
-#include <boost/shared_ptr.hpp>
-
-namespace qpid {
-
- namespace framing {
- class OutputHandler;
- class ProtocolVersion;
- class BasicHeaderProperties;
- }
-
- namespace broker {
-
- class MessageStore;
- class ConnectionToken;
-
- /**
- * Base class for all types of internal broker messages
- * abstracting away the operations
- * TODO; AMS: for the moment this is mostly a placeholder
- */
- class Message{
-
- public:
- typedef boost::shared_ptr<Message> shared_ptr;
-
- virtual ~Message() {};
-
- virtual void deliver(qpid::framing::OutputHandler* out,
- int channel,
- const std::string& consumerTag,
- u_int64_t deliveryTag,
- u_int32_t framesize,
-
qpid::framing::ProtocolVersion* version) = 0;
- virtual void sendGetOk(qpid::framing::OutputHandler* out,
- int channel,
- u_int32_t messageCount,
- u_int64_t deliveryTag,
- u_int32_t framesize,
-
qpid::framing::ProtocolVersion* version) = 0;
- virtual void redeliver() = 0;
-
- virtual bool isComplete() = 0;
-
- virtual u_int64_t contentSize() const = 0;
- virtual qpid::framing::BasicHeaderProperties*
getHeaderProperties() = 0;
- virtual bool isPersistent() = 0;
- virtual const std::string& getRoutingKey() const = 0;
- virtual const ConnectionToken* const getPublisher() = 0;
- virtual u_int64_t getPersistenceId() const = 0; // XXXX: Only used
in tests?
- virtual const std::string& getExchange() const = 0; // XXXX: Only
used in tests?
-
- virtual void setPersistenceId(u_int64_t /*persistenceId*/) {}; //
XXXX: Only used in tests?
-
- virtual void encode(qpid::framing::Buffer& /*buffer*/) {}; //
XXXX: Only used in tests?
- virtual void encodeHeader(qpid::framing::Buffer& /*buffer*/) {};
// XXXX: Only used in tests?
-
- /**
- * @returns the size of the buffer needed to encode this
- * message in its entirety
- *
- * XXXX: Only used in tests?
- */
- virtual u_int32_t encodedSize() = 0;
- /**
- * @returns the size of the buffer needed to encode the
- * 'header' of this message (not just the header frame,
- * but other meta data e.g.routing key and exchange)
- *
- * XXXX: Only used in tests?
- */
- virtual u_int32_t encodedHeaderSize() = 0;
- /**
- * @returns the size of the buffer needed to encode the
- * (possibly partial) content held by this message
- */
- virtual u_int32_t encodedContentSize() = 0;
- /**
- * If headers have been received, returns the expected
- * content size else returns 0.
- */
- virtual u_int64_t expectedContentSize() = 0;
/**
* Releases the in-memory content data held by this
* message. Must pass in a store from which the data can
* be reloaded.
*/
virtual void releaseContent(MessageStore* /*store*/) {};
-
- // TODO: AMS 29/1/2007 Don't think these are really part of base
class
-
- /**
- * Sets the 'content' implementation of this message (the
- * message controls the lifecycle of the content instance
- * it uses).
- */
- virtual void setContent(std::auto_ptr<Content>& /*content*/) {};
- virtual void setHeader(qpid::framing::AMQHeaderBody::shared_ptr
/*header*/) {};
- virtual void addContent(qpid::framing::AMQContentBody::shared_ptr
/*data*/) {};
};
}
Added: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.cpp?view=auto&rev=502038
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.cpp
(added)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.cpp
Wed Jan 31 15:28:38 2007
@@ -0,0 +1,94 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "BrokerMessageMessage.h"
+
+using namespace qpid::broker;
+
+MessageMessage::MessageMessage(const qpid::framing::AMQMethodBody&
_methodBody,
+ const std::string& _exchange, const std::string& _routingKey,
+ bool _mandatory, bool _immediate) :
+ Message(_exchange, _routingKey, _mandatory, _immediate),
+ methodBody(_methodBody)
+{
+}
+
+void MessageMessage::deliver(qpid::framing::OutputHandler* /*out*/,
+ int /*channel*/,
+ const std::string& /*consumerTag*/,
+ u_int64_t /*deliveryTag*/,
+ u_int32_t /*framesize*/,
+ qpid::framing::ProtocolVersion* /*version*/)
+{
+}
+
+void MessageMessage::sendGetOk(qpid::framing::OutputHandler* /*out*/,
+ int /*channel*/,
+ u_int32_t /*messageCount*/,
+ u_int64_t /*deliveryTag*/,
+ u_int32_t /*framesize*/,
+ qpid::framing::ProtocolVersion* /*version*/)
+{
+}
+
+bool MessageMessage::isComplete()
+{
+ return true;
+}
+
+u_int64_t MessageMessage::contentSize() const
+{
+ return 0;
+}
+
+qpid::framing::BasicHeaderProperties* MessageMessage::getHeaderProperties()
+{
+ return 0;
+}
+bool MessageMessage::isPersistent()
+{
+ return false;
+}
+
+const ConnectionToken* const MessageMessage::getPublisher()
+{
+ return 0;
+}
+
+u_int32_t MessageMessage::encodedSize()
+{
+ return 0;
+}
+
+u_int32_t MessageMessage::encodedHeaderSize()
+{
+ return 0;
+}
+
+u_int32_t MessageMessage::encodedContentSize()
+{
+ return 0;
+}
+
+u_int64_t MessageMessage::expectedContentSize()
+{
+ return 0;
+}
+
Propchange:
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.h?view=diff&rev=502038&r1=502037&r2=502038
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.h
(original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/BrokerMessageMessage.h Wed
Jan 31 15:28:38 2007
@@ -25,12 +25,21 @@
#include "BrokerMessageBase.h"
namespace qpid {
+ namespace framing {
+ class AMQMethodBody;
+ }
+
namespace broker {
class MessageMessage: public Message{
+ const qpid::framing::AMQMethodBody& methodBody;
public:
- ~MessageMessage();
+ MessageMessage(const qpid::framing::AMQMethodBody& methodBody,
+ const std::string& exchange, const std::string& routingKey,
+ bool mandatory, bool immediate);
+ // Default destructor okay
+
void deliver(qpid::framing::OutputHandler* out,
int channel,
const std::string& consumerTag,
@@ -43,88 +52,17 @@
u_int64_t deliveryTag,
u_int32_t framesize,
qpid::framing::ProtocolVersion* version);
- void redeliver();
- void setHeader(qpid::framing::AMQHeaderBody::shared_ptr header);
- void addContent(qpid::framing::AMQContentBody::shared_ptr data);
bool isComplete();
- void setContent(std::auto_ptr<Content>& content);
u_int64_t contentSize() const;
qpid::framing::BasicHeaderProperties* getHeaderProperties();
bool isPersistent();
- const std::string& getRoutingKey() const;
const ConnectionToken* const getPublisher();
+ u_int32_t encodedSize();
+ u_int32_t encodedHeaderSize();
u_int32_t encodedContentSize();
u_int64_t expectedContentSize();
- void releaseContent(MessageStore* store);
- };
-
- }
-}
-
-
-#endif /*!_broker_BrokerMessage_h*/
-#ifndef _broker_BrokerMessageMessage_h
-#define _broker_BrokerMessageMessage_h
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "BrokerMessageBase.h"
-
-namespace qpid {
- namespace broker {
- class MessageMessage: public Message{
-
- public:
- ~MessageMessage();
-
- void deliver(qpid::framing::OutputHandler* out,
- int channel,
- const std::string& consumerTag,
- u_int64_t deliveryTag,
- u_int32_t framesize,
-
qpid::framing::ProtocolVersion* version);
- void sendGetOk(qpid::framing::OutputHandler* out,
- int channel,
- u_int32_t messageCount,
- u_int64_t deliveryTag,
- u_int32_t framesize,
-
qpid::framing::ProtocolVersion* version);
- void redeliver();
- void setHeader(qpid::framing::AMQHeaderBody::shared_ptr header);
- void addContent(qpid::framing::AMQContentBody::shared_ptr data);
- bool isComplete();
- void setContent(std::auto_ptr<Content>& content);
-
- u_int64_t contentSize() const;
- qpid::framing::BasicHeaderProperties* getHeaderProperties();
- bool isPersistent();
- const std::string& getRoutingKey() const;
- const ConnectionToken* const getPublisher();
-
- u_int32_t encodedContentSize();
- u_int64_t expectedContentSize();
- void releaseContent(MessageStore* store);
};
}
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Makefile.am
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Makefile.am?view=diff&rev=502038&r1=502037&r2=502038
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Makefile.am (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/Makefile.am Wed Jan 31
15:28:38 2007
@@ -23,6 +23,8 @@
BrokerExchange.h \
BrokerMessage.cpp \
BrokerMessage.h \
+ BrokerMessageMessage.cpp \
+ BrokerMessageMessage.h \
BrokerQueue.cpp \
BrokerQueue.h \
Configuration.cpp \
Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.cpp?view=diff&rev=502038&r1=502037&r2=502038
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.cpp
(original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/MessageHandlerImpl.cpp Wed
Jan 31 15:28:38 2007
@@ -189,14 +189,14 @@
u_int16_t /*ticket*/,
const string& /*destination*/,
bool /*redelivered*/,
- bool /*immediate*/,
+ bool immediate,
u_int64_t /*ttl*/,
u_int8_t /*priority*/,
u_int64_t /*timestamp*/,
u_int8_t /*deliveryMode*/,
u_int64_t /*expiration*/,
const string& exchangeName,
- const string& /*routingKey*/,
+ const string& routingKey,
const string& /*messageId*/,
const string& /*correlationId*/,
const string& /*replyTo*/,
@@ -208,7 +208,7 @@
const string& /*securityToken*/,
const qpid::framing::FieldTable&
/*applicationHeaders*/,
qpid::framing::Content body,
- bool /*mandatory*/ )
+ bool mandatory )
{
//assert(0); // FIXME astitcher 2007-01-11: 0-9 feature
@@ -216,9 +216,9 @@
broker.getExchanges().getDefault() :
broker.getExchanges().get(exchangeName);
if(exchange){
if (body.isInline()) {
-// MessageMessage* msg =
-// new MessageMessage(&connection, exchangeName,
routingKey, immediate);
-// channel.handlePublish(msg, exchange);
+ MessageMessage* msg =
+ new MessageMessage(*(context.methodBody),
exchangeName, routingKey, mandatory, immediate);
+ channel.handlePublish(msg, exchange);
connection.client->getMessageHandler()->ok(context);
} else {