Author: sebb
Date: Sun May 2 01:23:52 2010
New Revision: 940130
URL: http://svn.apache.org/viewvc?rev=940130&view=rev
Log:
Bug 47949 - JMS Subscriber may not receive all the messages
Modified:
jakarta/jmeter/trunk/src/protocol/jms/org/apache/jmeter/protocol/jms/client/ReceiveSubscriber.java
jakarta/jmeter/trunk/src/protocol/jms/org/apache/jmeter/protocol/jms/sampler/SubscriberSampler.java
jakarta/jmeter/trunk/xdocs/changes.xml
Modified:
jakarta/jmeter/trunk/src/protocol/jms/org/apache/jmeter/protocol/jms/client/ReceiveSubscriber.java
URL:
http://svn.apache.org/viewvc/jakarta/jmeter/trunk/src/protocol/jms/org/apache/jmeter/protocol/jms/client/ReceiveSubscriber.java?rev=940130&r1=940129&r2=940130&view=diff
==============================================================================
---
jakarta/jmeter/trunk/src/protocol/jms/org/apache/jmeter/protocol/jms/client/ReceiveSubscriber.java
(original)
+++
jakarta/jmeter/trunk/src/protocol/jms/org/apache/jmeter/protocol/jms/client/ReceiveSubscriber.java
Sun May 2 01:23:52 2010
@@ -18,6 +18,8 @@
package org.apache.jmeter.protocol.jms.client;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.TextMessage;
@@ -29,7 +31,6 @@ import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
-import org.apache.jmeter.protocol.jms.sampler.BaseJMSSampler;
import org.apache.jorphan.logging.LoggingManager;
import org.apache.log.Logger;
@@ -61,9 +62,8 @@ public class ReceiveSubscriber implement
@SuppressWarnings("unused")
private int loop = 1; // TODO never read
-
- //@GuardedBy("this")
- private final StringBuffer buffer = new StringBuffer();
+
+ private final ConcurrentLinkedQueue<TextMessage> queue = new
ConcurrentLinkedQueue<TextMessage>();
//@GuardedBy("this")
private volatile boolean RUN = true;
@@ -152,11 +152,13 @@ public class ReceiveSubscriber implement
}
/**
- * Get the message as a string
- *
+ * Get the message
+ * @return the next message from the queue or null if none
*/
- public synchronized String getMessage() {
- return this.buffer.toString();
+ public synchronized TextMessage getMessage() {
+ TextMessage msg = queue.poll();
+ this.counter--;
+ return msg;
}
/**
@@ -172,7 +174,7 @@ public class ReceiveSubscriber implement
this.CONN.close();
this.CLIENTTHREAD.interrupt();
this.CLIENTTHREAD = null;
- this.buffer.setLength(0);
+ queue.clear();
} catch (JMSException e) {
log.error(e.getMessage());
} catch (Exception e) {
@@ -181,14 +183,6 @@ public class ReceiveSubscriber implement
}
/**
- * Reset the receiver ready for receiving any further messages
- */
- public synchronized void reset() {
- counter = 0;
- this.buffer.setLength(0);
- }
-
- /**
* Increment the count and return the new value
*
* @param increment
@@ -228,17 +222,8 @@ public class ReceiveSubscriber implement
try {
Message message = this.SUBSCRIBER.receive();
if (message != null && message instanceof TextMessage) {
- TextMessage msg = (TextMessage) message;
- String text = msg.getText();
- if (text.trim().length() > 0) {
- synchronized (this) {
- this.buffer.append(BaseJMSSampler
- .getMessageHeaders(message));
- this.buffer.append("JMS Message Text:\n\n");
- this.buffer.append(text);
- count(1);
- }
- }
+ queue.add((TextMessage)message);
+ count(1);
}
} catch (JMSException e) {
log.error("Communication error: " + e.getMessage());
Modified:
jakarta/jmeter/trunk/src/protocol/jms/org/apache/jmeter/protocol/jms/sampler/SubscriberSampler.java
URL:
http://svn.apache.org/viewvc/jakarta/jmeter/trunk/src/protocol/jms/org/apache/jmeter/protocol/jms/sampler/SubscriberSampler.java?rev=940130&r1=940129&r2=940130&view=diff
==============================================================================
---
jakarta/jmeter/trunk/src/protocol/jms/org/apache/jmeter/protocol/jms/sampler/SubscriberSampler.java
(original)
+++
jakarta/jmeter/trunk/src/protocol/jms/org/apache/jmeter/protocol/jms/sampler/SubscriberSampler.java
Sun May 2 01:23:52 2010
@@ -17,6 +17,9 @@
package org.apache.jmeter.protocol.jms.sampler;
+import java.util.Enumeration;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
@@ -48,12 +51,8 @@ public class SubscriberSampler extends B
// No need to synch/ - only used by sampler and ClientPool (which does its
own synch)
private transient ReceiveSubscriber SUBSCRIBER = null;
- //@GuardedBy("this")
- private final StringBuffer BUFFER = new StringBuffer();
-
- //@GuardedBy("this")
- private transient int counter = 0;
-
+ private final ConcurrentLinkedQueue<TextMessage> queue = new
ConcurrentLinkedQueue<TextMessage>();
+
private transient volatile boolean interrupted = false;
// Don't change the string, as it is used in JMX files
@@ -103,6 +102,7 @@ public class SubscriberSampler extends B
sub = new
OnMessageSubscriber(this.getUseJNDIPropertiesAsBoolean(),
this.getJNDIInitialContextFactory(),
this.getProviderUrl(), this.getConnectionFactory(),
this.getTopic(), this.isUseAuth(), this
.getUsername(), this.getPassword());
+ queue.clear();
sub.setMessageListener(this);
sub.resume();
ClientPool.addClient(sub);
@@ -149,14 +149,18 @@ public class SubscriberSampler extends B
private SampleResult sampleWithListener() {
SampleResult result = new SampleResult();
result.setDataType(SampleResult.TEXT);
+ StringBuffer buffer = new StringBuffer();
+ StringBuffer propBuffer = new StringBuffer();
+ int cnt;
+
result.setSampleLabel(getName());
initListenerClient();
int loop = this.getIterationCount();
result.sampleStart();
- int read;
- while ((read=this.count(0)) < loop && interrupted == false) {
+
+ while (queue.size() < loop && interrupted == false) {
try {
Thread.sleep(0, 50);
} catch (InterruptedException e) {
@@ -164,21 +168,39 @@ public class SubscriberSampler extends B
}
}
result.sampleEnd();
- synchronized (this) {// Need to synch because buffer is shared with
onMessageHandler
- if (this.getReadResponseAsBoolean()) {
- result.setResponseData(this.BUFFER.toString(), null);
- } else {
- result.setBytes(this.BUFFER.toString().length());
+
+ for(cnt = 0; cnt < loop ; cnt++) {
+ TextMessage msg = queue.poll();
+ if (msg != null) {
+ try {
+ buffer.append(msg.getText());
+ Enumeration<?> props = msg.getPropertyNames();
+ while(props.hasMoreElements()) {
+ String name = (String) props.nextElement();
+ propBuffer.append("PROPERTY: ");
+ propBuffer.append(name);
+ propBuffer.append("=");
+ propBuffer.append(msg.getObjectProperty(name));
+ propBuffer.append("\n");
+ }
+ } catch (JMSException e) {
+ log.error(e.getMessage());
+ }
}
- read=this.count(0);
}
+ if (this.getReadResponseAsBoolean()) {
+ result.setResponseData(buffer.toString().getBytes());
+ } else {
+ result.setBytes(buffer.toString().getBytes().length);
+ }
+ result.setResponseHeaders(propBuffer.toString());
+ result.setDataType(SampleResult.TEXT);
result.setSuccessful(true);
result.setResponseCodeOK();
- result.setResponseMessage(read + " messages received");
+ result.setResponseMessage(loop + " messages received"); // TODO fix
result.setSamplerData(loop + " messages expected");
- result.setSampleCount(read);
+ result.setSampleCount(loop);
- this.resetCount();
return result;
}
@@ -191,6 +213,11 @@ public class SubscriberSampler extends B
private SampleResult sampleWithReceive() {
SampleResult result = new SampleResult();
result.setDataType(SampleResult.TEXT);
+ StringBuffer buffer = new StringBuffer();
+ StringBuffer propBuffer = new StringBuffer();
+ int cnt;
+
+
result.setSampleLabel(getName());
if (this.SUBSCRIBER == null) {
this.initReceiveClient();
@@ -208,19 +235,38 @@ public class SubscriberSampler extends B
}
}
result.sampleEnd();
- int read = this.SUBSCRIBER.count(0);
+ result.setResponseMessage(loop + " samples messages received");
+ for(cnt = 0; cnt < loop ; cnt++) {
+ TextMessage msg = this.SUBSCRIBER.getMessage();
+ if (msg != null) {
+ try {
+ buffer.append(msg.getText());
+ Enumeration<?> props = msg.getPropertyNames();
+ while(props.hasMoreElements()) {
+ String name = (String) props.nextElement();
+ propBuffer.append("PROPERTY: ");
+ propBuffer.append(name);
+ propBuffer.append("=");
+ propBuffer.append(msg.getObjectProperty(name));
+ propBuffer.append("\n");
+ }
+ } catch (JMSException e) {
+ log.error(e.getMessage());
+ }
+ }
+ }
if (this.getReadResponseAsBoolean()) {
- result.setResponseData(this.SUBSCRIBER.getMessage(), null);
+ result.setResponseData(buffer.toString().getBytes());
} else {
- result.setBytes(this.SUBSCRIBER.getMessage().length());
+ result.setBytes(buffer.toString().getBytes().length);
}
+ result.setResponseHeaders(propBuffer.toString());
result.setSuccessful(true);
result.setResponseCodeOK();
- result.setResponseMessage(read + " message(s) received successfully");
+ result.setResponseMessage(loop + " message(s) received successfully");
result.setSamplerData(loop + " messages expected");
- result.setSampleCount(read);
+ result.setSampleCount(loop);
- this.SUBSCRIBER.reset();
return result;
}
@@ -229,42 +275,11 @@ public class SubscriberSampler extends B
* listener with the TopicSubscriber.
*/
public synchronized void onMessage(Message message) {
- try {
- if (message instanceof TextMessage) {
- TextMessage msg = (TextMessage) message;
- String content = msg.getText();
- if (content != null) {
- this.BUFFER.append(getMessageHeaders(message));
- this.BUFFER.append("JMS Message Text:\n\n");
- this.BUFFER.append(content);
- count(1);
- }
- }
- } catch (JMSException e) {
- log.error(e.getMessage());
+ if (message instanceof TextMessage) {
+ queue.add((TextMessage)message);
}
}
- /**
- * increment the count and return the new value.
- *
- * @param increment
- * @return the new value
- */
- private synchronized int count(int increment) {
- this.counter += increment;
- return this.counter;
- }
-
- /**
- * resetCount will set the counter to zero and set the length of the
- * StringBuffer to zero.
- */
- private synchronized void resetCount() {
- this.counter = 0;
- this.BUFFER.setLength(0);
- }
-
// ----------- get/set methods ------------------- //
/**
* Set the client choice. There are two options: ReceiveSusbscriber and
Modified: jakarta/jmeter/trunk/xdocs/changes.xml
URL:
http://svn.apache.org/viewvc/jakarta/jmeter/trunk/xdocs/changes.xml?rev=940130&r1=940129&r2=940130&view=diff
==============================================================================
--- jakarta/jmeter/trunk/xdocs/changes.xml (original)
+++ jakarta/jmeter/trunk/xdocs/changes.xml Sun May 2 01:23:52 2010
@@ -99,6 +99,7 @@ The XPath Assertion and XPath Extractor
<li>Bug 48579 - Single Bind does not show config information when LdapExt
Sampler is accessed</li>
<li>Bug 48747 - TCP Sampler swallows exceptions</li>
<li>Bug 49111 - "Message With ID Not Found" Error on JMS P2P sampler.</li>
+<li>Bug 47949 - JMS Subscriber never recieves all the messages</li>
</ul>
<h3>Controllers</h3>