Author: sebb
Date: Sun May  2 01:23:52 2010
New Revision: 940130

URL: http://svn.apache.org/viewvc?rev=940130&view=rev
Log:
Bug 47949 - JMS Subscriber may not receive all the messages

Modified:
    
jakarta/jmeter/trunk/src/protocol/jms/org/apache/jmeter/protocol/jms/client/ReceiveSubscriber.java
    
jakarta/jmeter/trunk/src/protocol/jms/org/apache/jmeter/protocol/jms/sampler/SubscriberSampler.java
    jakarta/jmeter/trunk/xdocs/changes.xml

Modified: 
jakarta/jmeter/trunk/src/protocol/jms/org/apache/jmeter/protocol/jms/client/ReceiveSubscriber.java
URL: 
http://svn.apache.org/viewvc/jakarta/jmeter/trunk/src/protocol/jms/org/apache/jmeter/protocol/jms/client/ReceiveSubscriber.java?rev=940130&r1=940129&r2=940130&view=diff
==============================================================================
--- 
jakarta/jmeter/trunk/src/protocol/jms/org/apache/jmeter/protocol/jms/client/ReceiveSubscriber.java
 (original)
+++ 
jakarta/jmeter/trunk/src/protocol/jms/org/apache/jmeter/protocol/jms/client/ReceiveSubscriber.java
 Sun May  2 01:23:52 2010
@@ -18,6 +18,8 @@
 
 package org.apache.jmeter.protocol.jms.client;
 
+import java.util.concurrent.ConcurrentLinkedQueue;
+
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.TextMessage;
@@ -29,7 +31,6 @@ import javax.naming.Context;
 import javax.naming.InitialContext;
 import javax.naming.NamingException;
 
-import org.apache.jmeter.protocol.jms.sampler.BaseJMSSampler;
 import org.apache.jorphan.logging.LoggingManager;
 import org.apache.log.Logger;
 
@@ -61,9 +62,8 @@ public class ReceiveSubscriber implement
 
     @SuppressWarnings("unused")
     private int loop = 1; // TODO never read
-
-    //@GuardedBy("this")
-    private final StringBuffer buffer = new StringBuffer();
+    
+    private final ConcurrentLinkedQueue<TextMessage> queue = new 
ConcurrentLinkedQueue<TextMessage>();
 
     //@GuardedBy("this")
     private volatile boolean RUN = true;
@@ -152,11 +152,13 @@ public class ReceiveSubscriber implement
     }
 
     /**
-     * Get the message as a string
-     *
+     * Get the message
+     * @return the next message from the queue or null if none
      */
-    public synchronized String getMessage() {
-        return this.buffer.toString();
+    public synchronized TextMessage getMessage() {
+        TextMessage msg = queue.poll();
+        this.counter--;
+        return msg;
     }
 
     /**
@@ -172,7 +174,7 @@ public class ReceiveSubscriber implement
             this.CONN.close();
             this.CLIENTTHREAD.interrupt();
             this.CLIENTTHREAD = null;
-            this.buffer.setLength(0);
+            queue.clear();
         } catch (JMSException e) {
             log.error(e.getMessage());
         } catch (Exception e) {
@@ -181,14 +183,6 @@ public class ReceiveSubscriber implement
     }
 
     /**
-     * Reset the receiver ready for receiving any further messages
-     */
-    public synchronized void reset() {
-        counter = 0;
-        this.buffer.setLength(0);
-    }
-
-    /**
      * Increment the count and return the new value
      *
      * @param increment
@@ -228,17 +222,8 @@ public class ReceiveSubscriber implement
             try {
                 Message message = this.SUBSCRIBER.receive();
                 if (message != null && message instanceof TextMessage) {
-                    TextMessage msg = (TextMessage) message;
-                    String text = msg.getText();
-                    if (text.trim().length() > 0) {
-                        synchronized (this) {
-                            this.buffer.append(BaseJMSSampler
-                                .getMessageHeaders(message));
-                            this.buffer.append("JMS Message Text:\n\n");
-                            this.buffer.append(text);
-                            count(1);
-                        }
-                    }
+                    queue.add((TextMessage)message);
+                    count(1);
                 }
             } catch (JMSException e) {
                 log.error("Communication error: " + e.getMessage());

Modified: 
jakarta/jmeter/trunk/src/protocol/jms/org/apache/jmeter/protocol/jms/sampler/SubscriberSampler.java
URL: 
http://svn.apache.org/viewvc/jakarta/jmeter/trunk/src/protocol/jms/org/apache/jmeter/protocol/jms/sampler/SubscriberSampler.java?rev=940130&r1=940129&r2=940130&view=diff
==============================================================================
--- 
jakarta/jmeter/trunk/src/protocol/jms/org/apache/jmeter/protocol/jms/sampler/SubscriberSampler.java
 (original)
+++ 
jakarta/jmeter/trunk/src/protocol/jms/org/apache/jmeter/protocol/jms/sampler/SubscriberSampler.java
 Sun May  2 01:23:52 2010
@@ -17,6 +17,9 @@
 
 package org.apache.jmeter.protocol.jms.sampler;
 
+import java.util.Enumeration;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageListener;
@@ -48,12 +51,8 @@ public class SubscriberSampler extends B
     // No need to synch/ - only used by sampler and ClientPool (which does its 
own synch)
     private transient ReceiveSubscriber SUBSCRIBER = null;
 
-    //@GuardedBy("this")
-    private final StringBuffer BUFFER = new StringBuffer();
-
-    //@GuardedBy("this")
-    private transient int counter = 0;
-
+    private final ConcurrentLinkedQueue<TextMessage> queue = new 
ConcurrentLinkedQueue<TextMessage>();
+    
     private transient volatile boolean interrupted = false;
 
     // Don't change the string, as it is used in JMX files
@@ -103,6 +102,7 @@ public class SubscriberSampler extends B
             sub = new 
OnMessageSubscriber(this.getUseJNDIPropertiesAsBoolean(), 
this.getJNDIInitialContextFactory(),
                     this.getProviderUrl(), this.getConnectionFactory(), 
this.getTopic(), this.isUseAuth(), this
                             .getUsername(), this.getPassword());
+            queue.clear();
             sub.setMessageListener(this);
             sub.resume();
             ClientPool.addClient(sub);
@@ -149,14 +149,18 @@ public class SubscriberSampler extends B
     private SampleResult sampleWithListener() {
         SampleResult result = new SampleResult();
         result.setDataType(SampleResult.TEXT);
+        StringBuffer buffer = new StringBuffer();
+        StringBuffer propBuffer = new StringBuffer();
+        int cnt;
+        
         result.setSampleLabel(getName());
         initListenerClient();
 
         int loop = this.getIterationCount();
 
         result.sampleStart();
-        int read;
-        while ((read=this.count(0)) < loop && interrupted == false) {
+        
+        while (queue.size() < loop && interrupted == false) {
             try {
                 Thread.sleep(0, 50);
             } catch (InterruptedException e) {
@@ -164,21 +168,39 @@ public class SubscriberSampler extends B
             }
         }
         result.sampleEnd();
-        synchronized (this) {// Need to synch because buffer is shared with 
onMessageHandler
-            if (this.getReadResponseAsBoolean()) {
-                result.setResponseData(this.BUFFER.toString(), null);
-            } else {
-                result.setBytes(this.BUFFER.toString().length());
+       
+        for(cnt = 0; cnt < loop ; cnt++) {
+            TextMessage msg = queue.poll();
+            if (msg != null) {
+                try {
+                    buffer.append(msg.getText());
+                    Enumeration<?> props = msg.getPropertyNames();
+                    while(props.hasMoreElements()) {
+                        String name = (String) props.nextElement();
+                        propBuffer.append("PROPERTY: ");
+                        propBuffer.append(name);
+                        propBuffer.append("=");
+                        propBuffer.append(msg.getObjectProperty(name));
+                        propBuffer.append("\n");
+                    }
+                } catch (JMSException e) {
+                    log.error(e.getMessage());
+                }
             }
-            read=this.count(0);
         }
+        if (this.getReadResponseAsBoolean()) {
+            result.setResponseData(buffer.toString().getBytes());
+        } else {
+            result.setBytes(buffer.toString().getBytes().length);
+        }
+        result.setResponseHeaders(propBuffer.toString());
+        result.setDataType(SampleResult.TEXT);
         result.setSuccessful(true);
         result.setResponseCodeOK();
-        result.setResponseMessage(read + " messages received");
+        result.setResponseMessage(loop + " messages received"); // TODO fix
         result.setSamplerData(loop + " messages expected");
-        result.setSampleCount(read);
+        result.setSampleCount(loop);
 
-        this.resetCount();
         return result;
     }
 
@@ -191,6 +213,11 @@ public class SubscriberSampler extends B
     private SampleResult sampleWithReceive() {
         SampleResult result = new SampleResult();
         result.setDataType(SampleResult.TEXT);
+        StringBuffer buffer = new StringBuffer();
+        StringBuffer propBuffer = new StringBuffer();
+        int cnt;
+        
+        
         result.setSampleLabel(getName());
         if (this.SUBSCRIBER == null) {
             this.initReceiveClient();
@@ -208,19 +235,38 @@ public class SubscriberSampler extends B
             }
         }
         result.sampleEnd();
-        int read = this.SUBSCRIBER.count(0);
+        result.setResponseMessage(loop + " samples messages received");
+        for(cnt = 0; cnt < loop ; cnt++) {
+            TextMessage msg = this.SUBSCRIBER.getMessage();
+            if (msg != null) {
+                try {
+                    buffer.append(msg.getText());
+                    Enumeration<?> props = msg.getPropertyNames();
+                    while(props.hasMoreElements()) {
+                        String name = (String) props.nextElement();
+                        propBuffer.append("PROPERTY: ");
+                        propBuffer.append(name);
+                        propBuffer.append("=");
+                        propBuffer.append(msg.getObjectProperty(name));
+                        propBuffer.append("\n");
+                    }
+                } catch (JMSException e) {
+                    log.error(e.getMessage());
+                }
+            }
+        }
         if (this.getReadResponseAsBoolean()) {
-            result.setResponseData(this.SUBSCRIBER.getMessage(), null);
+            result.setResponseData(buffer.toString().getBytes());
         } else {
-            result.setBytes(this.SUBSCRIBER.getMessage().length());
+            result.setBytes(buffer.toString().getBytes().length);
         }
+        result.setResponseHeaders(propBuffer.toString());
         result.setSuccessful(true);
         result.setResponseCodeOK();
-        result.setResponseMessage(read + " message(s) received successfully");
+        result.setResponseMessage(loop + " message(s) received successfully");
         result.setSamplerData(loop + " messages expected");
-        result.setSampleCount(read);
+        result.setSampleCount(loop);
 
-        this.SUBSCRIBER.reset();
         return result;
     }
 
@@ -229,42 +275,11 @@ public class SubscriberSampler extends B
      * listener with the TopicSubscriber.
      */
     public synchronized void onMessage(Message message) {
-        try {
-            if (message instanceof TextMessage) {
-                TextMessage msg = (TextMessage) message;
-                String content = msg.getText();
-                if (content != null) {
-                    this.BUFFER.append(getMessageHeaders(message));
-                    this.BUFFER.append("JMS Message Text:\n\n");
-                    this.BUFFER.append(content);
-                    count(1);
-                }
-            }
-        } catch (JMSException e) {
-            log.error(e.getMessage());
+        if (message instanceof TextMessage) {
+            queue.add((TextMessage)message);
         }
     }
 
-    /**
-     * increment the count and return the new value.
-     *
-     * @param increment
-     * @return the new value
-     */
-    private synchronized int count(int increment) {
-        this.counter += increment;
-        return this.counter;
-    }
-
-    /**
-     * resetCount will set the counter to zero and set the length of the
-     * StringBuffer to zero.
-     */
-    private synchronized void resetCount() {
-        this.counter = 0;
-        this.BUFFER.setLength(0);
-    }
-
     // ----------- get/set methods ------------------- //
     /**
      * Set the client choice. There are two options: ReceiveSusbscriber and

Modified: jakarta/jmeter/trunk/xdocs/changes.xml
URL: 
http://svn.apache.org/viewvc/jakarta/jmeter/trunk/xdocs/changes.xml?rev=940130&r1=940129&r2=940130&view=diff
==============================================================================
--- jakarta/jmeter/trunk/xdocs/changes.xml (original)
+++ jakarta/jmeter/trunk/xdocs/changes.xml Sun May  2 01:23:52 2010
@@ -99,6 +99,7 @@ The XPath Assertion and XPath Extractor 
 <li>Bug 48579 - Single Bind does not show config information when LdapExt 
Sampler is accessed</li>
 <li>Bug 48747 - TCP Sampler swallows exceptions</li>
 <li>Bug 49111 - "Message With ID Not Found" Error on JMS P2P sampler.</li>
+<li>Bug 47949 - JMS Subscriber never receives all the messages</li>
 </ul>
 
 <h3>Controllers</h3>


Reply via email to