Author: arnaudsimon
Date: Fri Aug 24 01:45:43 2007
New Revision: 569298

URL: http://svn.apache.org/viewvc?rev=569298&view=rev
Log: (empty)

Modified:
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/BytesMessageImpl.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/MapMessageImpl.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/MessageImpl.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java?rev=569298&r1=569297&r2=569298&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
 Fri Aug 24 01:45:43 2007
@@ -96,7 +96,7 @@
      * Never exceeds MAX_MESSAGE_TRANSFERRED
      */
     private int _messageAsyncrhonouslyReceived = 0;
-    
+
     private AtomicBoolean _messageReceived = new AtomicBoolean();
 
     //----- Constructors
@@ -109,12 +109,13 @@
      * @param noLocal          If true inhibits the delivery of messages 
published by its own connection.
      * @param subscriptionName Name of the subscription if this is to be 
created as a durable subscriber.
      *                         If this value is null, a non-durable 
subscription is created.
+     * @param consumerTag      The ID (consumer tag) of this consumer.
      * @throws Exception If the MessageConsumerImpl cannot be created due to 
some internal error.
      */
     protected MessageConsumerImpl(SessionImpl session, DestinationImpl 
destination, String messageSelector,
-                                  boolean noLocal, String 
subscriptionName,String consumerTag) throws Exception
+                                  boolean noLocal, String subscriptionName, 
String consumerTag) throws Exception
     {
-        super(session, destination,consumerTag);
+        super(session, destination, consumerTag);
         if (messageSelector != null)
         {
             _messageSelector = messageSelector;
@@ -183,13 +184,13 @@
         // set the flow mode
         getSession().getQpidSession()
                 .messageFlowMode(getMessageActorID(), 
org.apache.qpidity.client.Session.MESSAGE_FLOW_MODE_CREDIT);
-        
+
         // this will prevent the broker from sending more than one message
         // When a messageListener is set the flow will be adjusted.
         // until then we assume it's for synchronous message consumption
         getSession().getQpidSession()
-        .messageFlow(getMessageActorID(), 
org.apache.qpidity.client.Session.MESSAGE_FLOW_UNIT_MESSAGE,1);
-        
+                .messageFlow(getMessageActorID(), 
org.apache.qpidity.client.Session.MESSAGE_FLOW_UNIT_MESSAGE, 1);
+
         getSession().getQpidSession().sync();
         // check for an exception
         if (getSession().getCurrentException() != null)
@@ -355,12 +356,12 @@
     {
         checkNotClosed();
         Message result = null;
-        
+
         if (_messageListener != null)
         {
             throw new javax.jms.IllegalStateException("A listener has already 
been set.");
         }
-        
+
         if (_incomingMessage != null)
         {
             System.out.println("We already had a message in the queue");
@@ -368,7 +369,7 @@
             _incomingMessage = null;
             return result;
         }
-       
+
         synchronized (_incomingMessageLock)
         {
             // This indicates to the delivery thread to deliver the message to 
this consumer
@@ -383,13 +384,13 @@
                 
getSession().getQpidSession().messageFlush(getMessageActorID());
                 _messageReceived.set(false);
                 System.out.println("no message in the queue, issuing a flow(1) 
and waiting for message");
-                                
+
                 //When sync() returns we know whether we have received a 
message or not.
                 getSession().getQpidSession().sync();
-                
-                System.out.println("we got returned from sync()"); 
+
+                System.out.println("we got returned from sync()");
                 //received = getSession().getQpidSession().messagesReceived();
-            }            
+            }
             if (_messageReceived.get() && timeout < 0)
             {
                 // this is a nowait and we haven't received a message, so we 
must immediately return
@@ -406,7 +407,7 @@
                 {
                     try
                     {
-                        System.out.println("waiting for message");             
           
+                        System.out.println("waiting for message");
                         _incomingMessageLock.wait(timeout);
                     }
                     catch (InterruptedException e)
@@ -427,7 +428,7 @@
             // We now release any message received for this consumer
             _isReceiving = false;
             _isNoWaitIsReceiving = false;
-            getSession().testQpidException();            
+            getSession().testQpidException();
         }
         return result;
     }
@@ -500,7 +501,7 @@
             if (_messageListener == null)
             {
                 System.out.println("Received a message- onMessage in message 
consumer Impl");
-                
+
                 synchronized (_incomingMessageLock)
                 {
                     if (messageOk)
@@ -510,7 +511,7 @@
                         if (_isReceiving)
                         {
                             System.out.println("Received a message- onMessage 
in message _isReceiving");
-                            
+
                             _incomingMessage = message;
                             _incomingMessageLock.notify();
                         }
@@ -534,11 +535,11 @@
                                                  
org.apache.qpidity.client.Session.MESSAGE_FLOW_UNIT_MESSAGE, 1);
                             
getSession().getQpidSession().messageFlush(getMessageActorID());
                             _messageReceived.set(false);
-                            
+
                             // When sync() returns we know whether we have 
received a message or not.
-                            getSession().getQpidSession().sync();              
         
-                            
-                            if (_messageReceived.get()  && 
_isNoWaitIsReceiving)
+                            getSession().getQpidSession().sync();
+
+                            if (_messageReceived.get() && _isNoWaitIsReceiving)
                             {
                                 // Right, a nowait receive is waiting for a 
message
                                 // but none can be delivered, so it then needs to 
return
@@ -619,7 +620,7 @@
             RangeSet ranges = new RangeSet();
             ranges.add(message.getMessageTransferId());
             getSession().getQpidSession().messageRelease(ranges);
-            getSession().testQpidException();            
+            getSession().testQpidException();
         }
     }
 
@@ -667,7 +668,7 @@
             getSession().testQpidException();
         }
     }
-    
+
     public void notifyMessageReceived()
     {
         _messageReceived.set(true);

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/BytesMessageImpl.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/BytesMessageImpl.java?rev=569298&r1=569297&r2=569298&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/BytesMessageImpl.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/BytesMessageImpl.java
 Fri Aug 24 01:45:43 2007
@@ -70,17 +70,6 @@
     protected BytesMessageImpl(org.apache.qpidity.api.Message message) throws 
QpidException
     {
         super(message);
-        try
-        {
-            ByteBuffer b = message.readData();
-            byte[] a = new byte[b.limit()];
-            b.get(a);
-            _dataIn = new DataInputStream(new ByteArrayInputStream(a));
-        }
-        catch(Exception e)
-        {
-            e.printStackTrace();
-        }
     }
 
     //--- BytesMessage API

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/MapMessageImpl.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/MapMessageImpl.java?rev=569298&r1=569297&r2=569298&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/MapMessageImpl.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/MapMessageImpl.java
 Fri Aug 24 01:45:43 2007
@@ -562,8 +562,8 @@
     //-- Overwritten methods
     /**
      * This method is invoked before this message is dispatched.
-     * <p>This class uses it to convert its text payload into a ByteBuffer
      */
+    @Override
     public void beforeMessageDispatch() throws QpidException
     {
         try

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/MessageImpl.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/MessageImpl.java?rev=569298&r1=569297&r2=569298&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/MessageImpl.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/MessageImpl.java
 Fri Aug 24 01:45:43 2007
@@ -83,7 +83,7 @@
     {
         super(message);
     }
-
+    
     //---- javax.jms.Message interface
     /**
      * Get the message ID.

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java?rev=569298&r1=569297&r2=569298&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java
 Fri Aug 24 01:45:43 2007
@@ -31,11 +31,18 @@
 import org.apache.qpidity.QpidException;
 import org.apache.qpidity.ReplyTo;
 import org.apache.qpidity.client.util.ByteBufferMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 public class QpidMessage
 {
     /**
+     * this QpidMessage's logger
+     */
+    private static final Logger _logger = 
LoggerFactory.getLogger(QpidMessage.class);
+
+    /**
      * The underlying qpidity message
      */
     private org.apache.qpidity.api.Message _qpidityMessage;
@@ -326,8 +333,8 @@
      * @param messageBody The buffer containing this message data
      */
     protected void setMessageData(ByteBuffer messageBody)
-    {        
-        _messageData = messageBody.duplicate();
+    {
+        _messageData = messageBody; // we shouldn't need that .duplicate();
     }
 
     /**
@@ -388,13 +395,12 @@
         try
         {
             // set the message data
-            _qpidityMessage.clearData();
-            // we need to do a flip
-            //_messageData.flip();
-            
-            System.out.println("_messageData POS " + _messageData.position());
-            System.out.println("_messageData limit " + _messageData.limit());
-            
+            _qpidityMessage.clearData();         
+            if (_logger.isDebugEnabled())
+            {
+                _logger.debug("_messageData POS " + _messageData.position());
+                _logger.debug("_messageData limit " + _messageData.limit());
+            }
             _qpidityMessage.appendData(_messageData);
             
_qpidityMessage.getMessageProperties().setApplicationHeaders(_messageProperties);
         }


Reply via email to