Author: ritchiem
Date: Thu Apr 26 09:08:39 2007
New Revision: 532788
URL: http://svn.apache.org/viewvc?view=rev&rev=532788
Log:
Merged revisions 532786 via svnmerge from
https://svn.apache.org/repos/asf/incubator/qpid/branches/M2
........
r532786 | ritchiem | 2007-04-26 16:59:24 +0100 (Thu, 26 Apr 2007) | 3 lines
QPID-466 Create STRICT_AMQP System property to disable JMS extensions in Java
client.
This disables the JMS features that rely upon Qpid Java broker specific
features.
........
Modified:
incubator/qpid/trunk/qpid/ (props changed)
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
Propchange: incubator/qpid/trunk/qpid/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?view=diff&rev=532788&r1=532787&r2=532788
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
Thu Apr 26 09:08:39 2007
@@ -202,11 +202,20 @@
/** Boolean to control immediate prefetch . Records the first call to the
dispatcher to prevent further flow(true) */
private final AtomicBoolean _firstDispatcher = new AtomicBoolean(true);
+ /** System property to enable strickt AMQP compliance */
+ public static final String STRICT_AMQP = "STRICT_AMQP";
+ /** Strickt AMQP default */
+ public static final String STRICT_AMQP_DEFAULT = "false";
+
+ private final boolean _strictAMQP;
+
+
/** System property to enable immediate message prefetching */
public static final String IMMEDIATE_PREFETCH = "IMMEDIATE_PREFETCH";
/** Immediate message prefetch default */
public static final String IMMEDIATE_PREFETCH_DEFAULT = "false";
+ private final boolean _immediatePrefetch;
private static final Logger _dispatcherLogger =
Logger.getLogger(Dispatcher.class);
@@ -435,6 +444,10 @@
AMQSession(AMQConnection con, int channelId, boolean transacted, int
acknowledgeMode,
MessageFactoryRegistry messageFactoryRegistry, int
defaultPrefetchHighMark, int defaultPrefetchLowMark)
{
+
+ _strictAMQP =
Boolean.parseBoolean(System.getProperties().getProperty(STRICT_AMQP,
STRICT_AMQP_DEFAULT));
+ _immediatePrefetch =
Boolean.parseBoolean(System.getProperties().getProperty(IMMEDIATE_PREFETCH,
IMMEDIATE_PREFETCH_DEFAULT));
+
_connection = con;
_transacted = transacted;
if (transacted)
@@ -921,15 +934,27 @@
_dispatcher.rollback();
}
- // AMQP version change: Hardwire the version to 0-8 (major=8,
minor=0)
- // TODO: Connect this to the session version obtained from
ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions
change.
-
_connection.getProtocolHandler().syncWrite(BasicRecoverBody.createAMQFrame(_channelId,
-
getProtocolMajorVersion(),
-
getProtocolMinorVersion(),
-
false) // requeue
- , BasicRecoverOkBody.class);
+ if (isStrictAMQP())
+ {
+ // We can't use the BasicRecoverBody-OK method as it isn't
part of the spec.
+
_connection.getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId,
+
getProtocolMajorVersion(),
+
getProtocolMinorVersion(),
+
false)); // requeue
+ _logger.warn("Session Recover cannot be guaranteed with
STRICT_AMQP. Messages may arrive out of order.");
+ }
+ else
+ {
+ // AMQP version change: Hardwire the version to 0-8 (major=8,
minor=0)
+ // TODO: Connect this to the session version obtained from
ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions
change.
+
_connection.getProtocolHandler().syncWrite(BasicRecoverBody.createAMQFrame(_channelId,
+
getProtocolMajorVersion(),
+
getProtocolMinorVersion(),
+
false) // requeue
+ , BasicRecoverOkBody.class);
+ }
if (!isSuspended)
{
suspendChannel(false);
@@ -1433,7 +1458,6 @@
private void consumeFromQueue(BasicMessageConsumer consumer,
AMQShortString queueName, AMQProtocolHandler protocolHandler,
boolean nowait, String messageSelector)
throws AMQException
{
- //fixme prefetch values are not used here. Do we need to have them as
parametsrs?
//need to generate a consumer tag on the client so we can exploit the
nowait flag
AMQShortString tag = new AMQShortString(Integer.toString(_nextTag++));
@@ -1709,11 +1733,21 @@
public QueueBrowser createBrowser(Queue queue) throws JMSException
{
+ if (isStrictAMQP())
+ {
+ throw new UnsupportedOperationException();
+ }
+
return createBrowser(queue, null);
}
public QueueBrowser createBrowser(Queue queue, String messageSelector)
throws JMSException
{
+ if (isStrictAMQP())
+ {
+ throw new UnsupportedOperationException();
+ }
+
checkNotClosed();
checkValidQueue(queue);
return new AMQQueueBrowser(this, (AMQQueue) queue, messageSelector);
@@ -1762,6 +1796,11 @@
boolean isQueueBound(AMQShortString exchangeName, AMQShortString
queueName, AMQShortString routingKey) throws JMSException
{
+ if (isStrictAMQP())
+ {
+ throw new UnsupportedOperationException();
+ }
+
// TODO: Be aware of possible changes to parameter order as versions
change.
AMQFrame boundFrame = ExchangeBoundBody.createAMQFrame(_channelId,
getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version
(major, minor)
@@ -1940,7 +1979,7 @@
synchronized void startDistpatcherIfNecessary()
{
// If IMMEDIATE_PREFETCH is not set then we need to start fetching
- if
(!Boolean.parseBoolean(System.getProperties().getProperty(IMMEDIATE_PREFETCH,
IMMEDIATE_PREFETCH_DEFAULT)))
+ if (!_immediatePrefetch)
{
// We do this now if this is the first call on a started connection
if (isSuspended() && _startedAtLeastOnce.get() &&
_firstDispatcher.getAndSet(false))
@@ -2005,7 +2044,7 @@
bindQueue(amqd, queueName, protocolHandler,
consumer.getRawSelectorFieldTable());
// If IMMEDIATE_PREFETCH is not required then suspsend the channel to
delay prefetch
- if
(!Boolean.parseBoolean(System.getProperties().getProperty(IMMEDIATE_PREFETCH,
IMMEDIATE_PREFETCH_DEFAULT)))
+ if (!_immediatePrefetch)
{
// The dispatcher will be null if we have just created this session
// so suspend the channel before we register our consumer so that
we don't
@@ -2390,6 +2429,11 @@
_connection.getProtocolHandler().writeFrame(basicRejectBody);
}
+ }
+
+ public boolean isStrictAMQP()
+ {
+ return _strictAMQP;
}
}