Author: bhupendrab
Date: Mon Jan 15 01:39:38 2007
New Revision: 496260

URL: http://svn.apache.org/viewvc?view=rev&rev=496260
Log:
checking for the AMQ MessageID of received message in 
ServiceRequestingClient.java

Modified:
    
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceProvidingClient.java
    
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java
    incubator/qpid/trunk/qpid/java/perftests/src/main/java/perftests.log4j

Modified: 
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceProvidingClient.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceProvidingClient.java?view=diff&rev=496260&r1=496259&r2=496260
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceProvidingClient.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceProvidingClient.java
 Mon Jan 15 01:39:38 2007
@@ -35,7 +35,7 @@
 public class ServiceProvidingClient
 {
     private static final Logger _logger = 
Logger.getLogger(ServiceProvidingClient.class);
-    private static final String MESSAGE_IDENTIFIER = "MessageIdentifier";
+
     private MessageProducer _destinationProducer;
 
     private Destination _responseDest;
@@ -57,8 +57,7 @@
         _logger.info("Delivery Mode: " + (deliveryMode == 
DeliveryMode.NON_PERSISTENT ? "Non Persistent" : "Persistent")
                      + "\t isTransactional: " + _isTransactional);
 
-        _connection = new AMQConnection(brokerDetails, username, password,
-                                        clientName, virtualPath);
+        _connection = new AMQConnection(brokerDetails, username, password, 
clientName, virtualPath);
         _connection.setConnectionListener(new ConnectionListener()
         {
 
@@ -145,11 +144,6 @@
                         _logger.info("timeSent value is: " + timesent);
                         msg.setLongProperty("timeSent", timesent);
                     }
-                    // this identifier set in the serviceRequestingClient is 
used to match the response with the request
-                    if (tm.propertyExists(MESSAGE_IDENTIFIER))
-                    {
-                        msg.setIntProperty(MESSAGE_IDENTIFIER, 
tm.getIntProperty(MESSAGE_IDENTIFIER));
-                    }
                     
                     _destinationProducer.send(msg);
 
@@ -200,7 +194,6 @@
             _logger.error("Error: " + e, e);
         }
 
-
         int deliveryMode = DeliveryMode.NON_PERSISTENT;
         boolean transactedMode = false;
 
@@ -237,9 +230,6 @@
         {
             _logger.error("Error: " + e, e);
         }
-
-
     }
-
 }
 

Modified: 
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java?view=diff&rev=496260&r1=496259&r2=496260
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java
 Mon Jan 15 01:39:38 2007
@@ -26,6 +26,7 @@
 import org.apache.qpid.client.AMQDestination;
 import org.apache.qpid.client.AMQQueue;
 import org.apache.qpid.client.message.TestMessageFactory;
+import org.apache.qpid.client.message.JMSTextMessage;
 import org.apache.qpid.jms.MessageConsumer;
 import org.apache.qpid.jms.MessageProducer;
 import org.apache.qpid.jms.Session;
@@ -47,8 +48,7 @@
 {
     private static final Logger _log = 
Logger.getLogger(ServiceRequestingClient.class);
 
-    private static final String MESSAGE_IDENTIFIER = "MessageIdentifier";
-    private static int _messageIdentifier = 0;
+    private long _messageIdentifier = 0;
     private String MESSAGE_DATA;
 
     private AMQConnection _connection;
@@ -108,10 +108,6 @@
                         _log.info("Average latency now: " + _averageLatency);
                     }
                 }
-                if (m.propertyExists(MESSAGE_IDENTIFIER))
-                {
-                    _log.info("Received Message Identifier: " + 
m.getIntProperty(MESSAGE_IDENTIFIER));
-                }
                 if(_isTransactional)
                 {
                     _session.commit();
@@ -127,6 +123,7 @@
                 _log.info("Received message count: " + _actualMessageCount);
             }
 
+            checkForMessageID(m);
             if (_actualMessageCount == _expectedMessageCount)
             {
                 _completed = true;
@@ -149,6 +146,30 @@
         }
     }
 
+    /**
+     * Checks if the received AMQ Message ID(delivery tag) is in sequence, by 
comparing it with the AMQ MessageID
+     * of previous message.
+     * @param receivedMsg
+     */
+    private void checkForMessageID(Message receivedMsg)
+    {
+        try
+        {
+            JMSTextMessage msg = (JMSTextMessage)receivedMsg;
+            if (! (msg.getDeliveryTag() == _messageIdentifier + 1))
+            {
+                _log.info("Out of sequence message received. Previous AMQ 
MessageID= " + _messageIdentifier +
+                          ", Received AMQ messageID= " + 
receivedMsg.getJMSMessageID());
+            }
+            _messageIdentifier = msg.getDeliveryTag();
+        }
+        catch (Exception ex)
+        {
+            _log.error("Error in checking messageID ", ex);
+        }
+
+    }
+
     private void notifyWaiter()
     {
         if (_waiter != null)
@@ -178,10 +199,8 @@
             _session = (Session) _connection.createSession(_isTransactional, 
Session.AUTO_ACKNOWLEDGE);
             _producerSession = (Session) 
_connection.createSession(_isTransactional, Session.AUTO_ACKNOWLEDGE);
 
-
             _connection.setExceptionListener(this);
 
-
             AMQQueue destination = new AMQQueue(commandQueueName);
             _producer = (MessageProducer) 
_producerSession.createProducer(destination);
             _producer.setDisableMessageTimestamp(true);
@@ -195,7 +214,7 @@
             //Send first message, then wait a bit to allow the provider to get 
initialised
             TextMessage first = _session.createTextMessage(MESSAGE_DATA);
             first.setJMSReplyTo(_tempDestination);
-            send(first);
+             _producer.send(first);
             if (_isTransactional)
             {
                 _producerSession.commit();
@@ -219,13 +238,6 @@
         }
     }
 
-    private void send(TextMessage msg) throws JMSException
-    {
-        msg.setIntProperty(MESSAGE_IDENTIFIER, ++_messageIdentifier);
-        _producer.send(msg);
-        _log.info("Sent Message Identifier: " + _messageIdentifier);
-    }
-
     /**
      * Run the test and notify an object upon receipt of all responses.
      *
@@ -245,7 +257,7 @@
                 long timeNow = System.currentTimeMillis();
                 msg.setLongProperty("timeSent", timeNow);
             }
-            send(msg);
+             _producer.send(msg);
             if (_isTransactional)
             {
                 _producerSession.commit();
@@ -263,8 +275,7 @@
     private void createConnection(String brokerHosts, String clientID, String 
username, String password,
                                   String vpath) throws AMQException, 
URLSyntaxException
     {
-        _connection = new AMQConnection(brokerHosts, username, password,
-                                        clientID, vpath);
+        _connection = new AMQConnection(brokerHosts, username, password, 
clientID, vpath);
     }
 
     /**

Modified: incubator/qpid/trunk/qpid/java/perftests/src/main/java/perftests.log4j
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/perftests/src/main/java/perftests.log4j?view=diff&rev=496260&r1=496259&r2=496260
==============================================================================
--- incubator/qpid/trunk/qpid/java/perftests/src/main/java/perftests.log4j 
(original)
+++ incubator/qpid/trunk/qpid/java/perftests/src/main/java/perftests.log4j Mon 
Jan 15 01:39:38 2007
@@ -22,7 +22,7 @@
 log4j.logger.org.apache.qpid=${amqj.logging.level}, console
 log4j.additivity.org.apache.qpid=false
 
-log4j.logger.org.apache.qpid.requestreply=${amqj.test.logging.level}, fileApp
+log4j.logger.org.apache.qpid.requestreply=${amqj.test.logging.level}
 log4j.logger.org.apache.qpid.pingpong=${amqj.test.logging.level}
 log4j.logger.org.apache.qpid.topic=${amqj.test.logging.level}
 


Reply via email to