Author: rgreig
Date: Sat Dec 16 13:55:31 2006
New Revision: 487903

URL: http://svn.apache.org/viewvc?view=rev&rev=487903
Log:
QPID-207 : Patch supplied by Rob Godfrey - fix implementation of acknowledge as 
per JMS spec

Added:
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jms/Message.java
Modified:
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
    
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?view=diff&rev=487903&r1=487902&r2=487903
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 Sat Dec 16 13:55:31 2006
@@ -39,6 +39,7 @@
 import org.apache.qpid.url.AMQBindingURL;
 import org.apache.qpid.url.URLSyntaxException;
 
+
 import javax.jms.*;
 import javax.jms.IllegalStateException;
 import java.io.Serializable;
@@ -49,6 +50,7 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 public class AMQSession extends Closeable implements Session, QueueSession, 
TopicSession
 {
@@ -136,6 +138,7 @@
      */
     private volatile AtomicBoolean _stopped = new AtomicBoolean(true);
 
+    private final AtomicLong _lastDeliveryTag = new AtomicLong();
 
 
     /**
@@ -181,7 +184,9 @@
                 }
                 else
                 {
+
                     consumer.notifyMessage(message, _channelId);
+
                 }
             }
             else
@@ -696,6 +701,27 @@
 
         
_connection.getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId,
 false));
     }
+
+
+    public void acknowledge() throws JMSException
+    {
+        if (getAMQConnection().isClosed())
+        {
+            throw new javax.jms.IllegalStateException("Connection is already 
closed");
+        }
+        if (isClosed())
+        {
+            throw new javax.jms.IllegalStateException("Session is already 
closed");            
+        }
+        acknowledgeMessage(_lastDeliveryTag.get(), true);
+
+    }
+
+    void setLastDeliveredMessage(AbstractJMSMessage message)
+    {
+        _lastDeliveryTag.set(message.getDeliveryTag());    
+    }
+
 
     public MessageListener getMessageListener() throws JMSException
     {

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?view=diff&rev=487903&r1=487902&r2=487903
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
 Sat Dec 16 13:55:31 2006
@@ -214,10 +214,10 @@
             {
                 //handle case where connection has already been started, and 
the dispatcher is blocked
                 //doing a put on the _synchronousQueue
-                Object msg = _synchronousQueue.poll();
-                if (msg != null)
+                AbstractJMSMessage jmsMsg = 
(AbstractJMSMessage)_synchronousQueue.poll();
+                if (jmsMsg != null)
                 {
-                    AbstractJMSMessage jmsMsg = (AbstractJMSMessage) msg;
+                    _session.setLastDeliveredMessage(jmsMsg);
                     messageListener.onMessage(jmsMsg);
                     postDeliver(jmsMsg);
                 }
@@ -297,8 +297,10 @@
             final AbstractJMSMessage m = returnMessageOrThrow(o);
             if (m != null)
             {
+                _session.setLastDeliveredMessage(m);
                 postDeliver(m);
             }
+            
             return m;
         }
         catch (InterruptedException e)
@@ -324,8 +326,10 @@
             final AbstractJMSMessage m = returnMessageOrThrow(o);
             if (m != null)
             {
+                _session.setLastDeliveredMessage(m);
                 postDeliver(m);
             }
+
             return m;
         }
         finally
@@ -424,6 +428,7 @@
             {
                 //we do not need a lock around the test above, and the 
dispatch below as it is invalid
                 //for an application to alter an installed listener while the 
session is started
+                _session.setLastDeliveredMessage(jmsMessage);
                 getMessageListener().onMessage(jmsMessage);
                 postDeliver(jmsMessage);
             }

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java?view=diff&rev=487903&r1=487902&r2=487903
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
 Sat Dec 16 13:55:31 2006
@@ -43,7 +43,7 @@
 import java.util.Enumeration;
 import java.util.Map;
 
-public abstract class AbstractJMSMessage extends AMQMessage implements 
javax.jms.Message
+public abstract class AbstractJMSMessage extends AMQMessage implements 
org.apache.qpid.jms.Message
 {
     private static final Map _destinationCache = 
Collections.synchronizedMap(new ReferenceMap());
 
@@ -384,7 +384,7 @@
         getJmsContentHeaderProperties().getJMSHeaders().remove(propertyName);
     }
 
-    public void acknowledge() throws JMSException
+    public void acknowledgeThis() throws JMSException
     {
         // the JMS 1.1 spec says in section 3.6 that calls to acknowledge are 
ignored when client acknowledge
         // is not specified. In our case, we only set the session field where 
client acknowledge mode is specified.
@@ -398,6 +398,14 @@
             // we set multiple to true here since acknowledgement implies 
acknowledge of all previous messages
             // received on the session
             _session.acknowledgeMessage(_deliveryTag, true);
+        }
+    }
+
+    public void acknowledge() throws JMSException
+    {
+        if(_session != null)
+        {
+            _session.acknowledge();
         }
     }
 

Added: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jms/Message.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jms/Message.java?view=auto&rev=487903
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jms/Message.java
 (added)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jms/Message.java
 Sat Dec 16 13:55:31 2006
@@ -0,0 +1,28 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.jms;
+
+import javax.jms.*;
+
+public interface Message extends javax.jms.Message
+{
+    public void acknowledgeThis() throws JMSException;
+}

Modified: 
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java?view=diff&rev=487903&r1=487902&r2=487903
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java
 Sat Dec 16 13:55:31 2006
@@ -36,18 +36,21 @@
 {
     private static final Logger _logger = Logger.getLogger(RecoverTest.class);
 
-    static
+    protected void setUp() throws Exception
     {
-        String workdir = System.getProperty("QPID_WORK");
-        if (workdir == null || workdir.equals(""))
-        {
-            String tempdir = System.getProperty("java.io.tmpdir");
-            System.out.println("QPID_WORK not set using tmp directory: " + 
tempdir);
-            System.setProperty("QPID_WORK", tempdir);
-        }
-        DOMConfigurator.configure("../broker/etc/log4j.xml");
+        super.setUp();
+        TransportConnection.createVMBroker(1);
     }
 
+    protected void tearDown() throws Exception
+    {
+        super.tearDown();
+        TransportConnection.killAllVMBrokers();
+        //Thread.sleep(2000);
+    }
+
+
+
     public void testRecoverResendsMsgs() throws Exception
     {
         Connection con = new AMQConnection("vm://:1", "guest", "guest", 
"consumer1", "/test");
@@ -104,8 +107,74 @@
         con.close();
     }
 
+
+    public void testRecoverResendsMsgsAckOnEarlier() throws Exception
+    {
+        Connection con = new AMQConnection("vm://:1", "guest", "guest", 
"consumer1", "/test");
+
+        Session consumerSession = con.createSession(false, 
Session.CLIENT_ACKNOWLEDGE);
+        Queue queue = new AMQQueue("someQ", "someQ", false, true);
+        MessageConsumer consumer = consumerSession.createConsumer(queue);
+        //force synch to ensure the consumer has resulted in a bound queue
+        ((AMQSession) consumerSession).declareExchangeSynch("amq.direct", 
"direct");
+
+        Connection con2 = new AMQConnection("vm://:1", "guest", "guest", 
"producer1", "/test");
+        Session producerSession = con2.createSession(false, 
Session.CLIENT_ACKNOWLEDGE);
+        MessageProducer producer = producerSession.createProducer(queue);
+
+        _logger.info("Sending four messages");
+        producer.send(producerSession.createTextMessage("msg1"));
+        producer.send(producerSession.createTextMessage("msg2"));
+        producer.send(producerSession.createTextMessage("msg3"));
+        producer.send(producerSession.createTextMessage("msg4"));
+
+        con2.close();
+
+        _logger.info("Starting connection");
+        con.start();
+        TextMessage tm = (TextMessage) consumer.receive();
+        TextMessage tm2 = (TextMessage) consumer.receive();
+        tm.acknowledge();
+        _logger.info("Received 2 messages, acknowledge() first message, should 
acknowledge both");
+
+        consumer.receive();
+        consumer.receive();
+        _logger.info("Received all four messages. Calling recover with two 
outstanding messages");
+        // no ack for last two messages so when I call recover I expect to 
get two messages back
+        consumerSession.recover();
+        TextMessage tm3 = (TextMessage) consumer.receive(3000);
+        assertEquals("msg3", tm3.getText());
+
+        TextMessage tm4 = (TextMessage) consumer.receive(3000);
+        assertEquals("msg4", tm4.getText());
+
+
+        _logger.info("Received redelivery of two messages. calling 
acknolwedgeThis() first of those message");
+        ((org.apache.qpid.jms.Message)tm3).acknowledgeThis();
+
+        _logger.info("Calling recover");
+        // msg3 has been acked, so only msg4 should be redelivered
+        consumerSession.recover();
+
+        tm4 = (TextMessage) consumer.receive(3000);
+        assertEquals("msg4", tm4.getText());
+        ((org.apache.qpid.jms.Message)tm4).acknowledgeThis();
+
+        _logger.info("Calling recover");
+        // all acked so no messages to be delivered
+        consumerSession.recover();
+
+
+        tm = (TextMessage) consumer.receiveNoWait();
+        assertNull(tm);
+        _logger.info("No messages redelivered as is expected");
+
+        con.close();
+    }
+
+
     public static junit.framework.Test suite()
     {
-        return new VMBrokerSetup(new 
junit.framework.TestSuite(RecoverTest.class));
+        return new junit.framework.TestSuite(RecoverTest.class);
     }
 }


Reply via email to