Author: ritchiem
Date: Tue Dec 19 08:07:12 2006
New Revision: 488712
URL: http://svn.apache.org/viewvc?view=rev&rev=488712
Log:
QPID-216
BasicConsumeMethodHandler.java - Pulled the nolocal param from the method body
and passed down channel to subscription.
SubscriptionFactory.java / AMQQueue.java/AMQChannel.java - passed the nolocal
parameter through to the Subscription
ConnectionStartOkMethodHandler.java - Saved the client properties so the client
identifier can be used in comparison with the publisher id to implement no_local
AMQMinaProtocolSession.java - added _clientProperties to store the sent client
properties.
AMQProtocolSession.java - interface changes to get/set ClientProperties
ConcurrentSelectorDeliveryManager.java - only need to call hasInterest as this
will take care of the hasFilters optimisation check.
SubscriptionImpl.java - Added code to do comparison of client ids to determine
interest in a given message.
SubscriptionSet.java - tidied up code to use hasInterest as this is where the
nolocal is implemented.
ConnectionStartMethodHandler.java - Moved literal values to the
ClientProperties.java enumeration and to QpidProperties.java.
QpidConnectionMetaData.java - updated to get values from QpidProperties.java
MockProtocolSession.java - null implementation of new get/set methods
Added:
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common/
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java
(with props)
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common/QpidProperties.java
(with props)
Modified:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
incubator/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java
Modified:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?view=diff&rev=488712&r1=488711&r2=488712
==============================================================================
---
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
(original)
+++
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
Tue Dec 19 08:07:12 2006
@@ -286,12 +286,14 @@
* @param tag the tag chosen by the client (if null, server will
generate one)
* @param queue the queue to subscribe to
* @param session the protocol session of the subscriber
+ * @param noLocal
* @return the consumer tag. This is returned to the subscriber and used in
* subsequent unsubscribe requests
* @throws ConsumerTagNotUniqueException if the tag is not unique
* @throws AMQException if something goes wrong
*/
- public String subscribeToQueue(String tag, AMQQueue queue,
AMQProtocolSession session, boolean acks, FieldTable filters) throws
AMQException, ConsumerTagNotUniqueException
+ public String subscribeToQueue(String tag, AMQQueue queue,
AMQProtocolSession session, boolean acks,
+ FieldTable filters, boolean noLocal) throws
AMQException, ConsumerTagNotUniqueException
{
if (tag == null)
{
@@ -302,7 +304,7 @@
throw new ConsumerTagNotUniqueException();
}
- queue.registerProtocolSession(session, _channelId, tag, acks, filters);
+ queue.registerProtocolSession(session, _channelId, tag, acks,
filters,noLocal);
_consumerTag2QueueMap.put(tag, queue);
return tag;
}
@@ -499,7 +501,7 @@
if (_log.isDebugEnabled())
{
_log.debug("Handling acknowledgement for channel " + _channelId +
" with delivery tag " + deliveryTag +
- " and multiple " + multiple);
+ " and multiple " + multiple);
}
if (multiple)
{
Modified:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java?view=diff&rev=488712&r1=488711&r2=488712
==============================================================================
---
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
(original)
+++
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
Tue Dec 19 08:07:12 2006
@@ -77,7 +77,8 @@
}
try
{
- String consumerTag =
channel.subscribeToQueue(body.consumerTag, queue, session, !body.noAck,
body.arguments);
+ String consumerTag =
channel.subscribeToQueue(body.consumerTag, queue, session, !body.noAck,
+ body.arguments,
body.noLocal);
if (!body.nowait)
{
session.writeFrame(BasicConsumeOkBody.createAMQFrame(channelId, consumerTag));
@@ -90,8 +91,8 @@
{
_log.info("Closing connection due to invalid selector");
session.writeFrame(ChannelCloseBody.createAMQFrame(channelId,
AMQConstant.INVALID_SELECTOR.getCode(),
-
ise.getMessage(), BasicConsumeBody.CLASS_ID,
-
BasicConsumeBody.METHOD_ID));
+
ise.getMessage(), BasicConsumeBody.CLASS_ID,
+
BasicConsumeBody.METHOD_ID));
}
catch (ConsumerTagNotUniqueException e)
{
Modified:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java?view=diff&rev=488712&r1=488711&r2=488712
==============================================================================
---
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java
(original)
+++
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java
Tue Dec 19 08:07:12 2006
@@ -78,12 +78,19 @@
AuthenticationResult authResult = authMgr.authenticate(ss,
body.response);
+ //save clientProperties
+ if (protocolSession.getClientProperties() == null)
+ {
+ protocolSession.setClientProperties(body.clientProperties);
+ }
+
switch (authResult.status)
{
case ERROR:
throw new AMQException("Authentication failed");
case SUCCESS:
_logger.info("Connected as: " + ss.getAuthorizationID());
+
stateManager.changeState(AMQState.CONNECTION_NOT_TUNED);
AMQFrame tune = ConnectionTuneBody.createAMQFrame(0,
Integer.MAX_VALUE, getConfiguredFrameSize(),
HeartbeatConfig.getInstance().getDelay());
@@ -122,7 +129,7 @@
static int getConfiguredFrameSize()
{
final Configuration config =
ApplicationRegistry.getInstance().getConfiguration();
- final int framesize = config.getInt("advanced.framesize",
DEFAULT_FRAME_SIZE);
+ final int framesize = config.getInt("advanced.framesize",
DEFAULT_FRAME_SIZE);
_logger.info("Framesize set to " + framesize);
return framesize;
}
Modified:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java?view=diff&rev=488712&r1=488711&r2=488712
==============================================================================
---
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
(original)
+++
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
Tue Dec 19 08:07:12 2006
@@ -26,17 +26,19 @@
import org.apache.mina.transport.vmpipe.VmPipeAddress;
import org.apache.qpid.AMQChannelException;
import org.apache.qpid.AMQException;
-import org.apache.qpid.codec.AMQCodecFactory;
-import org.apache.qpid.codec.AMQDecoder;
import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.ProtocolInitiation;
+import org.apache.qpid.framing.ConnectionStartBody;
import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.ProtocolVersionList;
import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.framing.ConnectionStartBody;
import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.HeartbeatBody;
-import org.apache.qpid.framing.ProtocolInitiation;
-import org.apache.qpid.framing.ProtocolVersionList;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.codec.AMQCodecFactory;
+import org.apache.qpid.codec.AMQDecoder;
+
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.RequiredDeliveryException;
import org.apache.qpid.server.exchange.ExchangeRegistry;
@@ -89,10 +91,11 @@
private boolean _closed;
// maximum number of channels this session should have
private long _maxNoOfChannels = 1000;
-
+
/* AMQP Version for this session */
private byte _major;
private byte _minor;
+ private FieldTable _clientProperties;
public ManagedObject getManagedObject()
{
@@ -128,7 +131,7 @@
{
return new AMQProtocolSessionMBean(this);
}
- catch(JMException ex)
+ catch (JMException ex)
{
_logger.error("AMQProtocolSession MBean creation has failed ", ex);
throw new AMQException("AMQProtocolSession MBean creation has
failed ", ex);
@@ -153,18 +156,21 @@
{
ProtocolInitiation pi = (ProtocolInitiation) message;
// this ensures the codec never checks for a PI message again
-
((AMQDecoder)_codecFactory.getDecoder()).setExpectProtocolInitiation(false);
- try {
+ ((AMQDecoder)
_codecFactory.getDecoder()).setExpectProtocolInitiation(false);
+ try
+ {
pi.checkVersion(this); // Fails if not correct
// This sets the protocol version (and hence framing classes)
for this session.
_major = pi.protocolMajor;
_minor = pi.protocolMinor;
String mechanisms =
ApplicationRegistry.getInstance().getAuthenticationManager().getMechanisms();
String locales = "en_US";
- AMQFrame response =
ConnectionStartBody.createAMQFrame((short)0, pi.protocolMajor,
pi.protocolMinor, null,
+ AMQFrame response = ConnectionStartBody.createAMQFrame((short)
0, pi.protocolMajor, pi.protocolMinor, null,
mechanisms.getBytes(), locales.getBytes());
_minaProtocolSession.write(response);
- } catch (AMQException e) {
+ }
+ catch (AMQException e)
+ {
_logger.error("Received incorrect protocol initiation", e);
/* Find last protocol version in protocol version list. Make
sure last protocol version
listed in the build file (build-module.xml) is the latest
version which will be used
@@ -211,7 +217,7 @@
_logger.debug("Method frame received: " + frame);
}
final AMQMethodEvent<AMQMethodBody> evt = new
AMQMethodEvent<AMQMethodBody>(frame.channel,
-
(AMQMethodBody)frame.bodyFrame);
+
(AMQMethodBody) frame.bodyFrame);
try
{
boolean wasAnyoneInterested = false;
@@ -266,7 +272,7 @@
{
_logger.debug("Content header frame received: " + frame);
}
-
getChannel(frame.channel).publishContentHeader((ContentHeaderBody)frame.bodyFrame);
+ getChannel(frame.channel).publishContentHeader((ContentHeaderBody)
frame.bodyFrame);
}
private void contentBodyReceived(AMQFrame frame) throws AMQException
@@ -275,7 +281,7 @@
{
_logger.debug("Content body frame received: " + frame);
}
-
getChannel(frame.channel).publishContentBody((ContentBody)frame.bodyFrame);
+ getChannel(frame.channel).publishContentBody((ContentBody)
frame.bodyFrame);
}
/**
@@ -355,6 +361,7 @@
* Close a specific channel. This will remove any resources used by the
channel, including:
* <ul><li>any queue subscriptions (this may in turn remove queues if they
are auto delete</li>
* </ul>
+ *
* @param channelId id of the channel to close
* @throws AMQException if an error occurs closing the channel
* @throws IllegalArgumentException if the channel id is not valid
@@ -381,6 +388,7 @@
/**
* In our current implementation this is used by the clustering code.
+ *
* @param channelId
*/
public void removeChannel(int channelId)
@@ -390,11 +398,12 @@
/**
* Initialise heartbeats on the session.
+ *
* @param delay delay in seconds (not ms)
*/
public void initHeartbeats(int delay)
{
- if(delay > 0)
+ if (delay > 0)
{
_minaProtocolSession.setIdleTime(IdleStatus.WRITER_IDLE, delay);
_minaProtocolSession.setIdleTime(IdleStatus.READER_IDLE,
HeartbeatConfig.getInstance().getTimeout(delay));
@@ -404,6 +413,7 @@
/**
* Closes all channels that were opened by this protocol session. This
frees up all resources
* used by the channel.
+ *
* @throws AMQException if an error occurs while closing any channel
*/
private void closeAllChannels() throws AMQException
@@ -421,7 +431,7 @@
*/
public void closeSession() throws AMQException
{
- if(!_closed)
+ if (!_closed)
{
_closed = true;
closeAllChannels();
@@ -463,11 +473,11 @@
// information is used by SASL primary.
if (address instanceof InetSocketAddress)
{
- return ((InetSocketAddress)address).getHostName();
+ return ((InetSocketAddress) address).getHostName();
}
else if (address instanceof VmPipeAddress)
{
- return "vmpipe:" + ((VmPipeAddress)address).getPort();
+ return "vmpipe:" + ((VmPipeAddress) address).getPort();
}
else
{
@@ -484,22 +494,32 @@
{
_saslServer = saslServer;
}
-
+
+ public FieldTable getClientProperties()
+ {
+ return _clientProperties;
+ }
+
+ public void setClientProperties(FieldTable clientProperties)
+ {
+ _clientProperties = clientProperties;
+ }
+
/**
* Convenience methods for managing AMQP version.
* NOTE: Both major and minor will be set to 0 prior to protocol
initiation.
*/
-
+
public byte getAmqpMajor()
{
return _major;
}
-
+
public byte getAmqpMinor()
{
return _minor;
}
-
+
public boolean amqpVersionEquals(byte major, byte minor)
{
return _major == major && _minor == minor;
Modified:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java?view=diff&rev=488712&r1=488711&r2=488712
==============================================================================
---
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
(original)
+++
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
Tue Dec 19 08:07:12 2006
@@ -21,6 +21,7 @@
package org.apache.qpid.server.protocol;
import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.AMQException;
@@ -122,4 +123,9 @@
* @param saslServer
*/
void setSaslServer(SaslServer saslServer);
+
+
+ FieldTable getClientProperties();
+
+ void setClientProperties(FieldTable clientProperties);
}
Modified:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?view=diff&rev=488712&r1=488711&r2=488712
==============================================================================
---
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
(original)
+++
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
Tue Dec 19 08:07:12 2006
@@ -96,7 +96,7 @@
* max allowed number of messages on a queue.
*/
private Integer _maxMessageCount = 10000;
-
+
/**
* max queue depth(KB) for the queue
*/
@@ -362,12 +362,17 @@
_bindings.addBinding(routingKey, exchange);
}
- public void registerProtocolSession(AMQProtocolSession ps, int channel,
String consumerTag, boolean acks, FieldTable filters)
+ public void registerProtocolSession(AMQProtocolSession ps, int channel,
String consumerTag, boolean acks, FieldTable filters) throws AMQException
+ {
+ registerProtocolSession(ps, channel, consumerTag, acks, filters,
false);
+ }
+
+ public void registerProtocolSession(AMQProtocolSession ps, int channel,
String consumerTag, boolean acks, FieldTable filters, boolean noLocal)
throws AMQException
{
debug("Registering protocol session {0} with channel {1} and consumer
tag {2} with {3}", ps, channel, consumerTag, this);
- Subscription subscription =
_subscriptionFactory.createSubscription(channel, ps, consumerTag, acks,
filters);
+ Subscription subscription =
_subscriptionFactory.createSubscription(channel, ps, consumerTag, acks,
filters, noLocal);
_subscribers.addSubscriber(subscription);
}
Modified:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java?view=diff&rev=488712&r1=488711&r2=488712
==============================================================================
---
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
(original)
+++
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
Tue Dec 19 08:07:12 2006
@@ -281,7 +281,7 @@
}
// Only give the message to those that want them.
- if (sub.hasFilters() && sub.hasInterest(msg))
+ if (sub.hasInterest(msg))
{
sub.enqueueForPreDelivery(msg);
}
Modified:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java?view=diff&rev=488712&r1=488711&r2=488712
==============================================================================
---
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java
(original)
+++
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java
Tue Dec 19 08:07:12 2006
@@ -33,12 +33,10 @@
*/
public interface SubscriptionFactory
{
- Subscription createSubscription(int channel, AMQProtocolSession
protocolSession, String consumerTag, boolean acks, FieldTable filters)
- throws AMQException;
+ Subscription createSubscription(int channel, AMQProtocolSession
protocolSession, String consumerTag, boolean acks,
+ FieldTable filters, boolean noLocal)
throws AMQException;
- Subscription createSubscription(int channel, AMQProtocolSession
protocolSession, String consumerTag, boolean acks)
- throws AMQException;
- Subscription createSubscription(int channel, AMQProtocolSession
protocolSession,String consumerTag)
- throws AMQException;
+ Subscription createSubscription(int channel, AMQProtocolSession
protocolSession, String consumerTag)
+ throws AMQException;
}
Modified:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java?view=diff&rev=488712&r1=488711&r2=488712
==============================================================================
---
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
(original)
+++
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
Tue Dec 19 08:07:12 2006
@@ -23,6 +23,7 @@
import org.apache.log4j.Logger;
import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
+import org.apache.qpid.common.ClientProperties;
import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize;
import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.AMQFrame;
@@ -56,6 +57,7 @@
private Queue<AMQMessage> _messages;
+ private final boolean _noLocal;
/**
* True if messages need to be acknowledged
@@ -65,21 +67,15 @@
public static class Factory implements SubscriptionFactory
{
- public Subscription createSubscription(int channel, AMQProtocolSession
protocolSession, String consumerTag, boolean acks, FieldTable filters) throws
AMQException
+ public Subscription createSubscription(int channel, AMQProtocolSession
protocolSession, String consumerTag, boolean acks, FieldTable filters, boolean
noLocal) throws AMQException
{
- return new SubscriptionImpl(channel, protocolSession, consumerTag,
acks, filters);
- }
-
- public SubscriptionImpl createSubscription(int channel,
AMQProtocolSession protocolSession, String consumerTag, boolean acks)
- throws AMQException
- {
- return new SubscriptionImpl(channel, protocolSession, consumerTag,
acks, null);
+ return new SubscriptionImpl(channel, protocolSession, consumerTag,
acks, filters, noLocal);
}
public SubscriptionImpl createSubscription(int channel,
AMQProtocolSession protocolSession, String consumerTag)
throws AMQException
{
- return new SubscriptionImpl(channel, protocolSession, consumerTag,
false, null);
+ return new SubscriptionImpl(channel, protocolSession, consumerTag,
false, null, false);
}
}
@@ -87,11 +83,11 @@
String consumerTag, boolean acks)
throws AMQException
{
- this(channelId, protocolSession, consumerTag, acks, null);
+ this(channelId, protocolSession, consumerTag, acks, null, false);
}
public SubscriptionImpl(int channelId, AMQProtocolSession protocolSession,
- String consumerTag, boolean acks, FieldTable
filters)
+ String consumerTag, boolean acks, FieldTable
filters, boolean noLocal)
throws AMQException
{
AMQChannel channel = protocolSession.getChannel(channelId);
@@ -105,6 +101,8 @@
this.consumerTag = consumerTag;
sessionKey = protocolSession.getKey();
_acks = acks;
+ _noLocal = noLocal;
+
_filters = FilterManagerFactory.createManager(filters);
if (_filters != null)
@@ -218,7 +216,22 @@
public boolean hasInterest(AMQMessage msg)
{
- return _filters.allAllow(msg);
+ if (_noLocal)
+ {
+ return
!(protocolSession.getClientProperties().get(ClientProperties.instance.toString()).equals(
+
msg.getPublisher().getClientProperties().get(ClientProperties.instance.toString())));
+ }
+ else
+ {
+ if (_filters != null)
+ {
+ return _filters.allAllow(msg);
+ }
+ else
+ {
+ return true;
+ }
+ }
}
public Queue<AMQMessage> getPreDeliveryQueue()
@@ -233,8 +246,6 @@
_messages.offer(msg);
}
}
-
-
private ByteBuffer createEncodedDeliverFrame(long deliveryTag, String
routingKey, String exchange)
Modified:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java?view=diff&rev=488712&r1=488711&r2=488712
==============================================================================
---
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
(original)
+++
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
Tue Dec 19 08:07:12 2006
@@ -139,22 +139,15 @@
if (!subscription.isSuspended())
{
- if (!subscription.hasFilters())
+ if (subscription.hasInterest(msg))
{
- return subscription;
- }
- else
- {
- if (subscription.hasInterest(msg))
+ // if the queue is not empty then this client is ready to
receive a message.
+ //FIXME the queue could be full of sent messages.
+ // Either need to clean all PDQs after sending a message
+ // OR have a clean up thread that runs the PDQs expunging
the messages.
+ if (!subscription.hasFilters() ||
subscription.getPreDeliveryQueue().isEmpty())
{
- // if the queue is not empty then this client is ready
to receive a message.
- //FIXME the queue could be full of sent messages.
- // Either need to clean all PDQs after sending a
message
- // OR have a clean up thread that runs the PDQs
expunging the messages.
- if (subscription.getPreDeliveryQueue().isEmpty())
- {
- return subscription;
- }
+ return subscription;
}
}
}
@@ -208,6 +201,7 @@
/**
* Notification that a queue has been deleted. This is called so that the
subscription can inform the
* channel, which in turn can update its list of unacknowledged messages.
+ *
* @param queue
*/
public void queueDeleted(AMQQueue queue)
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java?view=diff&rev=488712&r1=488711&r2=488712
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java
Tue Dec 19 08:07:12 2006
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.client;
+import org.apache.qpid.common.QpidProperties;
+
import java.util.Enumeration;
import javax.jms.ConnectionMetaData;
@@ -29,7 +31,6 @@
{
-
QpidConnectionMetaData(AMQConnection conn)
{
}
@@ -46,7 +47,7 @@
public String getJMSProviderName() throws JMSException
{
- return "Apache Qpid";
+ return "Apache " + QpidProperties.getProductName();
}
public String getJMSVersion() throws JMSException
@@ -71,8 +72,8 @@
public String getProviderVersion() throws JMSException
{
- return "QPID (Client: [" + getClientVersion() + "] ; Broker [" +
getBrokerVersion() + "] ; Protocol: [ "
- + getProtocolVersion() + "] )";
+ return QpidProperties.getProductName() + " (Client: [" +
getClientVersion() + "] ; Broker [" + getBrokerVersion() + "] ; Protocol: [ "
+ + getProtocolVersion() + "] )";
}
private String getProtocolVersion()
@@ -89,8 +90,7 @@
public String getClientVersion()
{
- // TODO - get client build version from properties file or similar
- return "<unknown>";
+ return QpidProperties.getBuildVerision();
}
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java?view=diff&rev=488712&r1=488711&r2=488712
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
Tue Dec 19 08:07:12 2006
@@ -22,6 +22,8 @@
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
+import org.apache.qpid.common.ClientProperties;
+import org.apache.qpid.common.QpidProperties;
import org.apache.qpid.client.protocol.AMQMethodEvent;
import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.security.AMQCallbackHandler;
@@ -119,10 +121,11 @@
stateManager.changeState(AMQState.CONNECTION_NOT_TUNED);
FieldTable clientProperties = FieldTableFactory.newFieldTable();
- clientProperties.put("instance", ps.getClientID());
- clientProperties.put("product", "Qpid");
- clientProperties.put("version", "1.0");
- clientProperties.put("platform", getFullSystemInfo());
+
+ clientProperties.put(ClientProperties.instance.toString(),
ps.getClientID());
+ clientProperties.put(ClientProperties.product.toString(),
QpidProperties.getProductName());
+ clientProperties.put(ClientProperties.version.toString(),
QpidProperties.getReleaseVerision());
+ clientProperties.put(ClientProperties.platform.toString(),
getFullSystemInfo());
ps.writeFrame(ConnectionStartOkBody.createAMQFrame(evt.getChannelId(),
clientProperties, mechanism,
saslResponse,
selectedLocale));
}
Added:
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java?view=auto&rev=488712
==============================================================================
---
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java
(added)
+++
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java
Tue Dec 19 08:07:12 2006
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.common;
+
+public enum ClientProperties
+{
+ instance,
+ product,
+ version,
+ platform
+}
Propchange:
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added:
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common/QpidProperties.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common/QpidProperties.java?view=auto&rev=488712
==============================================================================
---
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common/QpidProperties.java
(added)
+++
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common/QpidProperties.java
Tue Dec 19 08:07:12 2006
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.common;
+
+public class QpidProperties
+{
+
+ static
+ {
+ //load values from property file.
+ }
+
+ public static String getProductName()
+ {
+ return "Qpid";
+ }
+
+ public static String getReleaseVerision()
+ {
+ return "1.0";
+ }
+
+
+ public static String getBuildVerision()
+ {
+ return "1";
+ }
+}
Propchange:
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common/QpidProperties.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/common/QpidProperties.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified:
incubator/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java?view=diff&rev=488712&r1=488711&r2=488712
==============================================================================
---
incubator/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java
(original)
+++
incubator/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java
Tue Dec 19 08:07:12 2006
@@ -22,6 +22,7 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.store.MessageStore;
@@ -119,6 +120,15 @@
}
public void setSaslServer(SaslServer saslServer)
+ {
+ }
+
+ public FieldTable getClientProperties()
+ {
+ return null;
+ }
+
+ public void setClientProperties(FieldTable clientProperties)
{
}
}