Author: sebb Date: Tue Nov 11 13:37:26 2008 New Revision: 713179 URL: http://svn.apache.org/viewvc?rev=713179&view=rev Log: Lots of thread-safety fixes, but still needs some work Merged close() and resetCount() methods as they need to be done together
Modified: jakarta/jmeter/trunk/src/protocol/jms/org/apache/jmeter/protocol/jms/client/ReceiveSubscriber.java Modified: jakarta/jmeter/trunk/src/protocol/jms/org/apache/jmeter/protocol/jms/client/ReceiveSubscriber.java URL: http://svn.apache.org/viewvc/jakarta/jmeter/trunk/src/protocol/jms/org/apache/jmeter/protocol/jms/client/ReceiveSubscriber.java?rev=713179&r1=713178&r2=713179&view=diff ============================================================================== --- jakarta/jmeter/trunk/src/protocol/jms/org/apache/jmeter/protocol/jms/client/ReceiveSubscriber.java (original) +++ jakarta/jmeter/trunk/src/protocol/jms/org/apache/jmeter/protocol/jms/client/ReceiveSubscriber.java Tue Nov 11 13:37:26 2008 @@ -32,46 +32,71 @@ import org.apache.jorphan.logging.LoggingManager; import org.apache.log.Logger; +/** + * Receives messages in a separate thread until told to stop. + * Run loop permanently receives messages; the sampler calls reset() + * when it has taken enough messages. + * + */ +/* + * TODO Needs rework - there is a window between receiving a message and calling reset() + * which means that a message can be lost. It's not clear why a separate thread is needed, + * given that the sampler loops until enough samples have been received. + * Also, messages are received in wait mode, so the RUN flag won't be checked until + * at least one more message has been received. +*/ public class ReceiveSubscriber implements Runnable { private static final Logger log = LoggingManager.getLoggerForClass(); - private TopicConnection CONN = null; + private final TopicConnection CONN; - private TopicSession SESSION = null; + private final TopicSession SESSION; - private Topic TOPIC = null; + private final Topic TOPIC; - private TopicSubscriber SUBSCRIBER = null; - - private byte[] RESULT = null; + private final TopicSubscriber SUBSCRIBER; + //@GuardedBy("this") private int counter; private int loop = 1; // TODO never read + //@GuardedBy("this") private StringBuffer buffer = new StringBuffer(); + //@GuardedBy("this") private volatile boolean RUN = true; // Needs to be volatile to ensure value is picked up - private Thread CLIENTTHREAD = null; - - /** - * - */ - public ReceiveSubscriber() { - super(); - } + //@GuardedBy("this") + private Thread CLIENTTHREAD; public ReceiveSubscriber(boolean useProps, String jndi, String url, String connfactory, String topic, boolean useAuth, String user, String pwd) { Context ctx = initJNDI(useProps, jndi, url, useAuth, user, pwd); + TopicConnection _conn = null; + Topic _topic = null; + TopicSession _session = null; + TopicSubscriber _subscriber = null; if (ctx != null) { - initConnection(ctx, connfactory, topic); + try { + ConnectionFactory.getTopicConnectionFactory(ctx,connfactory); + _conn = ConnectionFactory.getTopicConnection(); + _topic = InitialContextFactory.lookupTopic(ctx, topic); + _session = this.CONN.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE); + _subscriber = this.SESSION.createSubscriber(this.TOPIC); + log.info("created the topic connection successfully"); + } catch (JMSException e) { + log.error("Connection error: " + e.getMessage()); + } } else { log.error("Could not initialize JNDI Initial Context Factory"); } + this.CONN = _conn; + this.TOPIC = _topic; + this.SESSION = _session; + this.SUBSCRIBER = _subscriber; } /** @@ -85,7 +110,8 @@ * @param pwd * @return the JNDI initial context or null */ - public Context initJNDI(boolean useProps, String jndi, String url, boolean useAuth, String user, String pwd) { + // Called by ctor + private Context initJNDI(boolean useProps, String jndi, String url, boolean useAuth, String user, String pwd) { if (useProps) { try { return new InitialContext(); @@ -99,26 +125,6 @@ } /** - * Create the connection, session and topic subscriber - * - * @param ctx - * @param connfactory - * @param topic - */ - public void initConnection(Context ctx, String connfactory, String topic) { - try { - ConnectionFactory.getTopicConnectionFactory(ctx,connfactory); - this.CONN = ConnectionFactory.getTopicConnection(); - this.TOPIC = InitialContextFactory.lookupTopic(ctx, topic); - this.SESSION = this.CONN.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE); - this.SUBSCRIBER = this.SESSION.createSubscriber(this.TOPIC); - log.info("created the topic connection successfully"); - } catch (JMSException e) { - log.error("Connection error: " + e.getMessage()); - } - } - - /** * Set the number of iterations for each call to sample() * * @param loop @@ -147,53 +153,45 @@ * Get the message as a string * */ - public String getMessage() { + public synchronized String getMessage() { return this.buffer.toString(); } /** * Get the message(s) as an array of byte[] - * + * */ - public byte[] getByteResult() { - if (this.buffer.length() > 0) { - this.RESULT = this.buffer.toString().getBytes(); - } - return this.RESULT; + public synchronized byte[] getByteResult() { + return this.buffer.toString().getBytes(); } /** * close() will stop the connection first. Then it closes the subscriber, - * session and connection and sets them to null. + * session and connection. */ - public synchronized void close() { + public synchronized void close() { // called from testEnded() thread try { + this.RUN = false; this.CONN.stop(); this.SUBSCRIBER.close(); this.SESSION.close(); this.CONN.close(); - this.SUBSCRIBER = null; - this.SESSION = null; - this.CONN = null; - this.RUN = false; this.CLIENTTHREAD.interrupt(); this.CLIENTTHREAD = null; this.buffer.setLength(0); - this.buffer = null; } catch (JMSException e) { log.error(e.getMessage()); - } catch (Throwable e) { + } catch (Exception e) { log.error(e.getMessage()); } } /** - * Clear will set the buffer to zero and the result objects to null. Clear - * should be called at the end of a sample. + * Reset the receiver ready for receiving any further messages */ - public void clear() { + public synchronized void reset() { + counter = 0; this.buffer.setLength(0); - this.RESULT = null; } /** @@ -207,52 +205,46 @@ } /** - * Reset will reset the counter and prepare for the next sample() call. - * - */ - public synchronized int resetCount() { - counter = 0; - return counter; - } - - /** * start will create a new thread and pass this class. once the thread is * created, it calls Thread.start(). */ public void start() { + // No point starting thread unless subscriber exists + if (SUBSCRIBER == null) { + log.error("Subscriber has not been set up"); + return; + } this.CLIENTTHREAD = new Thread(this, "Subscriber2"); this.CLIENTTHREAD.start(); } /** - * run calls listen to begin listening for inboud messages from the + * run calls listen to begin listening for inbound messages from the * provider. + * + * Updates the count field so the caller can check how many messages have been receieved. + * */ public void run() { - ReceiveSubscriber.this.listen(); - } - - /** - * Listen for inbound messages - */ - protected void listen() { - log.info("Subscriber2.listen() called"); + if (SUBSCRIBER == null) { // just in case + log.error("Subscriber has not been set up"); + return; + } while (RUN) { - if (SUBSCRIBER == null) { - log.error("Subscriber has not been set up"); - break; - } try { Message message = this.SUBSCRIBER.receive(); if (message != null && message instanceof TextMessage) { TextMessage msg = (TextMessage) message; - if (msg.getText().trim().length() > 0) { - this.buffer.append(msg.getText()); - count(1); + String text = msg.getText(); + if (text.trim().length() > 0) { + synchronized (this) { + this.buffer.append(text); + count(1); + } } } } catch (JMSException e) { - log.info("Communication error: " + e.getMessage()); + log.error("Communication error: " + e.getMessage()); } } } --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]