Author: sebb
Date: Fri Jun 25 00:20:40 2010
New Revision: 957767
URL: http://svn.apache.org/viewvc?rev=957767&view=rev
Log:
Merge OnMessageSubscriber and onMessage method/queue with ReceiveSubscriber
No longer use ClientPool (did not share anything)
Tidy SubscriberSampler now sample code is identical for both strategies
Removed:
jakarta/jmeter/trunk/src/protocol/jms/org/apache/jmeter/protocol/jms/client/OnMessageSubscriber.java
Modified:
jakarta/jmeter/trunk/src/protocol/jms/org/apache/jmeter/protocol/jms/client/ReceiveSubscriber.java
jakarta/jmeter/trunk/src/protocol/jms/org/apache/jmeter/protocol/jms/sampler/SubscriberSampler.java
Modified:
jakarta/jmeter/trunk/src/protocol/jms/org/apache/jmeter/protocol/jms/client/ReceiveSubscriber.java
URL:
http://svn.apache.org/viewvc/jakarta/jmeter/trunk/src/protocol/jms/org/apache/jmeter/protocol/jms/client/ReceiveSubscriber.java?rev=957767&r1=957766&r2=957767&view=diff
==============================================================================
---
jakarta/jmeter/trunk/src/protocol/jms/org/apache/jmeter/protocol/jms/client/ReceiveSubscriber.java
(original)
+++
jakarta/jmeter/trunk/src/protocol/jms/org/apache/jmeter/protocol/jms/client/ReceiveSubscriber.java
Fri Jun 25 00:20:40 2010
@@ -19,12 +19,15 @@
package org.apache.jmeter.protocol.jms.client;
import java.io.Closeable;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
import javax.jms.Session;
import javax.naming.Context;
import javax.naming.NamingException;
@@ -34,10 +37,15 @@ import org.apache.jorphan.logging.Loggin
import org.apache.log.Logger;
/**
- * Uses MessageConsumer.receive(timeout) to fetch messages.
- * Does not cache any messages.
+ * Generic MessageConsumer class, which has two possible strategies.
+ * <ul>
+ * <li>Use MessageConsumer.receive(timeout) to fetch messages.</li>
+ * <li>Use MessageListener.onMessage() to cache messages in a local queue.</li>
+ * </ul>
+ * In both cases, the {@link #getMessage(long)} method is used to return
the next message,
+ * either directly using receive(timeout) or from the queue using
poll(timeout).
*/
-public class ReceiveSubscriber implements Closeable {
+public class ReceiveSubscriber implements Closeable, MessageListener {
private static final Logger log = LoggingManager.getLoggerForClass();
@@ -47,6 +55,11 @@ public class ReceiveSubscriber implement
private final MessageConsumer SUBSCRIBER;
+ /*
+ * We use a LinkedBlockingQueue (rather than a ConcurrentLinkedQueue)
because it has a
+ * poll-with-wait method that avoids the need to use a polling loop.
+ */
+ private final LinkedBlockingQueue<Message> queue;
/**
* Constructor takes the necessary JNDI related parameters to create a
@@ -76,6 +89,45 @@ public class ReceiveSubscriber implement
SESSION = CONN.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination dest = Utils.lookupDestination(ctx, destinationName);
SUBSCRIBER = SESSION.createConsumer(dest);
+ queue = null;
+ log.debug("<init> complete");
+ }
+
+ /**
+ * Constructor takes the necessary JNDI related parameters to create a
+ * connection and create an onMessageListener to prepare to begin
receiving messages.
+ * <br/>
+ * The caller must then invoke {@link #start()} to enable message
reception.
+ *
+ * @param queueSize maximum queue size <=0 == no limit
+ * @param useProps if true, use jndi.properties instead of
+ * initialContextFactory, providerUrl, securityPrincipal,
securityCredentials
+ * @param initialContextFactory
+ * @param providerUrl
+ * @param connfactory
+ * @param destinationName
+ * @param useAuth
+ * @param securityPrincipal
+ * @param securityCredentials
+ * @throws JMSException if could not create context or other problem
occurred.
+ * @throws NamingException
+ */
+ public ReceiveSubscriber(int queueSize, boolean useProps,
+ String initialContextFactory, String providerUrl, String
connfactory, String destinationName,
+ boolean useAuth,
+ String securityPrincipal, String securityCredentials) throws
NamingException, JMSException {
+ Context ctx = InitialContextFactory.getContext(useProps,
+ initialContextFactory, providerUrl, useAuth,
securityPrincipal, securityCredentials);
+ CONN = Utils.getConnection(ctx, connfactory);
+ SESSION = CONN.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Destination dest = Utils.lookupDestination(ctx, destinationName);
+ SUBSCRIBER = SESSION.createConsumer(dest);
+ if (queueSize <=0) {
+ queue = new LinkedBlockingQueue<Message>();
+ } else {
+ queue = new LinkedBlockingQueue<Message>(queueSize);
+ }
+ SUBSCRIBER.setMessageListener(this);
log.debug("<init> complete");
}
@@ -108,6 +160,18 @@ public class ReceiveSubscriber implement
*/
public Message getMessage(long timeout) throws JMSException {
Message message = null;
+ if (queue != null) { // Using onMessage Listener
+ try {
+ if (timeout < 10) { // Allow for short/negative times
+ message = queue.poll();
+ } else {
+ message = queue.poll(timeout, TimeUnit.MILLISECONDS);
+ }
+ } catch (InterruptedException e) {
+ // Ignored
+ }
+ return message;
+ }
if (timeout < 10) { // Allow for short/negative times
message = SUBSCRIBER.receiveNoWait();
} else {
@@ -116,10 +180,10 @@ public class ReceiveSubscriber implement
return message;
}
/**
- * close() will stop the connection first. Then it closes the subscriber,
- * session and connection.
+ * close() will stop the connection first.
+ * Then it closes the subscriber, session and connection.
*/
- public synchronized void close() { // called from testEnded() thread
+ public void close() { // called from threadFinished() thread
log.debug("close()");
try {
CONN.stop();
@@ -130,4 +194,14 @@ public class ReceiveSubscriber implement
Utils.close(SESSION, log);
Utils.close(CONN, log);
}
+
+
+ /**
+ * {@inheritDoc}
+ */
+ public void onMessage(Message message) {
+ if (!queue.offer(message)){
+ log.warn("Could not add message to queue");
+ }
+ }
}
Modified:
jakarta/jmeter/trunk/src/protocol/jms/org/apache/jmeter/protocol/jms/sampler/SubscriberSampler.java
URL:
http://svn.apache.org/viewvc/jakarta/jmeter/trunk/src/protocol/jms/org/apache/jmeter/protocol/jms/sampler/SubscriberSampler.java?rev=957767&r1=957766&r2=957767&view=diff
==============================================================================
---
jakarta/jmeter/trunk/src/protocol/jms/org/apache/jmeter/protocol/jms/sampler/SubscriberSampler.java
(original)
+++
jakarta/jmeter/trunk/src/protocol/jms/org/apache/jmeter/protocol/jms/sampler/SubscriberSampler.java
Fri Jun 25 00:20:40 2010
@@ -18,36 +18,37 @@
package org.apache.jmeter.protocol.jms.sampler;
import java.util.Enumeration;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
-import javax.jms.MessageListener;
import javax.jms.TextMessage;
import javax.naming.NamingException;
+import org.apache.jmeter.protocol.jms.Utils;
+import org.apache.jmeter.protocol.jms.client.ReceiveSubscriber;
+import org.apache.jmeter.protocol.jms.control.gui.JMSSubscriberGui;
import org.apache.jmeter.samplers.Interruptible;
import org.apache.jmeter.samplers.SampleResult;
-import org.apache.jmeter.testelement.TestListener;
import org.apache.jmeter.testelement.ThreadListener;
import org.apache.jmeter.util.JMeterUtils;
-import org.apache.jmeter.engine.event.LoopIterationEvent;
-
-import org.apache.jmeter.protocol.jms.Utils;
-import org.apache.jmeter.protocol.jms.control.gui.JMSSubscriberGui;
-import org.apache.jmeter.protocol.jms.client.ClientPool;
-import org.apache.jmeter.protocol.jms.client.OnMessageSubscriber;
-import org.apache.jmeter.protocol.jms.client.ReceiveSubscriber;
-
import org.apache.jorphan.logging.LoggingManager;
import org.apache.log.Logger;
/**
- * This class implements the JMS Subcriber sampler
+ * This class implements the JMS Subcriber sampler.
+ * It supports both receive and onMessage strategies via the ReceiveSubscriber
class.
+ *
*/
-public class SubscriberSampler extends BaseJMSSampler implements
Interruptible, TestListener, ThreadListener, MessageListener {
+// TODO: do we need to implement any kind of connection pooling?
+// If so, which connections should be shared?
+// Should threads share connections to the same destination?
+// What about cross-thread sharing?
+
+// Note: originally the code did use the ClientPool to "share" subscribers,
however since the
+// key was "this" and each sampler is unique - nothing was actually shared.
+
+public class SubscriberSampler extends BaseJMSSampler implements
Interruptible, ThreadListener {
private static final long serialVersionUID = 240L;
@@ -60,12 +61,6 @@ public class SubscriberSampler extends B
// No need to synch/ - only used by sampler and ClientPool (which does its
own synch)
private transient ReceiveSubscriber SUBSCRIBER = null;
- /*
- * We use a LinkedBlockingQueue (rather than a ConcurrentLinkedQueue)
because it has a
- * poll-with-wait method that avoids the need to use a polling loop.
- */
- private transient LinkedBlockingQueue<Message> queue;
-
private transient volatile boolean interrupted = false;
private transient long timeout;
@@ -83,35 +78,6 @@ public class SubscriberSampler extends B
public SubscriberSampler() {
}
- public void testEnded(String test) {
- testEnded();
- }
-
- public void testStarted(String test) {
- testStarted();
- }
-
- /**
- * testEnded is called by Jmeter's engine.
- * Clears the client pool.
- */
- public void testEnded() {
- log.debug("SubscriberSampler.testEnded called");
- ClientPool.clearClient();
- }
-
- /**
- * {@inheritDoc}
- */
- public void testStarted() {
- }
-
- /**
- * {@inheritDoc}
- */
- public void testIterationStart(LoopIterationEvent event) {
- }
-
/**
* Create the OnMessageSubscriber client and set the sampler as the message
* listener.
@@ -120,19 +86,10 @@ public class SubscriberSampler extends B
*
*/
private void initListenerClient() throws JMSException, NamingException {
- OnMessageSubscriber sub = (OnMessageSubscriber) ClientPool.get(this);
- if (sub == null) {
- sub = new OnMessageSubscriber(getUseJNDIPropertiesAsBoolean(),
getJNDIInitialContextFactory(),
+ SUBSCRIBER = new ReceiveSubscriber(0, getUseJNDIPropertiesAsBoolean(),
getJNDIInitialContextFactory(),
getProviderUrl(), getConnectionFactory(),
getDestination(),
isUseAuth(), getUsername(), getPassword());
- queue = new LinkedBlockingQueue<Message>();
- sub.setMessageListener(this);
- sub.start();
- ClientPool.addClient(sub);
- ClientPool.put(this, sub);
- log.debug("SubscriberSampler.initListenerClient called");
- log.debug("loop count " + getIterations());
- }
+ log.debug("SubscriberSampler.initListenerClient called");
}
/**
@@ -144,8 +101,6 @@ public class SubscriberSampler extends B
SUBSCRIBER = new ReceiveSubscriber(getUseJNDIPropertiesAsBoolean(),
getJNDIInitialContextFactory(), getProviderUrl(),
getConnectionFactory(), getDestination(),
isUseAuth(), getUsername(), getPassword());
- SUBSCRIBER.start();
- ClientPool.addClient(SUBSCRIBER);
log.debug("SubscriberSampler.initReceiveClient called");
}
@@ -155,6 +110,7 @@ public class SubscriberSampler extends B
*
* @return the appropriate sample result
*/
+ // TODO - should we call start() and stop()?
@Override
public SampleResult sample() {
SampleResult result = new SampleResult();
@@ -168,82 +124,12 @@ public class SubscriberSampler extends B
result.setResponseMessage(exceptionDuringInit.toString());
return result;
}
- if (useReceive) {
- return sampleWithReceive(result);
- } else {
- return sampleWithListener(result);
- }
- }
-
- /**
- * sample will block until messages are received
- * @param result
- *
- * @return the sample result
- */
- private SampleResult sampleWithListener(SampleResult result) {
- StringBuilder buffer = new StringBuilder();
- StringBuilder propBuffer = new StringBuilder();
-
- int loop = getIterationCount();
- int read = 0;
-
- long until = 0L;
- long now = System.currentTimeMillis();
- if (timeout > 0) {
- until = timeout + now;
- }
- while (!interrupted
- && (until == 0 || now < until)
- && read < loop) {
- try {
- Message msg = queue.poll(calculateWait(until, now),
TimeUnit.MILLISECONDS);
- if (msg != null) {
- read++;
- extractContent(buffer, propBuffer, msg);
- }
- } catch (InterruptedException e) {
- // Ignored
- }
- now = System.currentTimeMillis();
- }
- result.sampleEnd();
-
- if (getReadResponseAsBoolean()) {
- result.setResponseData(buffer.toString().getBytes());
- } else {
- result.setBytes(buffer.toString().getBytes().length);
- }
- result.setResponseHeaders(propBuffer.toString());
- result.setDataType(SampleResult.TEXT);
- if (read == 0) {
- result.setResponseCode("404"); // Not found
- result.setSuccessful(false);
- } else { // TODO set different status if not enough messages found?
- result.setResponseCodeOK();
- result.setSuccessful(true);
- }
- result.setResponseMessage(read + " messages received");
- result.setSamplerData(loop + " messages expected");
- result.setSampleCount(read);
-
- return result;
- }
-
- /**
- * Sample method uses the ReceiveSubscriber client instead of onMessage
- * approach.
- * @param result
- *
- * @return the sample result
- */
- private SampleResult sampleWithReceive(SampleResult result) {
StringBuilder buffer = new StringBuilder();
StringBuilder propBuffer = new StringBuilder();
int loop = getIterationCount();
int read = 0;
-
+
long until = 0L;
long now = System.currentTimeMillis();
if (timeout > 0) {
@@ -282,7 +168,7 @@ public class SubscriberSampler extends B
result.setResponseMessage(read + " message(s) received successfully");
result.setSamplerData(loop + " messages expected");
result.setSampleCount(read);
-
+
return result;
}
@@ -328,13 +214,55 @@ public class SubscriberSampler extends B
}
/**
- * The sampler implements MessageListener directly and sets itself as the
- * listener with the MessageConsumer.
+ * Initialise the thread-local variables.
+ * <br/>
+ * {@inheritDoc}
*/
- public void onMessage(Message message) {
- if (!queue.offer(message)){
- log.warn("Could not add message to queue");
+ public void threadStarted() {
+ timeout = getTimeoutAsLong();
+ interrupted = false;
+ exceptionDuringInit = null;
+ useReceive = getClientChoice().equals(JMSSubscriberGui.RECEIVE_RSC);
+ if (useReceive) {
+ try {
+ initReceiveClient();
+ SUBSCRIBER.start();
+ } catch (NamingException e) {
+ exceptionDuringInit = e;
+ } catch (JMSException e) {
+ exceptionDuringInit = e;
+ }
+ } else {
+ try {
+ initListenerClient();
+ SUBSCRIBER.start();
+ } catch (JMSException e) {
+ exceptionDuringInit = e;
+ } catch (NamingException e) {
+ exceptionDuringInit = e;
+ }
}
+ if (exceptionDuringInit != null){
+ log.error("Could not initialise client",exceptionDuringInit);
+ }
+ }
+
+ /**
+ * Close subscriber.
+ * <br/>
+ * {@inheritDoc}
+ */
+ public void threadFinished() {
+ SUBSCRIBER.close();
+ }
+
+ /**
+ * Handle an interrupt of the test.
+ */
+ public boolean interrupt() {
+ boolean oldvalue = interrupted;
+ interrupted = true; // so we break the loops in SampleWithListener
and SampleWithReceive
+ return !oldvalue;
}
// ----------- get/set methods ------------------- //
@@ -374,45 +302,7 @@ public class SubscriberSampler extends B
setProperty(TIMEOUT, timeout, TIMEOUT_DEFAULT);
}
- /**
- * Handle an interrupt of the test.
- */
- public boolean interrupt() {
- boolean oldvalue = interrupted;
- interrupted = true; // so we break the loops in SampleWithListener
and SampleWithReceive
- return !oldvalue;
- }
-
// This was the old value that was checked for
private final static String RECEIVE_STR =
JMeterUtils.getResString(JMSSubscriberGui.RECEIVE_RSC); // $NON-NLS-1$
- public void threadFinished() {
- }
-
- public void threadStarted() {
- timeout = getTimeoutAsLong();
- interrupted = false;
- exceptionDuringInit = null;
- useReceive = getClientChoice().equals(JMSSubscriberGui.RECEIVE_RSC);
- if (useReceive) {
- try {
- initReceiveClient();
- } catch (NamingException e) {
- exceptionDuringInit = e;
- } catch (JMSException e) {
- exceptionDuringInit = e;
- }
- } else {
- try {
- initListenerClient();
- } catch (JMSException e) {
- exceptionDuringInit = e;
- } catch (NamingException e) {
- exceptionDuringInit = e;
- }
- }
- if (exceptionDuringInit != null){
- log.error("Could not initialise client",exceptionDuringInit);
- }
- }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]