Author: arnaudsimon
Date: Tue Jun  3 08:25:36 2008
New Revision: 662827

URL: http://svn.apache.org/viewvc?rev=662827&view=rev
Log:
QPID-1112: Update previous commit by re-using messageAcknowledge (added a flag 
specifying whether to send an messageAccept)

Modified:
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Session.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/interop/BasicInteropTest.java

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=662827&r1=662826&r2=662827&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
 Tue Jun  3 08:25:36 2008
@@ -160,7 +160,7 @@
             ranges.add((int) deliveryTag);
             _unacknowledgedMessageTags.remove(deliveryTag);
         }
-        getQpidSession().messageAcknowledge(ranges);
+        getQpidSession().messageAcknowledge(ranges, _acknowledgeMode != 
org.apache.qpid.jms.Session.NO_ACKNOWLEDGE);
     }
 
     /**

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=662827&r1=662826&r2=662827&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
 Tue Jun  3 08:25:36 2008
@@ -24,10 +24,12 @@
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.AMQException;
+import org.apache.qpid.jms.*;
 import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpidity.api.Message;
 import org.apache.qpidity.transport.*;
+import org.apache.qpidity.transport.Session;
 import org.apache.qpidity.QpidException;
 import org.apache.qpidity.filter.MessageFilter;
 import org.apache.qpidity.filter.JMSSelectorFilter;
@@ -77,12 +79,6 @@
      */
     private final AtomicBoolean _syncReceive = new AtomicBoolean(false);
 
-    /**
-     * Used for no-ack mode so to send session completion command
-     */
-    private int _numberReceivedMessages = 0;
-    private int _firstMessageToComplete;
-
     //--- constructor
     protected BasicMessageConsumer_0_10(int channelId, AMQConnection 
connection, AMQDestination destination,
                                         String messageSelector, boolean 
noLocal, MessageFactoryRegistry messageFactory,
@@ -165,25 +161,6 @@
      */
     public void onMessage(Message message)
     {
-        /**
-         * For no-ack mode
-         */
-        if( _acknowledgeMode == org.apache.qpid.jms.Session.NO_ACKNOWLEDGE )
-        {
-            _numberReceivedMessages++;
-            if(_numberReceivedMessages == 1)
-            {
-                _firstMessageToComplete = message.getMessageTransferId();
-            }
-           if(_numberReceivedMessages >= 
getSession().getAMQConnection().getMaxPrefetch() )
-            {
-                RangeSet r = new RangeSet();
-                r.add(_firstMessageToComplete, message.getMessageTransferId());
-                _0_10session.getQpidSession().sessionCompleted(r, 
Option.TIMELY_REPLY);
-                _numberReceivedMessages = 0;
-            }
-        }
-
         int channelId = getSession().getChannelId();
         long deliveryId = message.getMessageTransferId();
         AMQShortString consumerTag = getConsumerTag();
@@ -383,7 +360,8 @@
         {
             RangeSet ranges = new RangeSet();
             ranges.add((int) message.getDeliveryTag());
-            _0_10session.getQpidSession().messageAcknowledge(ranges);
+            _0_10session.getQpidSession().messageAcknowledge(ranges,
+                    _acknowledgeMode != 
org.apache.qpid.jms.Session.NO_ACKNOWLEDGE );
             _0_10session.getCurrentException();
         }
     }
@@ -499,4 +477,13 @@
         }
         return o;
     }
+
+    void postDeliver(AbstractJMSMessage msg) throws JMSException
+    {
+        super.postDeliver(msg);
+        if(_acknowledgeMode == org.apache.qpid.jms.Session.NO_ACKNOWLEDGE && 
!_session.isInRecovery())
+        {
+          _session.acknowledgeMessage(msg.getDeliveryTag(), false);            
    
+        }               
+    }
 }

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Session.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Session.java?rev=662827&r1=662826&r2=662827&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Session.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Session.java
 Tue Jun  3 08:25:36 2008
@@ -65,16 +65,6 @@
 
     public void sessionDetach(byte[] name);
 
-    /**
-     * This control is sent by the receiver of commands, and handled by the 
sender
-     * of commands. It informs the sender of all commands completed by the 
receiver.
-     * This excludes commands known by the receiver to be considered complete 
at the sender.
-     *
-     * @param commands completed commands.
-     * @param options  [EMAIL PROTECTED] Option#TIMELY_REPLY} If set, the 
sender is no longer free to delay the known-completed reply.
-     */
-    public void sessionCompleted(RangeSet commands, Option... options);
-
     public void sessionRequestTimeout(long expiry);
 
     public byte[] getName();
@@ -328,8 +318,9 @@
      * pre-acquire mode or by explicitly acquiring them.
      *
      * @param ranges Range of messages to be acknowledged.
+     * @param accept pecify whether to send a message accept to the broker
      */
-    public void messageAcknowledge(RangeSet ranges);
+    public void messageAcknowledge(RangeSet ranges, boolean accept);
 
     /**
      * Reject a range of acquired messages.

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java?rev=662827&r1=662826&r2=662827&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java
 Tue Jun  3 08:25:36 2008
@@ -59,14 +59,17 @@
         super(name);
     }
 
-    public void messageAcknowledge(RangeSet ranges)
+    public void messageAcknowledge(RangeSet ranges, boolean accept)
     {
         for (Range range : ranges)
         {
             super.processed(range);
         }
         super.flushProcessed();
-        messageAccept(ranges);
+        if( accept )
+        {
+            messageAccept(ranges);
+        }
     }
 
     public void messageSubscribe(String queue, String destination, short 
acceptMode, short acquireMode, MessagePartListener listener, Map<String, 
Object> filter, Option... options)
@@ -105,11 +108,6 @@
         _currentDataSizeNotSynced = 0;
     }
 
-    public void sessionCompleted(RangeSet commands, Option ... options)
-    {
-        super.sessionCompleted(commands, options);
-    }
-
     /* -------------------------
      * Data methods
      * ------------------------*/

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/interop/BasicInteropTest.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/interop/BasicInteropTest.java?rev=662827&r1=662826&r2=662827&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/interop/BasicInteropTest.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/interop/BasicInteropTest.java
 Tue Jun  3 08:25:36 2008
@@ -108,7 +108,7 @@
                                         System.out.println("--------/Message 
Received--------");
                                         RangeSet ack = new RangeSet();
                                         
ack.add(message.getMessageTransferId(),message.getMessageTransferId());
-                                        session.messageAcknowledge(ack);
+                                        session.messageAcknowledge(ack, true);
                                     }
 
                                  }),


Reply via email to