Perhaps my search foo is weak, but I have been unable to find any recent
examples of people sending high data data rates over activemq CMS.
http://www.mostly-useless.com/blog/2007/12/27/playing-with-activemq/ -> has
some good data (although from 2007).
Have found some references to CMS 2.2 through the nabble search, but coming
up short at the moment for more recent applications.
I have a pretty simple data driven signal processing system, the processing
is essentially linear:
A-->B-->C-->D
Data rates are highish (~110 Mbit/sec) but nothing astronomical, messages
vary in size, but are generally pretty big (average ~ 200kB, but can be up
to a 1MB).
I moved from a simple sockets to activemq (for portability reasons), I am
using activemq 5.4.2 and CMS 3.4.2 on a pretty peppy RHEL 5.3 64 bit machine
(2.6 Ghz 8 core Xeon with 12GB RAM) (the versions/OS that I use is defined
for me, so updating versions is a bigish deal (I know 5.5 is out)).
I can slow down the processing, and running with anything close to 40Mb/sec
results in the processes consuming a full CPU each (where as running without
activemq each process consumes ~ 20% of a processor) so I am concerned about
activemq's overhead (I suppose it is more likely the overhead of the CMS
bindings).
I don't care about persistence, the data is time sensitive so if the
processing goes down it is of no value for me (video like data). I have
disabled producerFlowControl (unsure if that was a good idea), but as a data
driven processing chain if I am not able to keep up with the data rate then
I am dead in the water. Going to try nio for the transport as I have seen
that mentioned a few places for high throughput usage.
Seem to get into a deadlock state where the process is waiting on a mutex:
__kernel_vsyscall() at 0xffffe410
pthread_cond_timedwait@@GLIBC_2.3.2() at 0xf7b5bd12
decaf::internal::util::concurrent::ConditionImpl::wait() at 0xf77ce9f3
decaf::util::concurrent::Mutex::wait() at 0xf784d767
decaf::util::concurrent::Mutex::wait() at 0xf784d5f2
activemq::transport::failover::FailoverTransport::oneway() at 0xf7540ed7
activemq::transport::correlator::ResponseCorrelator::oneway() at 0xf752a2b8
activemq::core::ActiveMQConnection::oneway() at 0xf746af12
activemq::core::ActiveMQSession::send() at 0xf74bb9e2
activemq::core::ActiveMQProducer::send() at 0xf74aa764
activemq::core::ActiveMQProducer::send() at 0xf74a7dd1
activemq::core::ActiveMQProducer::send() at 0xf74a9585
activeMqProducer::send() at ioActiveMqProducer.cpp:116 0x805a235
io::send() at io.cpp:162 0x8057614
process_data_msg() at main.cpp:814 0x804cf7c
process_message() at main.cpp:891 0x804d272
main() at main.cpp:1,097 0x804e481
I esentially used the cpp samples when I created my io library (just a
simple wrapper).
Consumer code (Sanitised - so may not be syntactically correct) :
void activeMqConsumer::runConsumer()
{
try {
// Create a ConnectionFactory
ActiveMQConnectionFactory* connectionFactory = new
ActiveMQConnectionFactory( brokerURI );
// Create a Connection
connection = connectionFactory->createConnection();
delete connectionFactory;
ActiveMQConnection* amqConnection =
dynamic_cast<ActiveMQConnection*>( connection );
if( amqConnection != NULL ) {
amqConnection->addTransportListener( this );
}
connection->setExceptionListener(this);
// Create a Session
if( clientAck ) {
session = connection->createSession(
Session::CLIENT_ACKNOWLEDGE );
} else {
session = connection->createSession(
Session::AUTO_ACKNOWLEDGE );
}
// Create the destination (Topic or Queue)
if( useTopic ) {
destination = session->createTopic( destURI );
} else {
destination = session->createQueue( destURI );
}
// Create a MessageConsumer from the Session to the Topic or
Queue
consumer = session->createConsumer( destination );
consumer->setMessageListener( this );
} catch (CMSException& e) {
e.printStackTrace();
}
}
// Called from the consumer since this class is a registered
MessageListener.
void activeMqConsumer::onMessage( const Message* message )
{
INT32 msgSize;
INT32 bytesRead = 0;
FLOAT64 latency = 0;
TIME currentTime;
INT32 rval;
rval = SDL_SemWait(this->pElementAvailable);
if (rval == -1)
{
printf("(onMessage) error: SDL_SemWait failed\n");
}
try
{
// cast the activeMq "message" into an array of bytes
const cms::BytesMessage *bytesMessage = dynamic_cast<const
cms::BytesMessage *>(message);
if(bytesMessage != NULL)
{
msgSize = bytesMessage->getBodyLength();
bytesRead = bytesMessage->readBytes((unsigned
char*)this->pInputMsg,
msgSize);
totalBytesRecvd += msgSize;
totalMsgsRecvd++;
// the assumption here is that you receive messages
after they have been
sent
// if the times between processing modules are not
synchronized all bets
are off.
getSystemTime(&(currentTime));
latency = (FLOAT64)currentTime -
(FLOAT64)this->pInputMsg->hdr.time;
this->accruedLatency += latency;
if(clientAck == TRUE)
{
message->acknowledge();
}
}
}
catch(cms::CMSException &exception)
{
exception.printStackTrace();
}
if(startTime == 0)
{
getSystemTime(&(startTime));
}
// indicate that there is data available
rval = SDL_SemPost(this->pDataAvailable);
if (rval == -1)
{
printf("(enqueue) error: SDL_SemPost failed\n");
}
}
Producer code:
activeMqProducer::activeMqProducer( const std::string& brokerURI,
const std::string& destURI,
BOOLEAN useTopic)
{
// Create a ConnectionFactory
auto_ptr<ConnectionFactory>
connectionFactory(ConnectionFactory::createCMSConnectionFactory( brokerURI )
);
// Create a Connection
connection = connectionFactory->createConnection();
sessionTransacted = false;
// Create a Session
if( sessionTransacted )
{
session = connection->createSession(
Session::SESSION_TRANSACTED );
}
else
{
session = connection->createSession( Session::AUTO_ACKNOWLEDGE
);
}
// Create the destination (Topic or Queue)
if( useTopic )
{
destination = session->createTopic( destURI );
}
else
{
destination = session->createQueue( destURI );
}
// Create a MessageProducer from the Session to the Topic or Queue
producer = session->createProducer( destination );
producer->setDeliveryMode( DeliveryMode::NON_PERSISTENT );
message = session->createBytesMessage();
}
void activeMqProducer::send(IO_MSG *pOutputMsg)
{
// Set the send time in the IO message
getSystemTime(&(pOutputMsg->hdr.time));
// Convert the message into an activeMQ bytesMessage
message->setBodyBytes( (const unsigned char*)pOutputMsg,
pOutputMsg->hdr.msgSize + sizeof(MSG_HDR));
// Send to broker
producer->send( message );
}
Any suggestions on how to speed up the data rates and stop the producer from
locking up would be greatly appreciated.
V/R
~Joe
--
View this message in context:
http://activemq.2283324.n4.nabble.com/Activemq-CMS-high-data-rate-lockup-tp3670718p3670718.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.