Author: arnaudsimon
Date: Tue Jun 3 08:25:36 2008
New Revision: 662827
URL: http://svn.apache.org/viewvc?rev=662827&view=rev
Log:
QPID-1112: Update previous commit by re-using messageAcknowledge (added a flag
specifying whether to send a messageAccept)
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Session.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/interop/BasicInteropTest.java
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=662827&r1=662826&r2=662827&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
Tue Jun 3 08:25:36 2008
@@ -160,7 +160,7 @@
ranges.add((int) deliveryTag);
_unacknowledgedMessageTags.remove(deliveryTag);
}
- getQpidSession().messageAcknowledge(ranges);
+ getQpidSession().messageAcknowledge(ranges, _acknowledgeMode !=
org.apache.qpid.jms.Session.NO_ACKNOWLEDGE);
}
/**
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=662827&r1=662826&r2=662827&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
Tue Jun 3 08:25:36 2008
@@ -24,10 +24,12 @@
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.AMQException;
+import org.apache.qpid.jms.*;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpidity.api.Message;
import org.apache.qpidity.transport.*;
+import org.apache.qpidity.transport.Session;
import org.apache.qpidity.QpidException;
import org.apache.qpidity.filter.MessageFilter;
import org.apache.qpidity.filter.JMSSelectorFilter;
@@ -77,12 +79,6 @@
*/
private final AtomicBoolean _syncReceive = new AtomicBoolean(false);
- /**
- * Used for no-ack mode so to send session completion command
- */
- private int _numberReceivedMessages = 0;
- private int _firstMessageToComplete;
-
//--- constructor
protected BasicMessageConsumer_0_10(int channelId, AMQConnection
connection, AMQDestination destination,
String messageSelector, boolean
noLocal, MessageFactoryRegistry messageFactory,
@@ -165,25 +161,6 @@
*/
public void onMessage(Message message)
{
- /**
- * For no-ack mode
- */
- if( _acknowledgeMode == org.apache.qpid.jms.Session.NO_ACKNOWLEDGE )
- {
- _numberReceivedMessages++;
- if(_numberReceivedMessages == 1)
- {
- _firstMessageToComplete = message.getMessageTransferId();
- }
- if(_numberReceivedMessages >=
getSession().getAMQConnection().getMaxPrefetch() )
- {
- RangeSet r = new RangeSet();
- r.add(_firstMessageToComplete, message.getMessageTransferId());
- _0_10session.getQpidSession().sessionCompleted(r,
Option.TIMELY_REPLY);
- _numberReceivedMessages = 0;
- }
- }
-
int channelId = getSession().getChannelId();
long deliveryId = message.getMessageTransferId();
AMQShortString consumerTag = getConsumerTag();
@@ -383,7 +360,8 @@
{
RangeSet ranges = new RangeSet();
ranges.add((int) message.getDeliveryTag());
- _0_10session.getQpidSession().messageAcknowledge(ranges);
+ _0_10session.getQpidSession().messageAcknowledge(ranges,
+ _acknowledgeMode !=
org.apache.qpid.jms.Session.NO_ACKNOWLEDGE );
_0_10session.getCurrentException();
}
}
@@ -499,4 +477,13 @@
}
return o;
}
+
+ void postDeliver(AbstractJMSMessage msg) throws JMSException
+ {
+ super.postDeliver(msg);
+ if(_acknowledgeMode == org.apache.qpid.jms.Session.NO_ACKNOWLEDGE &&
!_session.isInRecovery())
+ {
+ _session.acknowledgeMessage(msg.getDeliveryTag(), false);
+ }
+ }
}
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Session.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Session.java?rev=662827&r1=662826&r2=662827&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Session.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Session.java
Tue Jun 3 08:25:36 2008
@@ -65,16 +65,6 @@
public void sessionDetach(byte[] name);
- /**
- * This control is sent by the receiver of commands, and handled by the
sender
- * of commands. It informs the sender of all commands completed by the
receiver.
- * This excludes commands known by the receiver to be considered complete
at the sender.
- *
- * @param commands completed commands.
- * @param options {@link Option#TIMELY_REPLY} If set, the
sender is no longer free to delay the known-completed reply.
- */
- public void sessionCompleted(RangeSet commands, Option... options);
-
public void sessionRequestTimeout(long expiry);
public byte[] getName();
@@ -328,8 +318,9 @@
* pre-acquire mode or by explicitly acquiring them.
*
* @param ranges Range of messages to be acknowledged.
+ * @param accept specify whether to send a message accept to the broker
*/
- public void messageAcknowledge(RangeSet ranges);
+ public void messageAcknowledge(RangeSet ranges, boolean accept);
/**
* Reject a range of acquired messages.
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java?rev=662827&r1=662826&r2=662827&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java
Tue Jun 3 08:25:36 2008
@@ -59,14 +59,17 @@
super(name);
}
- public void messageAcknowledge(RangeSet ranges)
+ public void messageAcknowledge(RangeSet ranges, boolean accept)
{
for (Range range : ranges)
{
super.processed(range);
}
super.flushProcessed();
- messageAccept(ranges);
+ if( accept )
+ {
+ messageAccept(ranges);
+ }
}
public void messageSubscribe(String queue, String destination, short
acceptMode, short acquireMode, MessagePartListener listener, Map<String,
Object> filter, Option... options)
@@ -105,11 +108,6 @@
_currentDataSizeNotSynced = 0;
}
- public void sessionCompleted(RangeSet commands, Option ... options)
- {
- super.sessionCompleted(commands, options);
- }
-
/* -------------------------
* Data methods
* ------------------------*/
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/interop/BasicInteropTest.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/interop/BasicInteropTest.java?rev=662827&r1=662826&r2=662827&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/interop/BasicInteropTest.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/interop/BasicInteropTest.java
Tue Jun 3 08:25:36 2008
@@ -108,7 +108,7 @@
System.out.println("--------/Message
Received--------");
RangeSet ack = new RangeSet();
ack.add(message.getMessageTransferId(),message.getMessageTransferId());
- session.messageAcknowledge(ack);
+ session.messageAcknowledge(ack, true);
}
}),