Hi Tim,
Please find attached a sample application testing transaction
throughput for one publishing and one consuming connections.
By running it on my machine I see a difference in performance of ~15%
between 0.11.0 and 0.20.0.
Our performance test suite uses 10 publishing and 10 consuming
connections. The difference in performance is a bit bigger with it.
Btw, you can run the suite from qpid-java project using
mvn integration-test -f ./perftests/pom.xml -DskipTests=true \
-Dperftests=qpid-jms-client -Dqpid.disttest.duration=20000 \
-Dperftests.hillclimb=true -Dperftests.hillclimb.minimum_delta=100
-Dperftests.hillclimb.max_runs=2 \
-Dperftests.test-config=./perftests/etc/testdefs \
-Dqpid-jms-client-version=0.11.0 \
/// <--- parameters below are default values and can be skipped
-Dperftests.messaging-hostport-plain=localhost:5672 \
-Dperftests.messaging-hostport-tls=localhost:5671 \
-Dperftests.messaging-user=guest \
-Dperftests.messaging-password=guest \
-Dperftests.broker-virtualhostnode=default \
-Dperftests.broker-virtualhost=default \
-Dperftests.manangement-url=http://localhost:8080 \
-Dperftests.manangement-user=guest \
-Dperftests.manangement-password=guest \
-Dperftests.broker-virtualhostnode=default
The basic authentication needs to be enabled on http port in order to
run the suite. The suite contains 2 transaction and 2 auto ack tests.
Only transactions tests are affected. The difference in performance
for Auto ack is unnoticeable.
Potentially you can comment auto-ack tests in
./perftests/etc/testdefs/defaultTests.js and run only transaction
tests.
Kind Regards,
Alex
On 28 February 2017 at 17:04, Timothy Bish <[email protected]> wrote:
> On 02/28/2017 08:10 AM, Oleksandr Rudyy wrote:
>>
>> Hi all,
>>
>> After upgrading Qpid JMS Client from version 0.11.0 to version 0.20.0
>> our performance test results dropped approximately on 20-25% in the
>> performance tests testing transaction performance with trunk version
>> of Qpid Java Broker.
>>
>> Changing client back to v0.11.0 and re-running the tests yields me the
>> same results as before the upgrade.
>>
>> What changes on the client could be the reason for a performance drop?
>
>
> There was a great deal of change in the client to implement the extra bits
> needed to meet the JMS 2.0 specification requirements along with other
> changes to handle of the more interesting edge cases around JMS usages so
> it'd be hard to just pull out one change as the culprit without more
> research. I'm not that surprised that there were some impacts.
>
> What would be helpful would be to pull out one affected test case where you
> see this drop in performance into a standalone test so we could run it in
> the Qpid JMS test suite and do some digging.
>
>> The performance test creates 10 publishing connections and 10
>> consuming connections. They produce/consume every message in separate
>> transactions. The consumer is synchronous. Each pair of producer and
>> consumer uses its own queue.
>>
>> Kind Regards,
>> Alex
>>
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: [email protected]
>> For additional commands, e-mail: [email protected]
>>
>>
>
>
> --
> Tim Bish
> twitter: @tabish121
> blog: http://timbish.blogspot.com/
>
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: [email protected]
> For additional commands, e-mail: [email protected]
>
package org.apache.qpid.perfomance;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
public class BenchMark
{
private final String _broker;
private final String _user;
private final String _password;
private final int _duration;
private final int _messageSize;
private final String _queueName;
public BenchMark(final String broker,
final String user,
final String password,
final String queueName,
final int duration,
final int messageSize)
{
_broker = broker;
_user= user;
_password = password;
_duration = duration;
_messageSize = messageSize;
_queueName = queueName;
}
public static void main(String[] args) throws Exception
{
String broker = System.getProperty("broker", "localhost:5672");
String user = System.getProperty("user", "guest");
String password = System.getProperty("password", "guest");
String testQueue = System.getProperty("queue", "test");
int duration = Integer.getInteger("duration", 60000);
int messageSize = Integer.getInteger("size", 1024);
System.out.println(String.format(
"Testing performance for duration of %d seconds against broker '%s'. Test queue is '%s'.",
duration/1000,
broker,
testQueue));
BenchMark benchMark = new BenchMark(broker, user, password, testQueue, duration, messageSize);
Result result = benchMark.run();
System.out.println(result);
}
public Result run()
throws NamingException, JMSException, InterruptedException
{
Connection publishingConnection = createConnection();
MessageHandler handler;
long start;
try
{
Session producerSession = publishingConnection.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = producerSession.createQueue(_queueName);
Connection consumerConnection = createConnection();
try
{
Session consumerSession = consumerConnection.createSession(true, Session.SESSION_TRANSACTED);
MessageConsumer consumer = consumerSession.createConsumer(queue);
handler = new MessageHandler(consumerSession);
consumer.setMessageListener(handler);
consumerConnection.start();
BytesMessage message = producerSession.createBytesMessage();
message.writeBytes(new byte[_messageSize]);
MessageProducer producer = producerSession.createProducer(queue);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
AtomicLong messageCounter = new AtomicLong();
start = System.currentTimeMillis();
long end = start + _duration;
long currentTime;
do
{
producer.send(message);
producerSession.commit();
long numberOfProducedMessages = messageCounter.incrementAndGet();
currentTime = System.currentTimeMillis();
while ((numberOfProducedMessages - handler.getMessageCount()) > 100 && currentTime < end )
{
Thread.sleep(50);
}
}
while (currentTime < end && !handler.exceptionOccurred());
handler.end();
// consume remaining
while (!handler.exceptionOccurred() && messageCounter.get() > handler.getMessageCount())
{
Thread.sleep(50);
}
consumer.close();
consumerSession.close();
}
finally
{
consumerConnection.close();
}
producerSession.close();
}
finally
{
publishingConnection.close();
}
if (handler != null)
{
double testDuration = handler.getEndTime() - start;
if (testDuration > 0)
{
final double messageThroughput = (double) handler.getMessageNumber() / testDuration;
final long bytesThroughput =
(long) Math.ceil((double) handler.getMessageCount() / testDuration * (double) _messageSize);
return new Result(messageThroughput, bytesThroughput, handler.getException());
}
}
return null;
}
private Connection createConnection() throws NamingException, JMSException
{
final Properties properties = new Properties();
properties.put("java.naming.factory.initial", "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
properties.put("connectionfactory.myFactory", "amqp://" + _broker);
Context context = new InitialContext(properties);
Connection connection = null;
try
{
ConnectionFactory factory = (ConnectionFactory) context.lookup("myFactory");
connection = factory.createConnection(_user, _password);
}
finally
{
context.close();
}
return connection;
}
private static class MessageHandler implements MessageListener
{
private final AtomicLong _messageCounter = new AtomicLong();
private final Session _session;
private volatile JMSException _exception;
private volatile long _endTime;
private volatile long _messageNumber;
public MessageHandler(final Session session)
{
_session = session;
}
@Override
public void onMessage(final Message message)
{
try
{
_session.commit();
}
catch (JMSException e)
{
_exception = e;
}
_messageCounter.incrementAndGet();
}
public long getMessageCount()
{
return _messageCounter.get();
}
public boolean exceptionOccurred()
{
return _exception != null;
}
public JMSException getException()
{
return _exception;
}
public void end()
{
_endTime = System.currentTimeMillis();
_messageNumber = _messageCounter.get();
}
public long getEndTime()
{
return _endTime;
}
public long getMessageNumber()
{
return _messageNumber;
}
}
public static class Result
{
private final JMSException _exception;
double _messageThroughput;
long _bytesThroughput;
public Result(final double messageThroughput, final long bytesThroughput, JMSException exception)
{
_messageThroughput = messageThroughput;
_bytesThroughput = bytesThroughput;
_exception = exception;
}
public double getMessageThroughput()
{
return _messageThroughput;
}
public long getBytesThroughput()
{
return _bytesThroughput;
}
@Override
public String toString()
{
String result = String.format("Throughput: %.2f messages/second, %d bytes/second",
_messageThroughput,
_bytesThroughput);
if (_exception != null)
{
result += System.getProperty("line.separator");
StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw);
_exception.printStackTrace(pw);
result += sw.toString();
}
return result;
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]