I am new to the qpid AMQP broker.
Based on the examples provided with qpid, I have made a simple cpp
program to test the pub/sub throughput on the broker. I am noticing
that the throughput is very slow for some reason.
I think there is something wrong in my test code. When I run my test
code, the broker does not seem to be doing much and my app and the
broker are mostly in a stalled/sleep state.
I am not sure what I am doing wrong here. I would expect that when I
run the test, the broker should become busy processing messages as fast
as it can. But it does not seem to be the case here.
qpid broker is running with all defaults ( qpidd -d )
When I run qpid-perftest everything seems to be running smoothly and I
get good throughput.
-------------- Code Segment Start ---------------
//pubsub_preftest.cpp
//test sending/receiving binary messages through a QPID broker via the
//Pub/Sub method.
//
#include <qpid/messaging/Connection.h>
#include <qpid/messaging/Message.h>
#include <qpid/messaging/Receiver.h>
#include <qpid/messaging/Session.h>
#include <qpid/messaging/Address.h>
#include <qpid/messaging/Sender.h>
#include <qpid/messaging/Session.h>
#include <qpid/types/Variant.h>
#include <iostream>
#include <string>
#include <vector>
#include <stdlib.h>
#include <sys/time.h>
#include <time.h>
#include <memory.h>
// prototypes
// Fills the first `size` bytes of `buffer` with pseudo-random values.
void makeRandomBinaryMessage( unsigned char* buffer, int size );
// Stores the difference x - y in *result; returns 1 when that difference is
// negative and 0 otherwise. The definition may adjust *y in place.
int timeval_subtract (struct timeval *result, struct timeval *x, struct
timeval *y );
int main(int argc, char** argv) {
// get message size from command line..
int message_size = atoi( argc>1 ? argv[1] : "2048" );
// get number of messages to run..
int message_count = atoi( argc>2 ? argv[2] : "10000" );
std::cout << "Parameters: message_size = " << message_size << ";
message_count = " << message_count << " \n";
// check input parameters
if ( ( message_size < 4 ) || ( message_count < 1 ) ) {
std::cout << "invalid input parameters..\n";
return -1;
}
// time stamps
timeval tv_start, tv_stop, tv_delta;
// create message buffer of size "message_size" bytes
unsigned char *txMessage_buffer = (unsigned char *)malloc(
message_size );
// fill buffer with random stuff
makeRandomBinaryMessage( txMessage_buffer, message_size );
// AMQP Connection parameters
const char* url = "amqp:tcp:127.0.0.1:5672";
const char* addr = "pubsub.topic; {create: always, node: {type:
topic}}";
std::string connectionOptions = "";
// Qpid objects
qpid::messaging::Connection connection(url, connectionOptions);
qpid::messaging::Session session;
qpid::messaging::Receiver receiver;
qpid::messaging::Sender sender;
qpid::messaging::Message pubMessage;
qpid::messaging::Message subMessage;
// loop counter
int lc = 0;
double delta_time;
// Start it up..
try {
// make connection to broker
connection.open();
// create a session on the broker
session = connection.createSession();
// create a sender on this session
sender = session.createSender(addr);
// create a receiver on this session
receiver = session.createReceiver(addr);
// start the do-stuff loop
gettimeofday(&tv_start, NULL);
// print new line
//std::cout << "\n";
for (lc=0; lc < message_count; lc++) {
// binary copy counter as first 4 bytes of buffer
memcpy( (void *)txMessage_buffer, (void *)&lc, 4 );
pubMessage.setContent( (const char *)txMessage_buffer,
message_size );
sender.send( pubMessage, true );
// receive message from broker
subMessage = receiver.fetch();
// print status update every 1%
if ( ( lc % ( message_count / 100 ) ) == 0 ) {
// print
std::cout << "\x1B[1K"; // voodoo to clear line on a
vt100 terminal
std::cout << lc << "/" << message_count << " messages
transfered..\n\x1BM";
}
session.acknowledge( subMessage, false );
}
std::cout << "\n";
gettimeofday(&tv_stop, NULL);
std::cout << "finished sending " << message_count << " messages
of size " << message_size << " bytes each.";
timeval_subtract( &tv_delta, &tv_stop, &tv_start);
delta_time = tv_delta.tv_sec + ( tv_delta.tv_usec / 1e6 );
std::cout << "Opeation completed in " << delta_time << "
seconds\n";
} catch(const std::exception& error) {
std::cout << error.what() << std::endl;
connection.close();
return -1;
}
connection.close();
return 0;
}
// Fill a buffer with pseudo-random bytes to use as a binary test payload.
//
//   buffer - destination; must have room for at least `size` bytes
//   size   - number of bytes to write; nothing is written when size <= 0
//
// Side effect: reseeds the C library generator with the current time on
// every call.
void makeRandomBinaryMessage( unsigned char* buffer, int size ) {
    /* random seed, just using time */
    srand ( (unsigned int)time(NULL) );
    /* now generate random value to fill buffer */
    /* note: this data is not very random, but random enough for this. */
    for (int i = 0; i < size; i++) {
        // modulo 256 (not 255) so that every byte value 0x00..0xFF can occur
        buffer[i] = static_cast<unsigned char>( rand() % 256 );
    }
}
// Compute the elapsed time between two timeval stamps: *result = *x - *y.
//
//   result - receives the difference (seconds and microseconds)
//   x      - the later time stamp
//   y      - the earlier time stamp; it is normalised IN PLACE so that the
//            microsecond subtraction below cannot go negative
//
// Returns 1 when the difference is negative (x is earlier than y), else 0.
int timeval_subtract (struct timeval *result, struct timeval *x, struct timeval *y )
{
    // y has more microseconds than x: move whole seconds from y's
    // microsecond field into its seconds field.
    if (y->tv_usec > x->tv_usec) {
        int borrow = (y->tv_usec - x->tv_usec) / 1000000 + 1;
        y->tv_sec += borrow;
        y->tv_usec -= 1000000 * borrow;
    }

    // x is ahead of y by more than a full second in the microsecond field:
    // shift the surplus whole seconds the other way.
    if (x->tv_usec - y->tv_usec > 1000000) {
        int surplus = (x->tv_usec - y->tv_usec) / 1000000;
        y->tv_sec -= surplus;
        y->tv_usec += 1000000 * surplus;
    }

    // With y normalised, the microsecond difference is non-negative.
    result->tv_usec = x->tv_usec - y->tv_usec;
    result->tv_sec = x->tv_sec - y->tv_sec;

    // A negative result shows up as x having fewer seconds than adjusted y.
    return (x->tv_sec < y->tv_sec) ? 1 : 0;
}
-------------- Code Segment End ---------------
--
Peter Fetterer (kb3gtn)
* Failure is not an option, but a standard feature to be avoided.
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]