Author: gsim
Date: Mon Jan 29 10:53:27 2007
New Revision: 501144
URL: http://svn.apache.org/viewvc?view=rev&rev=501144
Log:
Moved across fixes from trunk for handling exclusive consumers and no_local
consumption.
Fixed close process in AMQChannel (remove channel from map only after consumer
cancellations have been processed).
Modified:
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageRecoverHandler.java
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
incubator/qpid/branches/qpid.0-9/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java
Modified:
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java?view=diff&rev=501144&r1=501143&r2=501144
==============================================================================
---
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java
(original)
+++
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java
Mon Jan 29 10:53:27 2007
@@ -105,6 +105,21 @@
session.closeSessionRequest(AMQConstant.NOT_ALLOWED.getCode(),
"Non-unique consumer tag, '" + body.destination + "'",
body.getClazz(), body.getMethod());
}
+ catch (AMQQueue.ExistingExclusiveSubscription e)
+ {
+ throw body.getChannelException(AMQConstant.ACCESS_REFUSED.getCode(),
+ "Cannot subscribe to queue "
+ + queue.getName()
+ + " as it already has an existing exclusive consumer");
+ }
+ catch (AMQQueue.ExistingSubscriptionPreventsExclusive e)
+ {
+ throw body.getChannelException(AMQConstant.ACCESS_REFUSED.getCode(),
+ "Cannot subscribe to queue "
+ + queue.getName()
+ + " exclusively as it already has a consumer");
+ }
+
}
}
}
Modified:
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageRecoverHandler.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageRecoverHandler.java?view=diff&rev=501144&r1=501143&r2=501144
==============================================================================
---
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageRecoverHandler.java
(original)
+++
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageRecoverHandler.java
Mon Jan 29 10:53:27 2007
@@ -24,6 +24,7 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.MessageRecoverBody;
+import org.apache.qpid.framing.MessageOkBody;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.exchange.ExchangeRegistry;
@@ -61,6 +62,8 @@
} else {
channel.resend(protocolSession);
}
+ MessageOkBody response = MessageOkBody.createMethodBody(protocolSession.getMajor(), protocolSession.getMinor());
+ protocolSession.writeResponse(evt, response);
}
}
Modified:
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java?view=diff&rev=501144&r1=501143&r2=501144
==============================================================================
---
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
(original)
+++
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
Mon Jan 29 10:53:27 2007
@@ -316,9 +316,10 @@
{
_logger.error("Closing channel due to: " + e.getMessage());
writeRequest(channelNum, e.getCloseMethodBody());
- AMQChannel channel = _channelMap.remove(channelNum);
+ AMQChannel channel = _channelMap.get(channelNum);//can't remove it yet as close requires it
if (channel != null) {
channel.close(this);
+ _channelMap.remove(channelNum);
}
}
catch (AMQConnectionException e)
@@ -726,6 +727,11 @@
public int getConnectionId()
{
return _ConnectionId.get();
+ }
+
+ public Object getClientIdentifier()
+ {
+ return _minaProtocolSession.getRemoteAddress();
}
public void addSessionCloseTask(Task task)
Modified:
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java?view=diff&rev=501144&r1=501143&r2=501144
==============================================================================
---
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
(original)
+++
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
Mon Jan 29 10:53:27 2007
@@ -159,6 +159,8 @@
void checkMethodBodyVersion(AMQMethodBody methodBody);
int getConnectionId();
+ Object getClientIdentifier();
+
void addSessionCloseTask(Task task);
void removeSessionCloseTask(Task task);
Modified:
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java?view=diff&rev=501144&r1=501143&r2=501144
==============================================================================
---
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
(original)
+++
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
Mon Jan 29 10:53:27 2007
@@ -72,6 +72,7 @@
private final boolean _isBrowser;
private final Boolean _autoClose;
private boolean _closed = false;
+ private static final String CLIENT_PROPERTIES_INSTANCE = ClientProperties.instance.toString();
public static class Factory implements SubscriptionFactory
{
@@ -331,35 +332,49 @@
{
if (_noLocal)
{
+ boolean isLocal;
// We don't want local messages so check to see if message is one we sent
- if (protocolSession.getClientProperties().get(ClientProperties.instance.toString()).equals(
- msg.getPublisher().getClientProperties().get(ClientProperties.instance.toString())))
+ Object localInstance;
+ Object msgInstance;
+
+ if((protocolSession.getClientProperties() != null) &&
+ (localInstance = protocolSession.getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null)
{
- if (_logger.isTraceEnabled())
+ if((msg.getPublisher().getClientProperties() != null) &&
+ (msgInstance = msg.getPublisher().getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null)
{
- _logger.trace("(" + System.identityHashCode(this) + ") has no interest as it is a local message(" +
- System.identityHashCode(msg) + ")");
+ if (localInstance == msgInstance || ((localInstance != null) && localInstance.equals(msgInstance)))
+ {
+ if (_logger.isTraceEnabled())
+ {
+ _logger.trace("(" + System.identityHashCode(this) + ") has no interest as it is a local message(" +
+ System.identityHashCode(msg) + ")");
+ }
+ return false;
+ }
}
- return false;
}
- else // if not then filter the message.
+ else
{
- if (_logger.isTraceEnabled())
+ localInstance = protocolSession.getClientIdentifier();
+ msgInstance = msg.getPublisher().getClientIdentifier();
+ if (localInstance == msgInstance || ((localInstance != null) && localInstance.equals(msgInstance)))
{
- _logger.trace("(" + System.identityHashCode(this) + ") local message(" + System.identityHashCode(msg) +
- ") but not ours so filtering");
+ if (_logger.isTraceEnabled())
+ {
+ _logger.trace("(" + System.identityHashCode(this) + ") has no interest as it is a local message(" +
+ System.identityHashCode(msg) + ")");
+ }
+ return false;
}
- return checkFilters(msg);
+
}
}
- else
+ if (_logger.isTraceEnabled())
{
- if (_logger.isTraceEnabled())
- {
- _logger.trace("(" + System.identityHashCode(this) + ") checking filters for message (" + System.identityHashCode(msg));
- }
- return checkFilters(msg);
+ _logger.trace("(" + System.identityHashCode(this) + ") checking filters for message (" + System.identityHashCode(msg));
}
+ return checkFilters(msg);
}
private boolean checkFilters(AMQMessage msg)
Modified:
incubator/qpid/branches/qpid.0-9/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java?view=diff&rev=501144&r1=501143&r2=501144
==============================================================================
---
incubator/qpid/branches/qpid.0-9/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java
(original)
+++
incubator/qpid/branches/qpid.0-9/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java
Mon Jan 29 10:53:27 2007
@@ -143,6 +143,12 @@
{
}
+
+ public Object getClientIdentifier()
+ {
+ return null;
+ }
+
public void closeChannelRequest(int channelId, int replyCode, String replyText) throws AMQException {
// TODO Auto-generated method stub