Author: arnaudsimon
Date: Wed Oct 3 06:04:04 2007
New Revision: 581589
URL: http://svn.apache.org/viewvc?rev=581589&view=rev
Log:
Changed to use message.recover instead of message.release
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Session.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=581589&r1=581588&r2=581589&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
Wed Oct 3 06:04:04 2007
@@ -248,13 +248,13 @@
public void sendRecover() throws AMQException, FailoverException
{
// release all unack messages
- RangeSet ranges = new RangeSet();
+ /*RangeSet ranges = new RangeSet();
for (long messageTag : _unacknowledgedMessageTags)
{
// release this message
ranges.add(messageTag);
- }
- getQpidSession().messageRelease(ranges);
+ }*/
+ getQpidSession().messageRecover(Option.REQUEUE);
// We need to sync so that we get notify of an error.
getQpidSession().sync();
getCurrentException();
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Session.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Session.java?rev=581589&r1=581588&r2=581589&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Session.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Session.java
Wed Oct 3 06:04:04 2007
@@ -79,7 +79,9 @@
// Producer
//------------------------------------------------------
/**
- * Transfer the given message to a specified exchange.
+ * Transfer the given
+ *
+ * to a specified exchange.
* <p/>
* <p>This is a convinience method for providing a complete message
* using a single method which internaly is mapped to messageTransfer(),
headers() followed
@@ -362,6 +364,23 @@
* @param text String describing the reason for a message transfer
rejection.
*/
public void messageReject(RangeSet ranges, int code, String text);
+
+ /**
+ * This method asks the broker to redeliver all unacknowledged messages on
a specified session.
+ * Zero or more messages may be redelivered. This method is only allowed
on non-transacted
+ * sessions.
+ * <p> Following are valid options:
+ * <ul>
+ * <li>{@link Option#REQUEUE}: <p>IIf this field is not set,
the message will be redelivered to the original recipient.
+ * If this option is ser, the server will attempt to requeue the message,
potentially then delivering it
+ * to an alternative subscriber.
+ * <p/>
+ * </ul>
+ *
+ * @param _options see available options
+ */
+ public void messageRecover(Option... _options);
+
/**
* As it is possible that the broker does not manage to reject some
messages, after completion of
Modified:
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java?rev=581589&r1=581588&r2=581589&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java
(original)
+++
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java
Wed Oct 3 06:04:04 2007
@@ -105,7 +105,7 @@
processed.add(range);
flush = syncPoint != null && processed.includes(syncPoint);
}
- if (flush)
+ if (! flush)
{
flushProcessed();
}