Author: aconway
Date: Fri Oct 17 09:45:24 2008
New Revision: 705668
URL: http://svn.apache.org/viewvc?rev=705668&view=rev
Log:
QPID-1367 Mick Goulish: improvements to client-side failover.
Modified:
incubator/qpid/trunk/qpid/cpp/examples/direct/direct_producer.cpp
incubator/qpid/trunk/qpid/cpp/examples/failover/direct_producer.cpp
incubator/qpid/trunk/qpid/cpp/examples/failover/listener.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverConnection.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverConnection.h
incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSession.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSession.h
incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.h
incubator/qpid/trunk/qpid/cpp/src/tests/ (props changed)
incubator/qpid/trunk/qpid/cpp/src/tests/ssl_test
Modified: incubator/qpid/trunk/qpid/cpp/examples/direct/direct_producer.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/examples/direct/direct_producer.cpp?rev=705668&r1=705667&r2=705668&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/examples/direct/direct_producer.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/examples/direct/direct_producer.cpp Fri Oct 17 09:45:24 2008
@@ -64,6 +64,7 @@
int main(int argc, char** argv) {
const char* host = argc>1 ? argv[1] : "127.0.0.1";
int port = argc>2 ? atoi(argv[2]) : 5672;
+ int count = argc>3 ? atoi(argv[3]) : 10;
Connection connection;
Message message;
try {
@@ -81,14 +82,15 @@
// Now send some messages ...
- for (int i=0; i<10; i++) {
+ for (int i=0; i<count; i++) {
stringstream message_data;
message_data << "Message " << i;
message.setData(message_data.str());
// Asynchronous transfer sends messages as quickly as
// possible without waiting for confirmation.
- async(session).messageTransfer(arg::content=message, arg::destination="amq.direct");
+ // async(session).messageTransfer(arg::content=message, arg::destination="amq.direct");
+ session.messageTransfer(arg::content=message, arg::destination="amq.direct");
}
// And send a final message to indicate termination.
Modified: incubator/qpid/trunk/qpid/cpp/examples/failover/direct_producer.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/examples/failover/direct_producer.cpp?rev=705668&r1=705667&r2=705668&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/examples/failover/direct_producer.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/examples/failover/direct_producer.cpp Fri Oct 17 09:45:24 2008
@@ -36,12 +36,13 @@
int
main ( int argc, char ** argv)
{
+
const char* host = argc>1 ? argv[1] : "127.0.0.1";
int port = argc>2 ? atoi(argv[2]) : 5672;
int count = argc>3 ? atoi(argv[3]) : 30;
- int delayMs = argc>4 ? atoi(argv[4]) : 1000;
string program_name = "PRODUCER";
+
try {
FailoverConnection connection;
FailoverSession * session;
@@ -49,14 +50,23 @@
connection.open ( host, port );
session = connection.newSession();
+ bool report = true;
int sent = 0;
while ( sent < count ) {
+
message.getDeliveryProperties().setRoutingKey("routing_key");
- std::cout << "sending message "
- << sent
- << " of "
- << count
- << ".\n";
+
+
+ if ( count > 1000 )
+ report = !(sent % 1000);
+
+ if ( report )
+ {
+ std::cout << "sending message "
+ << sent
+ << ".\n";
+ }
+
stringstream message_data;
message_data << sent;
message.setData(message_data.str());
@@ -70,12 +80,12 @@
0,
message
);
- usleep ( 1000*delayMs );
+
++ sent;
}
message.setData ( "That's all, folks!" );
- /* MICK FIXME
+ /* FIXME mgoulish 16 Oct 08
session.messageTransfer ( arg::content=message,
arg::destination="amq.direct"
);
@@ -88,10 +98,17 @@
session->sync();
connection.close();
+ std::cout << program_name
+ << " sent "
+ << sent
+ << " messages.\n";
+
std::cout << program_name << ": " << " completed without error." << std::endl;
return 0;
} catch(const std::exception& error) {
std::cout << program_name << ": " << error.what() << std::endl;
+ std::cout << program_name << "Exiting.\n";
+ return 1;
}
return 1;
}
Modified: incubator/qpid/trunk/qpid/cpp/examples/failover/listener.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/examples/failover/listener.cpp?rev=705668&r1=705667&r2=705668&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/examples/failover/listener.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/examples/failover/listener.cpp Fri Oct 17 09:45:24 2008
@@ -34,150 +34,26 @@
using namespace std;
-struct Recorder
-{
- unsigned int max_messages;
- unsigned int * messages_received;
-
- Recorder ( )
- {
- max_messages = 1000;
- messages_received = new unsigned int [ max_messages ];
- memset ( messages_received, 0, max_messages * sizeof(int) );
- }
-
-
- void
- received ( int i )
- {
- messages_received[i] ++;
- }
-
-
-
- void
- report ( )
- {
- int i;
-
- int last_received_message = 0;
-
- vector<unsigned int> missed_messages,
- multiple_messages;
-
- /*----------------------------------------------------
- Collect indices of missed and multiple messages.
- ----------------------------------------------------*/
- bool seen_first_message = false;
- for ( i = max_messages - 1; i >= 0; -- i )
- {
- if ( ! seen_first_message )
- {
- if ( messages_received [i] > 0 )
- {
- seen_first_message = true;
- last_received_message = i;
- }
- }
- else
- {
- if ( messages_received [i] == 0 )
- missed_messages.push_back ( i );
- else
- if ( messages_received [i] > 1 )
- {
- multiple_messages.push_back ( i );
- }
- }
- }
-
- /*--------------------------------------------
- Report missed messages.
- --------------------------------------------*/
- char const * verb = ( missed_messages.size() == 1 )
- ? " was "
- : " were ";
-
- char const * plural = ( missed_messages.size() == 1 )
- ? "."
- : "s.";
-
- std::cerr << "Listener::shutdown: There"
- << verb
- << missed_messages.size()
- << " missed message"
- << plural
- << endl;
-
- for ( i = 0; i < int(missed_messages.size()); ++ i )
- {
- std::cerr << " " << i << " was missed.\n";
- }
-
-
- /*--------------------------------------------
- Report multiple messages.
- --------------------------------------------*/
- verb = ( multiple_messages.size() == 1 )
- ? " was "
- : " were ";
-
- plural = ( multiple_messages.size() == 1 )
- ? "."
- : "s.";
-
- std::cerr << "Listener::shutdown: There"
- << verb
- << multiple_messages.size()
- << " multiple message"
- << plural
- << endl;
-
- for ( i = 0; i < int(multiple_messages.size()); ++ i )
- {
- std::cerr << " "
- << multiple_messages[i]
- << " was received "
- << messages_received [ multiple_messages[i] ]
- << " times.\n";
- }
-
- /*
- for ( i = 0; i < last_received_message; ++ i )
- {
- std::cerr << "Message " << i << ": " << messages_received[i] << std::endl;
- }
- */
- }
-
-};
-
-
-
-
struct Listener : public MessageListener
{
FailoverSubscriptionManager & subscriptionManager;
- Recorder & recorder;
-
- Listener ( FailoverSubscriptionManager& subs,
- Recorder & recorder
- );
+ Listener ( FailoverSubscriptionManager& subs );
- void shutdown() { recorder.report(); }
- void parse_message ( std::string const & msg );
+ void shutdown() { subscriptionManager.stop(); }
virtual void received ( Message & message );
+
+ int count;
};
-Listener::Listener ( FailoverSubscriptionManager & s, Recorder & r ) :
+Listener::Listener ( FailoverSubscriptionManager & s ) :
subscriptionManager(s),
- recorder(r)
+ count(0)
{
}
@@ -188,18 +64,19 @@
void
Listener::received ( Message & message )
{
- std::cerr << "Listener received: " << message.getData() << std::endl;
+ if(! (count%1000))
+ std::cerr << "\t\tListener received: " << message.getData() << std::endl;
+
+ ++ count;
+
if (message.getData() == "That's all, folks!")
{
std::cout << "Shutting down listener for " << message.getDestination() << std::endl;
- subscriptionManager.cancel(message.getDestination());
- shutdown();
- }
- else
- {
- parse_message ( message.getData() );
+ std::cout << "Listener received " << count << " messages.\n";
+ subscriptionManager.cancel(message.getDestination());
+ shutdown ( );
}
}
@@ -207,21 +84,6 @@
-void
-Listener::parse_message ( const std::string & msg )
-{
- int msg_number;
- if(1 != sscanf ( msg.c_str(), "%d", & msg_number ) )
- {
- std::cerr << "Listener::parse_message error: Can't read message number from this message: |" << msg_number << "|\n";
- return;
- }
- recorder.received ( msg_number );
-}
-
-
-
-
int
@@ -235,17 +97,12 @@
FailoverConnection connection;
FailoverSession * session;
- Recorder recorder;
-
- connection.name = program_name;
connection.open ( host, port );
session = connection.newSession();
- session->name = program_name;
FailoverSubscriptionManager subscriptions ( session );
- subscriptions.name = program_name;
- Listener listener ( subscriptions, recorder );
+ Listener listener ( subscriptions );
subscriptions.subscribe ( listener, "message_queue" );
subscriptions.run ( );
@@ -256,7 +113,8 @@
} catch(const std::exception& error) {
std::cout << program_name << ": " << error.what() << std::endl;
}
- return 1;
+
+ return 0;
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp?rev=705668&r1=705667&r2=705668&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp Fri Oct 17 09:45:24 2008
@@ -151,8 +151,11 @@
static const std::string CONN_CLOSED("Connection closed by broker");
void ConnectionImpl::shutdown() {
-
Mutex::ScopedLock l(lock);
+
+ if ( failureCallback )
+ failureCallback();
+
if (handler.isClosed()) return;
// FIXME aconway 2008-06-06: exception use, amqp0-10 does not seem to have
@@ -161,9 +164,6 @@
closeInternal(boost::bind(&SessionImpl::connectionBroke, _1, CLOSE_CODE_CONNECTION_FORCED, CONN_CLOSED));
setException(new ConnectionException(CLOSE_CODE_CONNECTION_FORCED, CONN_CLOSED));
handler.fail(CONN_CLOSED);
-
- if ( failureCallback )
- failureCallback();
}
void ConnectionImpl::erase(uint16_t ch) {
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp?rev=705668&r1=705667&r2=705668&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp Fri Oct 17 09:45:24 2008
@@ -42,6 +42,7 @@
void Subscriber::received(Message& msg)
{
+
if (listener) {
listener->received(msg);
autoAck.ack(msg, session);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverConnection.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverConnection.cpp?rev=705668&r1=705667&r2=705668&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverConnection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverConnection.cpp Fri Oct 17 09:45:24 2008
@@ -36,7 +36,6 @@
FailoverConnection::FailoverConnection ( ) :
- name(),
failoverCompleteTime(0)
{
connection.registerFailureCallback
@@ -59,7 +58,6 @@
settings.host = host;
settings.port = port;
settings.username = uid;
- settings.username = uid;
settings.password = pwd;
settings.virtualhost = virtualhost;
settings.maxFrameSize = maxFrameSize;
@@ -124,9 +122,19 @@
void
FailoverConnection::failover ( )
{
+ std::vector<FailoverSession *>::iterator sessions_iterator;
+
+ for ( sessions_iterator = sessions.begin();
+ sessions_iterator != sessions.end();
+ ++ sessions_iterator )
+ {
+ FailoverSession * fs = * sessions_iterator;
+ fs->failover_in_progress = true;
+ }
+
std::vector<Url> knownBrokers = connection.getKnownBrokers();
if (knownBrokers.empty())
- throw Exception(QPID_MSG("FailoverConnection::failover " << name << " no known brokers."));
+ throw Exception(QPID_MSG("FailoverConnection::failover no known brokers."));
Connection newConnection;
for (std::vector<Url>::iterator i = knownBrokers.begin(); i != knownBrokers.end(); ++i) {
@@ -148,7 +156,6 @@
*/
// FIXME aconway 2008-10-10: thread unsafe, possible race with concurrent newSession
- std::vector<FailoverSession *>::iterator sessions_iterator;
for ( sessions_iterator = sessions.begin();
sessions_iterator < sessions.end();
++ sessions_iterator
@@ -173,6 +180,15 @@
FailoverSession * fs = * sessions_iterator;
fs->failover ( );
}
+
+ for ( sessions_iterator = sessions.begin();
+ sessions_iterator < sessions.end();
+ ++ sessions_iterator
+ )
+ {
+ FailoverSession * fs = * sessions_iterator;
+ fs->failover_in_progress = false;
+ }
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverConnection.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverConnection.h?rev=705668&r1=705667&r2=705668&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverConnection.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverConnection.h Fri Oct 17 09:45:24 2008
@@ -71,10 +71,6 @@
void registerFailureCallback ( boost::function<void ()> fn );
- // If you have more than 1 connection and you want to give them
- // separate names for debugging...
- std::string name;
-
void failover ( );
struct timeval * failoverCompleteTime;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSession.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSession.cpp?rev=705668&r1=705667&r2=705668&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSession.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSession.cpp Fri Oct 17 09:45:24 2008
@@ -38,7 +38,7 @@
namespace client {
FailoverSession::FailoverSession ( ) :
- name("no_name")
+ failover_in_progress(false)
{
// The session is created by FailoverConnection::newSession
failoverSubscriptionManager = 0;
@@ -170,11 +170,26 @@
)
{
- session.messageTransfer ( destination,
- acceptMode,
- acquireMode,
- content
- );
+ while ( 1 )
+ {
+ try
+ {
+ session.messageTransfer ( destination,
+ acceptMode,
+ acquireMode,
+ content
+ );
+ break;
+ }
+ catch ( ... )
+ {
+ // Take special action only if there is a failover in progress.
+ if ( ! failover_in_progress )
+ break;
+
+ usleep ( 1000 );
+ }
+ }
}
@@ -583,7 +598,6 @@
if ( failoverSubscriptionManager )
{
- //
failoverSubscriptionManager->prepareForFailover ( newSession );
}
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSession.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSession.h?rev=705668&r1=705667&r2=705668&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSession.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSession.h Fri Oct 17 09:45:24 2008
@@ -59,8 +59,6 @@
FailoverSession ( );
~FailoverSession ( );
- std::string name;
-
framing::FrameSet::shared_ptr get();
SessionId getId() const;
@@ -82,6 +80,8 @@
void sendCompletion ( );
+ bool failover_in_progress;
+
// Wrapped functions from Session ----------------------------
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp?rev=705668&r1=705667&r2=705668&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp Fri Oct 17 09:45:24 2008
@@ -33,7 +33,6 @@
FailoverSubscriptionManager::FailoverSubscriptionManager ( FailoverSession * fs) :
- name("no_name"),
newSessionIsValid(false),
no_failover(false)
{
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.h?rev=705668&r1=705667&r2=705668&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.h Fri Oct 17 09:45:24 2008
@@ -114,9 +114,6 @@
void prepareForFailover ( Session newSession );
void failover ( );
- std::string name;
-
-
private:
sys::Monitor lock;
Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/
------------------------------------------------------------------------------
--- svn:ignore (original)
+++ svn:ignore Fri Oct 17 09:45:24 2008
@@ -23,3 +23,5 @@
.valgrindrc
cluster_test
echotest
+cert.password
+test_cert_db
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/ssl_test
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/ssl_test?rev=705668&r1=705667&r2=705668&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/ssl_test (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/ssl_test Fri Oct 17 09:45:24 2008
@@ -1,8 +1,8 @@
#!/bin/sh
# Run a simple test over SSL
-MY_DIR=$(dirname $(which $0))
-CERT_DIR=${MY_DIR}/test_cert_db
-CERT_PW_FILE=${MY_DIR}/cert.password
+
+CERT_DIR=`pwd`/test_cert_db
+CERT_PW_FILE=`pwd`/cert.password
HOSTNAME=`hostname`
COUNT=10000
@@ -18,13 +18,13 @@
}
start_broker() {
- ${MY_DIR}/../qpidd --daemon --transport ssl --port 0 --ssl-port 0 --no-data-dir --no-module-dir --auth no\
- --load-module ${MY_DIR}/../.libs/ssl.so --ssl-cert-db $CERT_DIR --ssl-cert-password-file $CERT_PW_FILE > qpidd.port
+ ../qpidd --daemon --transport ssl --port 0 --ssl-port 0 --no-data-dir --no-module-dir --auth no\
+ --load-module ../.libs/ssl.so --ssl-cert-db $CERT_DIR --ssl-cert-password-file $CERT_PW_FILE > qpidd.port
PORT=`cat qpidd.port`
}
stop_broker() {
- ${MY_DIR}/../qpidd -q --port $PORT
+ ../qpidd -q --port $PORT
}
if [[ !(-e ${CERT_PW_FILE}) ]] ; then
@@ -36,8 +36,8 @@
start_broker || error "Could not start broker"
echo "Running SSL test on port $PORT"
-export QPID_LOAD_MODULE=${MY_DIR}/../.libs/sslconnector.so
+export QPID_LOAD_MODULE=../.libs/sslconnector.so
export QPID_SSL_CERT_DB=${CERT_DIR}
export QPID_SSL_CERT_PASSWORD_FILE=${CERT_PW_FILE}
-${MY_DIR}/perftest --count ${COUNT} --port ${PORT} -P ssl -b $HOSTNAME --summary
+./perftest --count ${COUNT} --port ${PORT} -P ssl -b $HOSTNAME --summary