Author: sebb
Date: Fri Jun 25 00:20:40 2010
New Revision: 957767

URL: http://svn.apache.org/viewvc?rev=957767&view=rev
Log:
Merge OnMessageSubscriber and onMessage method/queue with ReceiveSubscriber
No longer use ClientPool (did not share anything)
Tidy SubscriberSampler now sample code is identical for both strategies

Removed:
    
jakarta/jmeter/trunk/src/protocol/jms/org/apache/jmeter/protocol/jms/client/OnMessageSubscriber.java
Modified:
    
jakarta/jmeter/trunk/src/protocol/jms/org/apache/jmeter/protocol/jms/client/ReceiveSubscriber.java
    
jakarta/jmeter/trunk/src/protocol/jms/org/apache/jmeter/protocol/jms/sampler/SubscriberSampler.java

Modified: 
jakarta/jmeter/trunk/src/protocol/jms/org/apache/jmeter/protocol/jms/client/ReceiveSubscriber.java
URL: 
http://svn.apache.org/viewvc/jakarta/jmeter/trunk/src/protocol/jms/org/apache/jmeter/protocol/jms/client/ReceiveSubscriber.java?rev=957767&r1=957766&r2=957767&view=diff
==============================================================================
--- 
jakarta/jmeter/trunk/src/protocol/jms/org/apache/jmeter/protocol/jms/client/ReceiveSubscriber.java
 (original)
+++ 
jakarta/jmeter/trunk/src/protocol/jms/org/apache/jmeter/protocol/jms/client/ReceiveSubscriber.java
 Fri Jun 25 00:20:40 2010
@@ -19,12 +19,15 @@
 package org.apache.jmeter.protocol.jms.client;
 
 import java.io.Closeable;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
 
 import javax.jms.Connection;
 import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
 import javax.jms.Session;
 import javax.naming.Context;
 import javax.naming.NamingException;
@@ -34,10 +37,15 @@ import org.apache.jorphan.logging.Loggin
 import org.apache.log.Logger;
 
 /**
- * Uses MessageConsumer.receive(timeout) to fetch messages.
- * Does not cache any messages.
+ * Generic MessageConsumer class, which has two possible strategies.
+ * <ul>
+ * <li>Use MessageConsumer.receive(timeout) to fetch messages.</li>
+ * <li>Use MessageListener.onMessage() to cache messages in a local queue.</li>
+ * </ul>
+ * In both cases, the {...@link #getMessage(long)} method is used to return 
the next message,
+ * either directly using receive(timeout) or from the queue using 
poll(timeout).
  */
-public class ReceiveSubscriber implements Closeable {
+public class ReceiveSubscriber implements Closeable, MessageListener {
 
     private static final Logger log = LoggingManager.getLoggerForClass();
 
@@ -47,6 +55,11 @@ public class ReceiveSubscriber implement
 
     private final MessageConsumer SUBSCRIBER;
 
+    /*
+     * We use a LinkedBlockingQueue (rather than a ConcurrentLinkedQueue) 
because it has a
+     * poll-with-wait method that avoids the need to use a polling loop.
+     */
+    private final LinkedBlockingQueue<Message> queue;
 
     /**
      * Constructor takes the necessary JNDI related parameters to create a
@@ -76,6 +89,45 @@ public class ReceiveSubscriber implement
         SESSION = CONN.createSession(false, Session.AUTO_ACKNOWLEDGE);
         Destination dest = Utils.lookupDestination(ctx, destinationName);
         SUBSCRIBER = SESSION.createConsumer(dest);
+        queue = null;
+        log.debug("<init> complete");
+    }
+
+    /**
+     * Constructor takes the necessary JNDI related parameters to create a
+     * connection and create an onMessageListener to prepare to begin 
receiving messages.
+     * <br/>
+     * The caller must then invoke {...@link #start()} to enable message 
reception.
+     * 
+     * @param queueSize maximum queue size <=0 == no limit
+     * @param useProps if true, use jndi.properties instead of 
+     * initialContextFactory, providerUrl, securityPrincipal, 
securityCredentials
+     * @param initialContextFactory
+     * @param providerUrl
+     * @param connfactory
+     * @param destinationName
+     * @param useAuth
+     * @param securityPrincipal
+     * @param securityCredentials
+     * @throws JMSException if could not create context or other problem 
occurred.
+     * @throws NamingException 
+     */
+    public ReceiveSubscriber(int queueSize, boolean useProps, 
+            String initialContextFactory, String providerUrl, String 
connfactory, String destinationName,
+            boolean useAuth, 
+            String securityPrincipal, String securityCredentials) throws 
NamingException, JMSException {
+        Context ctx = InitialContextFactory.getContext(useProps, 
+                initialContextFactory, providerUrl, useAuth, 
securityPrincipal, securityCredentials);
+        CONN = Utils.getConnection(ctx, connfactory);
+        SESSION = CONN.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Destination dest = Utils.lookupDestination(ctx, destinationName);
+        SUBSCRIBER = SESSION.createConsumer(dest);
+        if (queueSize <=0) {
+            queue = new LinkedBlockingQueue<Message>();
+        } else {
+            queue = new LinkedBlockingQueue<Message>(queueSize);            
+        }
+        SUBSCRIBER.setMessageListener(this);
         log.debug("<init> complete");
     }
 
@@ -108,6 +160,18 @@ public class ReceiveSubscriber implement
      */
     public Message getMessage(long timeout) throws JMSException {
         Message message = null;
+        if (queue != null) { // Using onMessage Listener
+            try {
+                if (timeout < 10) { // Allow for short/negative times
+                    message = queue.poll();                    
+                } else {
+                    message = queue.poll(timeout, TimeUnit.MILLISECONDS);
+                }
+            } catch (InterruptedException e) {
+                // Ignored
+            }
+            return message;
+        }
         if (timeout < 10) { // Allow for short/negative times
             message = SUBSCRIBER.receiveNoWait();                
         } else {
@@ -116,10 +180,10 @@ public class ReceiveSubscriber implement
         return message;
     }
     /**
-     * close() will stop the connection first. Then it closes the subscriber,
-     * session and connection.
+     * close() will stop the connection first. 
+     * Then it closes the subscriber, session and connection.
      */
-    public synchronized void close() { // called from testEnded() thread
+    public void close() { // called from threadFinished() thread
         log.debug("close()");
         try {
             CONN.stop();
@@ -130,4 +194,14 @@ public class ReceiveSubscriber implement
         Utils.close(SESSION, log);
         Utils.close(CONN, log);
     }
+
+
+    /**
+     * {...@inheritdoc}
+     */
+    public void onMessage(Message message) {
+        if (!queue.offer(message)){
+            log.warn("Could not add message to queue");
+        }
+    }
 }

Modified: 
jakarta/jmeter/trunk/src/protocol/jms/org/apache/jmeter/protocol/jms/sampler/SubscriberSampler.java
URL: 
http://svn.apache.org/viewvc/jakarta/jmeter/trunk/src/protocol/jms/org/apache/jmeter/protocol/jms/sampler/SubscriberSampler.java?rev=957767&r1=957766&r2=957767&view=diff
==============================================================================
--- 
jakarta/jmeter/trunk/src/protocol/jms/org/apache/jmeter/protocol/jms/sampler/SubscriberSampler.java
 (original)
+++ 
jakarta/jmeter/trunk/src/protocol/jms/org/apache/jmeter/protocol/jms/sampler/SubscriberSampler.java
 Fri Jun 25 00:20:40 2010
@@ -18,36 +18,37 @@
 package org.apache.jmeter.protocol.jms.sampler;
 
 import java.util.Enumeration;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
 
 import javax.jms.JMSException;
 import javax.jms.MapMessage;
 import javax.jms.Message;
-import javax.jms.MessageListener;
 import javax.jms.TextMessage;
 import javax.naming.NamingException;
 
+import org.apache.jmeter.protocol.jms.Utils;
+import org.apache.jmeter.protocol.jms.client.ReceiveSubscriber;
+import org.apache.jmeter.protocol.jms.control.gui.JMSSubscriberGui;
 import org.apache.jmeter.samplers.Interruptible;
 import org.apache.jmeter.samplers.SampleResult;
-import org.apache.jmeter.testelement.TestListener;
 import org.apache.jmeter.testelement.ThreadListener;
 import org.apache.jmeter.util.JMeterUtils;
-import org.apache.jmeter.engine.event.LoopIterationEvent;
-
-import org.apache.jmeter.protocol.jms.Utils;
-import org.apache.jmeter.protocol.jms.control.gui.JMSSubscriberGui;
-import org.apache.jmeter.protocol.jms.client.ClientPool;
-import org.apache.jmeter.protocol.jms.client.OnMessageSubscriber;
-import org.apache.jmeter.protocol.jms.client.ReceiveSubscriber;
-
 import org.apache.jorphan.logging.LoggingManager;
 import org.apache.log.Logger;
 
 /**
- * This class implements the JMS Subcriber sampler
+ * This class implements the JMS Subcriber sampler.
+ * It supports both receive and onMessage strategies via the ReceiveSubscriber 
class.
+ * 
  */
-public class SubscriberSampler extends BaseJMSSampler implements 
Interruptible, TestListener, ThreadListener, MessageListener {
+// TODO: do we need to implement any kind of connection pooling?
+// If so, which connections should be shared?
+// Should threads share connections to the same destination?
+// What about cross-thread sharing?
+
+// Note: originally the code did use the ClientPool to "share" subscribers, 
however since the
+// key was "this" and each sampler is unique - nothing was actually shared.
+
+public class SubscriberSampler extends BaseJMSSampler implements 
Interruptible, ThreadListener {
 
     private static final long serialVersionUID = 240L;
 
@@ -60,12 +61,6 @@ public class SubscriberSampler extends B
     // No need to synch/ - only used by sampler and ClientPool (which does its 
own synch)
     private transient ReceiveSubscriber SUBSCRIBER = null;
 
-    /*
-     * We use a LinkedBlockingQueue (rather than a ConcurrentLinkedQueue) 
because it has a
-     * poll-with-wait method that avoids the need to use a polling loop.
-     */
-    private transient LinkedBlockingQueue<Message> queue;
-    
     private transient volatile boolean interrupted = false;
 
     private transient long timeout;
@@ -83,35 +78,6 @@ public class SubscriberSampler extends B
     public SubscriberSampler() {
     }
 
-    public void testEnded(String test) {
-        testEnded();
-    }
-
-    public void testStarted(String test) {
-        testStarted();
-    }
-
-    /**
-     * testEnded is called by Jmeter's engine.
-     * Clears the client pool.
-     */
-    public void testEnded() {
-        log.debug("SubscriberSampler.testEnded called");
-        ClientPool.clearClient();
-    }
-
-    /**
-     * {...@inheritdoc}
-     */
-    public void testStarted() {
-    }
-
-    /**
-     * {...@inheritdoc}
-     */
-    public void testIterationStart(LoopIterationEvent event) {
-    }
-
     /**
      * Create the OnMessageSubscriber client and set the sampler as the message
      * listener.
@@ -120,19 +86,10 @@ public class SubscriberSampler extends B
      *
      */
     private void initListenerClient() throws JMSException, NamingException {
-        OnMessageSubscriber sub = (OnMessageSubscriber) ClientPool.get(this);
-        if (sub == null) {
-            sub = new OnMessageSubscriber(getUseJNDIPropertiesAsBoolean(), 
getJNDIInitialContextFactory(),
+        SUBSCRIBER = new ReceiveSubscriber(0, getUseJNDIPropertiesAsBoolean(), 
getJNDIInitialContextFactory(),
                     getProviderUrl(), getConnectionFactory(), 
getDestination(), 
                     isUseAuth(), getUsername(), getPassword());
-            queue = new LinkedBlockingQueue<Message>();
-            sub.setMessageListener(this);
-            sub.start();
-            ClientPool.addClient(sub);
-            ClientPool.put(this, sub);
-            log.debug("SubscriberSampler.initListenerClient called");
-            log.debug("loop count " + getIterations());
-        }
+        log.debug("SubscriberSampler.initListenerClient called");
     }
 
     /**
@@ -144,8 +101,6 @@ public class SubscriberSampler extends B
         SUBSCRIBER = new ReceiveSubscriber(getUseJNDIPropertiesAsBoolean(),
                 getJNDIInitialContextFactory(), getProviderUrl(), 
getConnectionFactory(), getDestination(),
                 isUseAuth(), getUsername(), getPassword());
-        SUBSCRIBER.start();
-        ClientPool.addClient(SUBSCRIBER);
         log.debug("SubscriberSampler.initReceiveClient called");
     }
 
@@ -155,6 +110,7 @@ public class SubscriberSampler extends B
      *
      * @return the appropriate sample result
      */
+    // TODO - should we call start() and stop()?
     @Override
     public SampleResult sample() {
         SampleResult result = new SampleResult();
@@ -168,82 +124,12 @@ public class SubscriberSampler extends B
             result.setResponseMessage(exceptionDuringInit.toString());
             return result; 
         }
-        if (useReceive) {
-            return sampleWithReceive(result);
-        } else {
-            return sampleWithListener(result);
-        }
-    }
-
-    /**
-     * sample will block until messages are received
-     * @param result 
-     *
-     * @return the sample result
-     */
-    private SampleResult sampleWithListener(SampleResult result) {
-        StringBuilder buffer = new StringBuilder();
-        StringBuilder propBuffer = new StringBuilder();
-
-        int loop = getIterationCount();
-        int read = 0;
-
-        long until = 0L;
-        long now = System.currentTimeMillis();
-        if (timeout > 0) {
-            until = timeout + now; 
-        }
-        while (!interrupted
-                && (until == 0 || now < until)
-                && read < loop) {
-            try {
-                Message msg = queue.poll(calculateWait(until, now), 
TimeUnit.MILLISECONDS);
-                if (msg != null) {
-                    read++;
-                    extractContent(buffer, propBuffer, msg);
-                }
-            } catch (InterruptedException e) {
-                // Ignored
-            }
-            now = System.currentTimeMillis();
-        }
-        result.sampleEnd();
-       
-        if (getReadResponseAsBoolean()) {
-            result.setResponseData(buffer.toString().getBytes());
-        } else {
-            result.setBytes(buffer.toString().getBytes().length);
-        }
-        result.setResponseHeaders(propBuffer.toString());
-        result.setDataType(SampleResult.TEXT);
-        if (read == 0) {
-            result.setResponseCode("404"); // Not found
-            result.setSuccessful(false);
-        } else { // TODO set different status if not enough messages found?
-            result.setResponseCodeOK();
-            result.setSuccessful(true);
-        }
-        result.setResponseMessage(read + " messages received");
-        result.setSamplerData(loop + " messages expected");
-        result.setSampleCount(read);
-
-        return result;
-    }
-
-    /**
-     * Sample method uses the ReceiveSubscriber client instead of onMessage
-     * approach.
-     * @param result 
-     *
-     * @return the sample result
-     */
-    private SampleResult sampleWithReceive(SampleResult result) {
         StringBuilder buffer = new StringBuilder();
         StringBuilder propBuffer = new StringBuilder();
         
         int loop = getIterationCount();
         int read = 0;
-
+        
         long until = 0L;
         long now = System.currentTimeMillis();
         if (timeout > 0) {
@@ -282,7 +168,7 @@ public class SubscriberSampler extends B
         result.setResponseMessage(read + " message(s) received successfully");
         result.setSamplerData(loop + " messages expected");
         result.setSampleCount(read);
-
+        
         return result;
     }
 
@@ -328,13 +214,55 @@ public class SubscriberSampler extends B
     }
 
     /**
-     * The sampler implements MessageListener directly and sets itself as the
-     * listener with the MessageConsumer.
+     * Initialise the thread-local variables.
+     * <br/>
+     * {...@inheritdoc}
      */
-    public void onMessage(Message message) {
-        if (!queue.offer(message)){
-            log.warn("Could not add message to queue");
+    public void threadStarted() {
+        timeout = getTimeoutAsLong();
+        interrupted = false;
+        exceptionDuringInit = null;
+        useReceive = getClientChoice().equals(JMSSubscriberGui.RECEIVE_RSC);
+        if (useReceive) {
+            try {
+                initReceiveClient();
+                SUBSCRIBER.start();
+            } catch (NamingException e) {
+                exceptionDuringInit = e;
+            } catch (JMSException e) {
+                exceptionDuringInit = e;
+            }
+        } else {
+            try {
+                initListenerClient();
+                SUBSCRIBER.start();
+            } catch (JMSException e) {
+                exceptionDuringInit = e;
+            } catch (NamingException e) {
+                exceptionDuringInit = e;
+            }
         }
+        if (exceptionDuringInit != null){
+            log.error("Could not initialise client",exceptionDuringInit);
+        }
+    }
+
+    /**
+     * Close subscriber.
+     * <br/>
+     * {...@inheritdoc}
+     */
+    public void threadFinished() {
+        SUBSCRIBER.close();
+    }
+
+    /**
+     * Handle an interrupt of the test.
+     */
+    public boolean interrupt() {
+        boolean oldvalue = interrupted;
+        interrupted = true;   // so we break the loops in SampleWithListener 
and SampleWithReceive
+        return !oldvalue;
     }
 
     // ----------- get/set methods ------------------- //
@@ -374,45 +302,7 @@ public class SubscriberSampler extends B
         setProperty(TIMEOUT, timeout, TIMEOUT_DEFAULT);        
     }
 
-    /**
-     * Handle an interrupt of the test.
-     */
-    public boolean interrupt() {
-        boolean oldvalue = interrupted;
-        interrupted = true;   // so we break the loops in SampleWithListener 
and SampleWithReceive
-        return !oldvalue;
-    }
-
     // This was the old value that was checked for
     private final static String RECEIVE_STR = 
JMeterUtils.getResString(JMSSubscriberGui.RECEIVE_RSC); // $NON-NLS-1$
 
-    public void threadFinished() {
-    }
-
-    public void threadStarted() {
-        timeout = getTimeoutAsLong();
-        interrupted = false;
-        exceptionDuringInit = null;
-        useReceive = getClientChoice().equals(JMSSubscriberGui.RECEIVE_RSC);
-        if (useReceive) {
-            try {
-                initReceiveClient();
-            } catch (NamingException e) {
-                exceptionDuringInit = e;
-            } catch (JMSException e) {
-                exceptionDuringInit = e;
-            }
-        } else {
-            try {
-                initListenerClient();
-            } catch (JMSException e) {
-                exceptionDuringInit = e;
-            } catch (NamingException e) {
-                exceptionDuringInit = e;
-            }
-        }
-        if (exceptionDuringInit != null){
-            log.error("Could not initialise client",exceptionDuringInit);
-        }
-    }
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to