Author: rgreig
Date: Thu Apr  5 06:36:04 2007
New Revision: 525825

URL: http://svn.apache.org/viewvc?view=rev&rev=525825
Log:
Merged revisions 525531-525536 via svnmerge from 
https://svn.apache.org/repos/asf/incubator/qpid/branches/M2

........
  r525531 | rgreig | 2007-04-04 16:18:44 +0100 (Wed, 04 Apr 2007) | 1 line
  
  Added standard command line handline
........
  r525533 | rgreig | 2007-04-04 16:19:38 +0100 (Wed, 04 Apr 2007) | 1 line
  
  Added simeple file copy function.
........
  r525535 | rgreig | 2007-04-04 16:20:30 +0100 (Wed, 04 Apr 2007) | 1 line
  
  Added comments and logging to track down bug.
........
  r525536 | rgreig | 2007-04-04 16:21:43 +0100 (Wed, 04 Apr 2007) | 1 line
  
  Fixed dangling transaction problem by correctly binding queue.
........

Modified:
    incubator/qpid/trunk/qpid/   (props changed)
    
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
    
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java
    
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/CommandLineParser.java
    
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/FileUtils.java
    
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java
    
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java

Propchange: incubator/qpid/trunk/qpid/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.

Modified: 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java?view=diff&rev=525825&r1=525824&r2=525825
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
 Thu Apr  5 06:36:04 2007
@@ -21,73 +21,241 @@
 package org.apache.qpid.server.store;
 
 import org.apache.commons.configuration.Configuration;
+
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.MessageMetaData;
 import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.exchange.Exchange;
 
+/**
+ * MessageStore defines the interface to a storage area, which can be used to 
preserve the state of messages, queues
+ * and exchanges in a transactional manner.
+ *
+ * <p/>All message store, remove, enqueue and dequeue operations are carried 
out against a [EMAIL PROTECTED] StoreContext} which
+ * encapsulates the transactional context they are performed in. Many such 
operations can be carried out in a single
+ * transaction.
+ *
+ * <p/>The storage and removal of queues and exchanges, are not carried out in 
a transactional context.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities
+ * <tr><td> Accept transaction boundary demarcations: Begin, Commit, Abort.
+ * <tr><td> Store and remove queues.
+ * <tr><td> Store and remove exchanges.
+ * <tr><td> Store and remove messages.
+ * <tr><td> Bind and unbind queues to exchanges.
+ * <tr><td> Enqueue and dequeue messages to queues.
+ * <tr><td> Generate message identifiers.
+ * </table>
+ */
 public interface MessageStore
 {
     /**
      * Called after instantiation in order to configure the message store. A 
particular implementation can define
      * whatever parameters it wants.
-     * @param virtualHost the virtual host using by this store
-     * @param base the base element identifier from which all configuration 
items are relative. For example, if the base
-     * element is "store", the all elements used by concrete classes will be 
"store.foo" etc.
-     * @param config the apache commons configuration object
-     * @throws Exception if an error occurs that means the store is unable to 
configure itself
+     *
+     * @param virtualHost The virtual host using by this store
+     * @param base        The base element identifier from which all 
configuration items are relative. For example, if
+     *                    the base element is "store", the all elements used 
by concrete classes will be "store.foo" etc.
+     * @param config      The apache commons configuration object.
+     *
+     * @throws Exception If any error occurs that means the store is unable to 
configure itself.
      */
     void configure(VirtualHost virtualHost, String base, Configuration config) 
throws Exception;
 
     /**
      * Called to close and cleanup any resources used by the message store.
-     * @throws Exception if close fails
+     *
+     * @throws Exception If the close fails.
      */
     void close() throws Exception;
 
+    /**
+     * Removes the specified message from the store in the given transactional 
store context.
+     *
+     * @param storeContext The transactional context to remove the message in.
+     * @param messageId    Identifies the message to remove.
+     *
+     * @throws AMQException If the operation fails for any reason.
+     */
     void removeMessage(StoreContext storeContext, Long messageId) throws 
AMQException;
 
+    /**
+     * Makes the specified exchange persistent.
+     *
+     * @param exchange The exchange to persist.
+     *
+     * @throws AMQException If the operation fails for any reason.
+     */
     void createExchange(Exchange exchange) throws AMQException;
 
+    /**
+     * Removes the specified persistent exchange.
+     *
+     * @param exchange The exchange to remove.
+     *
+     * @throws AMQException If the operation fails for any reason.
+     */
     void removeExchange(Exchange exchange) throws AMQException;
 
+    /**
+     * Binds the specified queue to an exchange with a routing key.
+     *
+     * @param exchange   The exchange to bind to.
+     * @param routingKey The routing key to bind by.
+     * @param queue      The queue to bind.
+     * @param args       Additional parameters.
+     *
+     * @throws AMQException If the operation fails for any reason.
+     */
     void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue 
queue, FieldTable args) throws AMQException;
 
+    /**
+     * Unbinds the specified from an exchange under a particular routing key.
+     *
+     * @param exchange   The exchange to unbind from.
+     * @param routingKey The routing key to unbind.
+     * @param queue      The queue to unbind.
+     * @param args       Additonal parameters.
+     *
+     * @throws AMQException If the operation fails for any reason.
+     */
     void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue 
queue, FieldTable args) throws AMQException;
 
-
+    /**
+     * Makes the specified queue persistent.
+     *
+     * @param queue The queue to store.
+     *
+     * @throws AMQException If the operation fails for any reason.
+     */
     void createQueue(AMQQueue queue) throws AMQException;
 
+    /**
+     * Removes the specified queue from the persistent store.
+     *
+     * @param name The queue to remove.
+     *
+     * @throws AMQException If the operation fails for any reason.
+     */
     void removeQueue(AMQShortString name) throws AMQException;
 
+    /**
+     * Places a message onto a specified queue, in a given transactional 
context.
+     *
+     * @param context   The transactional context for the operation.
+     * @param name      The name of the queue to place the message on.
+     * @param messageId The message to enqueue.
+     *
+     * @throws AMQException If the operation fails for any reason.
+     */
     void enqueueMessage(StoreContext context, AMQShortString name, Long 
messageId) throws AMQException;
 
+    /**
+     * Extracts a message from a specified queue, in a given transactional 
context.
+     *
+     * @param context   The transactional context for the operation.
+     * @param name      The name of the queue to take the message from.
+     * @param messageId The message to dequeue.
+     *
+     * @throws AMQException If the operation fails for any reason, or if the 
specified message does not exist.
+     */
     void dequeueMessage(StoreContext context, AMQShortString name, Long 
messageId) throws AMQException;
 
+    /**
+     * Begins a transactional context.
+     *
+     * @param context The transactional context to begin.
+     *
+     * @throws AMQException If the operation fails for any reason.
+     */
     void beginTran(StoreContext context) throws AMQException;
 
+    /**
+     * Commits all operations performed within a given transactional context.
+     *
+     * @param context The transactional context to commit all operations for.
+     *
+     * @throws AMQException If the operation fails for any reason.
+     */
     void commitTran(StoreContext context) throws AMQException;
 
+    /**
+     * Abandons all operations performed within a given transactional context.
+     *
+     * @param context The transactional context to abandon.
+     *
+     * @throws AMQException If the operation fails for any reason.
+     */
     void abortTran(StoreContext context) throws AMQException;
 
+    /**
+     * Tests a transactional context to see if it has been begun but not yet 
committed or aborted.
+     *
+     * @param context The transactional context to test.
+     *
+     * @return <tt>true</tt> if the transactional context is live, 
<tt>false</tt> otherwise.
+     */
     boolean inTran(StoreContext context);
 
     /**
      * Return a valid, currently unused message id.
-     * @return a message id
+     *
+     * @return A fresh message id.
      */
     Long getNewMessageId();
 
-    void storeContentBodyChunk(StoreContext context, Long messageId, int 
index, ContentChunk contentBody, boolean lastContentBody) throws AMQException;
+    /**
+     * Stores a chunk of message data.
+     *
+     * @param context         The transactional context for the operation.
+     * @param messageId       The message to store the data for.
+     * @param index           The index of the data chunk.
+     * @param contentBody     The content of the data chunk.
+     * @param lastContentBody Flag to indicate that this is the last such 
chunk for the message.
+     *
+     * @throws AMQException If the operation fails for any reason, or if the 
specified message does not exist.
+     */
+    void storeContentBodyChunk(StoreContext context, Long messageId, int 
index, ContentChunk contentBody,
+        boolean lastContentBody) throws AMQException;
 
+    /**
+     * Stores message meta-data.
+     *
+     * @param context         The transactional context for the operation.
+     * @param messageId       The message to store the data for.
+     * @param messageMetaData The message meta data to store.
+     *
+     * @throws AMQException If the operation fails for any reason, or if the 
specified message does not exist.
+     */
     void storeMessageMetaData(StoreContext context, Long messageId, 
MessageMetaData messageMetaData) throws AMQException;
 
+    /**
+     * Retrieves message meta-data.
+     *
+     * @param context   The transactional context for the operation.
+     * @param messageId The message to get the meta-data for.
+     *
+     * @return The message meta data.
+     *
+     * @throws AMQException If the operation fails for any reason, or if the 
specified message does not exist.
+     */
     MessageMetaData getMessageMetaData(StoreContext context, Long messageId) 
throws AMQException;
 
+    /**
+     * Retrieves a chunk of message data.
+     *
+     * @param context   The transactional context for the operation.
+     * @param messageId The message to get the data chunk for.
+     * @param index     The offset index of the data chunk within the message.
+     *
+     * @return A chunk of message data.
+     *
+     * @throws AMQException If the operation fails for any reason, or if the 
specified message does not exist.
+     */
     ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int 
index) throws AMQException;
-
 }

Modified: 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java?view=diff&rev=525825&r1=525824&r2=525825
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java
 Thu Apr  5 06:36:04 2007
@@ -22,16 +22,14 @@
 
 import org.apache.log4j.Logger;
 
-
 /**
  * A context that the store can use to associate with a transactional context. 
For example, it could store
  * some kind of txn id.
- * 
+ *
  * @author Apache Software Foundation
  */
 public class StoreContext
 {
-
     private static final Logger _logger = Logger.getLogger(StoreContext.class);
 
     private String _name;
@@ -54,7 +52,17 @@
 
     public void setPayload(Object payload)
     {
-        _logger.debug("["+_name+"] Setting payload: " + payload);
+        _logger.debug("public void setPayload(Object payload = " + payload + 
"): called");
         _payload = payload;
+    }
+
+    /**
+     * Prints out the transactional context as a string, mainly for debugging 
purposes.
+     *
+     * @return The transactional context as a string.
+     */
+    public String toString()
+    {
+        return "<_name = " + _name + ", _payload = " + _payload + ">";
     }
 }

Modified: 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/CommandLineParser.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/CommandLineParser.java?view=diff&rev=525825&r1=525824&r2=525825
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/CommandLineParser.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/CommandLineParser.java
 Thu Apr  5 06:36:04 2007
@@ -143,8 +143,8 @@
             String[] nextOptionSpec = config[i];
 
             addOption(nextOptionSpec[0], nextOptionSpec[1], 
(nextOptionSpec.length > 2) ? nextOptionSpec[2] : null,
-                      (nextOptionSpec.length > 3) ? 
("true".equals(nextOptionSpec[3]) ? true : false) : false,
-                      (nextOptionSpec.length > 4) ? nextOptionSpec[4] : null);
+                (nextOptionSpec.length > 3) ? 
("true".equals(nextOptionSpec[3]) ? true : false) : false,
+                (nextOptionSpec.length > 4) ? nextOptionSpec[4] : null);
         }
     }
 
@@ -209,8 +209,9 @@
         // Print usage on each of the command line options.
         for (CommandLineOption optionInfo : optionMap.values())
         {
-            result += optionInfo.option + " " + ((optionInfo.argument != null) 
? (optionInfo.argument + " ") : "")
-                      + optionInfo.comment + "\n";
+            result +=
+                optionInfo.option + " " + ((optionInfo.argument != null) ? 
(optionInfo.argument + " ") : "")
+                + optionInfo.comment + "\n";
         }
 
         return result;
@@ -604,6 +605,37 @@
     }
 
     /**
+     * Extracts all name=value pairs from the command line, sets them all as 
system properties and also returns
+     * a map of properties containing them.
+     *
+     * @param args The command line.
+     *
+     * @return A set of properties containing all name=value pairs from the 
command line.
+     */
+    public static Properties processCommandLine(String[] args, 
CommandLineParser commandLine)
+    {
+        // Capture the command line arguments or display errors and correct 
usage and then exit.
+        Properties options = null;
+
+        try
+        {
+            options = commandLine.parseCommandLine(args);
+
+            // Add all the trailing command line options (name=value pairs) to 
system properties. They may be picked up
+            // from there.
+            commandLine.addCommandLineToSysProperties();
+        }
+        catch (IllegalArgumentException e)
+        {
+            System.out.println(commandLine.getErrors());
+            System.out.println(commandLine.getUsage());
+            System.exit(1);
+        }
+
+        return options;
+    }
+
+    /**
      * Holds information about a command line options. This includes what its 
name is, whether or not it is a flag,
      * whether or not it is mandatory, what its user comment is, what its 
argument reminder text is and what its
      * regular expression format is.
@@ -646,7 +678,7 @@
          * @param formatRegexp The regular expression that the argument to 
this option must meet to be valid.
          */
         public CommandLineOption(String option, boolean expectsArgs, String 
comment, String argument, boolean mandatory,
-                                 String formatRegexp)
+            String formatRegexp)
         {
             this.option = option;
             this.expectsArgs = expectsArgs;

Modified: 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/FileUtils.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/FileUtils.java?view=diff&rev=525825&r1=525824&r2=525825
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/FileUtils.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/FileUtils.java
 Thu Apr  5 06:36:04 2007
@@ -158,4 +158,40 @@
 
         return is;
     }
+
+    /**
+     * Copies the specified source file to the specified destintaion file. If 
the destinationst file does not exist,
+     * it is created.
+     *
+     * @param src The source file name.
+     * @param dst The destination file name.
+     */
+    public static void copy(File src, File dst)
+    {
+        try
+        {
+            InputStream in = new FileInputStream(src);
+            if (!dst.exists())
+            {
+                dst.createNewFile();
+            }
+
+            OutputStream out = new FileOutputStream(dst);
+
+            // Transfer bytes from in to out
+            byte[] buf = new byte[1024];
+            int len;
+            while ((len = in.read(buf)) > 0)
+            {
+                out.write(buf, 0, len);
+            }
+
+            in.close();
+            out.close();
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
 }

Modified: 
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java?view=diff&rev=525825&r1=525824&r2=525825
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java
 Thu Apr  5 06:36:04 2007
@@ -35,6 +35,7 @@
 import org.apache.log4j.Logger;
 
 import org.apache.qpid.requestreply.PingPongProducer;
+import org.apache.qpid.util.CommandLineParser;
 
 import uk.co.thebadgerset.junit.extensions.util.MathUtils;
 import uk.co.thebadgerset.junit.extensions.util.ParsedProperties;
@@ -71,6 +72,7 @@
  * <tr><td> uniqueDests      <td> false    <td> Prevents destination names 
being timestamped.
  * <tr><td> transacted       <td> true     <td> Only makes sense to test with 
transactions.
  * <tr><td> persistent       <td> true     <td> Only makes sense to test 
persistent.
+ * <tr><td> durableDests     <td> true     <td> Should use durable queues with 
persistent messages.
  * <tr><td> commitBatchSize  <td> 10
  * <tr><td> rate             <td> 20       <td> Total default test time is 5 
seconds.
  * </table>
@@ -108,6 +110,7 @@
         defaults.setProperty(PERSISTENT_MODE_PROPNAME, "true");
         defaults.setProperty(TX_BATCH_SIZE_PROPNAME, "10");
         defaults.setProperty(RATE_PROPNAME, "20");
+        defaults.setProperty(DURABLE_DESTS_PROPNAME, "true");
     }
 
     /** Specifies the number of pings to send, if larger than 0. 0 means send 
until told to stop. */
@@ -150,7 +153,7 @@
         try
         {
             // Create a ping producer overriding its defaults with all options 
passed on the command line.
-            Properties options = processCommandLine(args);
+            Properties options = CommandLineParser.processCommandLine(args, 
new CommandLineParser(new String[][] {}));
             PingDurableClient pingProducer = new PingDurableClient(options);
 
             // Create a shutdown hook to terminate the ping-pong producer.

Modified: 
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java?view=diff&rev=525825&r1=525824&r2=525825
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
 Thu Apr  5 06:36:04 2007
@@ -35,13 +35,10 @@
 import org.apache.log4j.Logger;
 
 import org.apache.qpid.AMQException;
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.client.AMQNoConsumersException;
-import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.client.*;
 import org.apache.qpid.client.message.TestMessageFactory;
 import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.jms.MessageProducer;
 import org.apache.qpid.jms.Session;
 import org.apache.qpid.url.URLSyntaxException;
@@ -90,6 +87,7 @@
  * <tr><td> timeout          <td> 30000    <td> In milliseconds. The timeout 
to stop waiting for replies.
  * <tr><td> commitBatchSize  <td> 1        <td> The number of messages per 
transaction in transactional mode.
  * <tr><td> uniqueDests      <td> true     <td> Whether each receiver only 
listens to one ping destination or all.
+ * <tr><td> durableDests     <td> false    <td> Whether or not durable 
destinations are used.
  * <tr><td> ackMode          <td> AUTO_ACK <td> The message acknowledgement 
mode. Possible values are:
  *                                               0 - SESSION_TRANSACTED
  *                                               1 - AUTO_ACKNOWLEDGE
@@ -257,6 +255,9 @@
     /** Defines the default value for the unique destinations property. */
     public static final boolean UNIQUE_DESTS_DEFAULT = true;
 
+    public static final String DURABLE_DESTS_PROPNAME = "durableDests";
+    public static final boolean DURABLE_DESTS_DEFAULT = false;
+
     /** Holds the name of the proeprty to get the message acknowledgement mode 
from. */
     public static final String ACK_MODE_PROPNAME = "ackMode";
 
@@ -299,6 +300,7 @@
         defaults.setPropertyIfNull(VERBOSE_PROPNAME, VERBOSE_DEFAULT);
         defaults.setPropertyIfNull(PUBSUB_PROPNAME, PUBSUB_DEFAULT);
         defaults.setPropertyIfNull(UNIQUE_DESTS_PROPNAME, 
UNIQUE_DESTS_DEFAULT);
+        defaults.setPropertyIfNull(DURABLE_DESTS_PROPNAME, 
DURABLE_DESTS_DEFAULT);
         defaults.setPropertyIfNull(FAIL_BEFORE_COMMIT_PROPNAME, 
FAIL_BEFORE_COMMIT_DEFAULT);
         defaults.setPropertyIfNull(FAIL_AFTER_COMMIT_PROPNAME, 
FAIL_AFTER_COMMIT_DEFAULT);
         defaults.setPropertyIfNull(FAIL_BEFORE_SEND_PROPNAME, 
FAIL_BEFORE_SEND_DEFAULT);
@@ -337,6 +339,9 @@
     /** Flag used to indicate if the destinations should be unique client. */
     protected boolean _isUnique;
 
+    /** Flag used to indicate that durable destination should be used. */
+    protected boolean _isDurable;
+
     /** Flag used to indicate that the user should be prompted to terminate a 
broker, to test failover before a commit. */
     protected boolean _failBeforeCommit;
 
@@ -424,6 +429,7 @@
 
     /** The prompt to display when asking the user to kill the broker for 
failover testing. */
     private static final String KILL_BROKER_PROMPT = "Kill broker now, then 
press Return.";
+    private String _clientID;
 
     /**
      * Creates a ping producer with the specified parameters, of which there 
are many. See the class level comments
@@ -463,6 +469,7 @@
         _rate = properties.getPropertyAsInteger(RATE_PROPNAME);
         _isPubSub = properties.getPropertyAsBoolean(PUBSUB_PROPNAME);
         _isUnique = properties.getPropertyAsBoolean(UNIQUE_DESTS_PROPNAME);
+        _isDurable = properties.getPropertyAsBoolean(DURABLE_DESTS_PROPNAME);
         _ackMode = properties.getPropertyAsInteger(ACK_MODE_PROPNAME);
         _pauseBatch = properties.getPropertyAsLong(PAUSE_AFTER_BATCH_PROPNAME);
 
@@ -498,10 +505,10 @@
 
         // Generate a unique identifying name for this client, based on it ip 
address and the current time.
         InetAddress address = InetAddress.getLocalHost();
-        String clientID = address.getHostName() + System.currentTimeMillis();
+        _clientID = address.getHostName() + System.currentTimeMillis();
 
         // Create a connection to the broker.
-        createConnection(clientID);
+        createConnection(_clientID);
 
         // Create transactional or non-transactional sessions, based on the 
command line arguments.
         _producerSession = (Session) 
getConnection().createSession(_transacted, _ackMode);
@@ -509,7 +516,7 @@
 
         // Create the destinations to send pings to and receive replies from.
         _replyDestination = _consumerSession.createTemporaryQueue();
-        createPingDestinations(_noOfDestinations, _selector, _destinationName, 
_isUnique);
+        createPingDestinations(_noOfDestinations, _selector, _destinationName, 
_isUnique, _isDurable);
 
         // Create the message producer only if instructed to.
         if (producer)
@@ -548,7 +555,7 @@
     {
         try
         {
-            Properties options = processCommandLine(args);
+            Properties options = CommandLineParser.processCommandLine(args, 
new CommandLineParser(new String[][] {}));
 
             // Create a ping producer overriding its defaults with all options 
passed on the command line.
             PingPongProducer pingProducer = new PingPongProducer(options);
@@ -577,43 +584,6 @@
     }
 
     /**
-     * Extracts all name=value pairs from the command line, sets them all as 
system properties and also returns
-     * a map of properties containing them.
-     *
-     * @param args The command line.
-     *
-     * @return A set of properties containing all name=value pairs from the 
command line.
-     *
-     * @todo This is a commonly used piece of code. Make it accept a command 
line definition and move it into the
-     *       CommandLineParser class.
-     */
-    protected static Properties processCommandLine(String[] args)
-    {
-        // Use the command line parser to evaluate the command line.
-        CommandLineParser commandLine = new CommandLineParser(new String[][] 
{});
-
-        // Capture the command line arguments or display errors and correct 
usage and then exit.
-        Properties options = null;
-
-        try
-        {
-            options = commandLine.parseCommandLine(args);
-
-            // Add all the trailing command line options (name=value pairs) to 
system properties. Tests may pick up
-            // overridden values from there.
-            commandLine.addCommandLineToSysProperties();
-        }
-        catch (IllegalArgumentException e)
-        {
-            System.out.println(commandLine.getErrors());
-            System.out.println(commandLine.getUsage());
-            System.exit(1);
-        }
-
-        return options;
-    }
-
-    /**
      * Convenience method for a short pause.
      *
      * @param sleepTime The time in milliseconds to pause for.
@@ -677,11 +647,12 @@
      *
      * @throws JMSException Any JMSExceptions are allowed to fall through.
      */
-    public void createPingDestinations(int noOfDestinations, String selector, 
String rootName, boolean unique)
-        throws JMSException
+    public void createPingDestinations(int noOfDestinations, String selector, 
String rootName, boolean unique,
+        boolean durable) throws JMSException, AMQException
     {
         log.debug("public void createPingDestinations(int noOfDestinations = " 
+ noOfDestinations + ", String selector = "
-            + selector + ", String rootName = " + rootName + ", boolean unique 
= " + unique + "): called");
+            + selector + ", String rootName = " + rootName + ", boolean unique 
= " + unique + ", boolean durable = "
+            + durable + "): called");
 
         _pingDestinations = new ArrayList<Destination>();
 
@@ -709,13 +680,30 @@
             // Check if this is a pub/sub pinger, in which case create topics.
             if (_isPubSub)
             {
-                destination = new 
AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, rootName + id);
-                log.debug("Created topic " + destination);
+                if (!durable)
+                {
+                    destination = new 
AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, rootName + id);
+                    log.debug("Created non-durable topic " + destination);
+                }
+                else
+                {
+                    destination =
+                        AMQTopic.createDurableTopic(new 
AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, rootName + id),
+                            _clientID, (AMQConnection) _connection);
+                    log.debug("Created durable topic " + destination);
+                }
             }
             // Otherwise this is a p2p pinger, in which case create queues.
             else
             {
-                destination = new 
AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, rootName + id);
+                AMQShortString destinationName = new AMQShortString(rootName + 
id);
+                destination =
+                    new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, 
destinationName, destinationName, false, false,
+                        _isDurable);
+                ((AMQSession) _producerSession).createQueue(destinationName, 
false, _isDurable, false);
+                ((AMQSession) _producerSession).bindQueue(destinationName, 
destinationName, null,
+                    ExchangeDefaults.DIRECT_EXCHANGE_NAME);
+
                 log.debug("Created queue " + destination);
             }
 


Reply via email to