Author: rgreig
Date: Sat Dec 16 13:55:31 2006
New Revision: 487903
URL: http://svn.apache.org/viewvc?view=rev&rev=487903
Log:
QPID-207 : Patch supplied by Rob Godfrey - fix implementation of acknowledge as
per JMS spec
Added:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jms/Message.java
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?view=diff&rev=487903&r1=487902&r2=487903
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
Sat Dec 16 13:55:31 2006
@@ -39,6 +39,7 @@
import org.apache.qpid.url.AMQBindingURL;
import org.apache.qpid.url.URLSyntaxException;
+
import javax.jms.*;
import javax.jms.IllegalStateException;
import java.io.Serializable;
@@ -49,6 +50,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
public class AMQSession extends Closeable implements Session, QueueSession,
TopicSession
{
@@ -136,6 +138,7 @@
*/
private volatile AtomicBoolean _stopped = new AtomicBoolean(true);
+ private final AtomicLong _lastDeliveryTag = new AtomicLong();
/**
@@ -181,7 +184,9 @@
}
else
{
+
consumer.notifyMessage(message, _channelId);
+
}
}
else
@@ -696,6 +701,27 @@
_connection.getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId,
false));
}
+
+
+ public void acknowledge() throws JMSException
+ {
+ if (getAMQConnection().isClosed())
+ {
+ throw new javax.jms.IllegalStateException("Connection is already
closed");
+ }
+ if (isClosed())
+ {
+ throw new javax.jms.IllegalStateException("Session is already
closed");
+ }
+ acknowledgeMessage(_lastDeliveryTag.get(), true);
+
+ }
+
+ void setLastDeliveredMessage(AbstractJMSMessage message)
+ {
+ _lastDeliveryTag.set(message.getDeliveryTag());
+ }
+
public MessageListener getMessageListener() throws JMSException
{
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?view=diff&rev=487903&r1=487902&r2=487903
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
Sat Dec 16 13:55:31 2006
@@ -214,10 +214,10 @@
{
//handle case where connection has already been started, and
the dispatcher is blocked
//doing a put on the _synchronousQueue
- Object msg = _synchronousQueue.poll();
- if (msg != null)
+ AbstractJMSMessage jmsMsg =
(AbstractJMSMessage)_synchronousQueue.poll();
+ if (jmsMsg != null)
{
- AbstractJMSMessage jmsMsg = (AbstractJMSMessage) msg;
+ _session.setLastDeliveredMessage(jmsMsg);
messageListener.onMessage(jmsMsg);
postDeliver(jmsMsg);
}
@@ -297,8 +297,10 @@
final AbstractJMSMessage m = returnMessageOrThrow(o);
if (m != null)
{
+ _session.setLastDeliveredMessage(m);
postDeliver(m);
}
+
return m;
}
catch (InterruptedException e)
@@ -324,8 +326,10 @@
final AbstractJMSMessage m = returnMessageOrThrow(o);
if (m != null)
{
+ _session.setLastDeliveredMessage(m);
postDeliver(m);
}
+
return m;
}
finally
@@ -424,6 +428,7 @@
{
//we do not need a lock around the test above, and the
dispatch below as it is invalid
//for an application to alter an installed listener while the
session is started
+ _session.setLastDeliveredMessage(jmsMessage);
getMessageListener().onMessage(jmsMessage);
postDeliver(jmsMessage);
}
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java?view=diff&rev=487903&r1=487902&r2=487903
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
Sat Dec 16 13:55:31 2006
@@ -43,7 +43,7 @@
import java.util.Enumeration;
import java.util.Map;
-public abstract class AbstractJMSMessage extends AMQMessage implements
javax.jms.Message
+public abstract class AbstractJMSMessage extends AMQMessage implements
org.apache.qpid.jms.Message
{
private static final Map _destinationCache =
Collections.synchronizedMap(new ReferenceMap());
@@ -384,7 +384,7 @@
getJmsContentHeaderProperties().getJMSHeaders().remove(propertyName);
}
- public void acknowledge() throws JMSException
+ public void acknowledgeThis() throws JMSException
{
// the JMS 1.1 spec says in section 3.6 that calls to acknowledge are
ignored when client acknowledge
// is not specified. In our case, we only set the session field where
client acknowledge mode is specified.
@@ -398,6 +398,14 @@
// we set multiple to true here since acknowledgement implies
acknowledge of all previous messages
// received on the session
_session.acknowledgeMessage(_deliveryTag, true);
+ }
+ }
+
+ public void acknowledge() throws JMSException
+ {
+ if(_session != null)
+ {
+ _session.acknowledge();
}
}
Added:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jms/Message.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jms/Message.java?view=auto&rev=487903
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jms/Message.java
(added)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jms/Message.java
Sat Dec 16 13:55:31 2006
@@ -0,0 +1,28 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.jms;
+
+import javax.jms.*;
+
+public interface Message extends javax.jms.Message
+{
+ public void acknowledgeThis() throws JMSException;
+}
Modified:
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java?view=diff&rev=487903&r1=487902&r2=487903
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java
Sat Dec 16 13:55:31 2006
@@ -36,18 +36,21 @@
{
private static final Logger _logger = Logger.getLogger(RecoverTest.class);
- static
+ protected void setUp() throws Exception
{
- String workdir = System.getProperty("QPID_WORK");
- if (workdir == null || workdir.equals(""))
- {
- String tempdir = System.getProperty("java.io.tmpdir");
- System.out.println("QPID_WORK not set using tmp directory: " +
tempdir);
- System.setProperty("QPID_WORK", tempdir);
- }
- DOMConfigurator.configure("../broker/etc/log4j.xml");
+ super.setUp();
+ TransportConnection.createVMBroker(1);
}
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+ TransportConnection.killAllVMBrokers();
+ //Thread.sleep(2000);
+ }
+
+
+
public void testRecoverResendsMsgs() throws Exception
{
Connection con = new AMQConnection("vm://:1", "guest", "guest",
"consumer1", "/test");
@@ -104,8 +107,74 @@
con.close();
}
+
+ public void testRecoverResendsMsgsAckOnEarlier() throws Exception
+ {
+ Connection con = new AMQConnection("vm://:1", "guest", "guest",
"consumer1", "/test");
+
+ Session consumerSession = con.createSession(false,
Session.CLIENT_ACKNOWLEDGE);
+ Queue queue = new AMQQueue("someQ", "someQ", false, true);
+ MessageConsumer consumer = consumerSession.createConsumer(queue);
+ //force synch to ensure the consumer has resulted in a bound queue
+ ((AMQSession) consumerSession).declareExchangeSynch("amq.direct",
"direct");
+
+ Connection con2 = new AMQConnection("vm://:1", "guest", "guest",
"producer1", "/test");
+ Session producerSession = con2.createSession(false,
Session.CLIENT_ACKNOWLEDGE);
+ MessageProducer producer = producerSession.createProducer(queue);
+
+ _logger.info("Sending four messages");
+ producer.send(producerSession.createTextMessage("msg1"));
+ producer.send(producerSession.createTextMessage("msg2"));
+ producer.send(producerSession.createTextMessage("msg3"));
+ producer.send(producerSession.createTextMessage("msg4"));
+
+ con2.close();
+
+ _logger.info("Starting connection");
+ con.start();
+ TextMessage tm = (TextMessage) consumer.receive();
+ TextMessage tm2 = (TextMessage) consumer.receive();
+ tm.acknowledge();
+ _logger.info("Received 2 messages, acknowledge() first message, should
acknowledge both");
+
+ consumer.receive();
+ consumer.receive();
+ _logger.info("Received all four messages. Calling recover with two
outstanding messages");
+ // no ack for last three messages so when I call recover I expect to
get three messages back
+ consumerSession.recover();
+ TextMessage tm3 = (TextMessage) consumer.receive(3000);
+ assertEquals("msg3", tm3.getText());
+
+ TextMessage tm4 = (TextMessage) consumer.receive(3000);
+ assertEquals("msg4", tm4.getText());
+
+
+ _logger.info("Received redelivery of two messages. calling
acknolwedgeThis() first of those message");
+ ((org.apache.qpid.jms.Message)tm3).acknowledgeThis();
+
+ _logger.info("Calling recover");
+ // all acked so no messages to be delivered
+ consumerSession.recover();
+
+ tm4 = (TextMessage) consumer.receive(3000);
+ assertEquals("msg4", tm4.getText());
+ ((org.apache.qpid.jms.Message)tm4).acknowledgeThis();
+
+ _logger.info("Calling recover");
+ // all acked so no messages to be delivered
+ consumerSession.recover();
+
+
+ tm = (TextMessage) consumer.receiveNoWait();
+ assertNull(tm);
+ _logger.info("No messages redelivered as is expected");
+
+ con.close();
+ }
+
+
public static junit.framework.Test suite()
{
- return new VMBrokerSetup(new
junit.framework.TestSuite(RecoverTest.class));
+ return new junit.framework.TestSuite(RecoverTest.class);
}
}