Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerMethodDispatcherImpl.java URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerMethodDispatcherImpl.java?rev=1630749&r1=1630748&r2=1630749&view=diff ============================================================================== --- qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerMethodDispatcherImpl.java (original) +++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerMethodDispatcherImpl.java Fri Oct 10 09:59:55 2014 @@ -1,2237 +1,2266 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol.v0_8; - -import java.security.AccessControlException; -import java.util.Collection; -import java.util.EnumSet; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.UUID; - -import javax.security.sasl.SaslException; -import javax.security.sasl.SaslServer; - -import org.apache.log4j.Logger; - -import org.apache.qpid.AMQConnectionException; -import org.apache.qpid.AMQException; -import org.apache.qpid.exchange.ExchangeDefaults; -import org.apache.qpid.framing.*; -import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.server.consumer.ConsumerImpl; -import org.apache.qpid.server.exchange.ExchangeImpl; -import org.apache.qpid.server.filter.AMQInvalidArgumentException; -import org.apache.qpid.server.flow.FlowCreditManager; -import org.apache.qpid.server.flow.MessageOnlyCreditManager; -import org.apache.qpid.server.message.InstanceProperties; -import org.apache.qpid.server.message.MessageDestination; -import org.apache.qpid.server.message.MessageInstance; -import org.apache.qpid.server.message.MessageSource; -import org.apache.qpid.server.message.ServerMessage; -import org.apache.qpid.server.model.Broker; -import org.apache.qpid.server.model.ExclusivityPolicy; -import org.apache.qpid.server.model.LifetimePolicy; -import org.apache.qpid.server.model.NoFactoryForTypeException; -import org.apache.qpid.server.model.Queue; -import org.apache.qpid.server.model.State; -import org.apache.qpid.server.model.UnknownConfiguredObjectException; -import org.apache.qpid.server.model.port.AmqpPort; -import org.apache.qpid.server.protocol.AMQSessionModel; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.QueueArgumentsConverter; -import org.apache.qpid.server.security.SubjectCreator; -import org.apache.qpid.server.security.auth.SubjectAuthenticationResult; -import org.apache.qpid.server.virtualhost.ExchangeExistsException; -import org.apache.qpid.server.virtualhost.ExchangeIsAlternateException; -import org.apache.qpid.server.virtualhost.QueueExistsException; -import org.apache.qpid.server.virtualhost.RequiredExchangeException; -import org.apache.qpid.server.virtualhost.ReservedExchangeNameException; -import org.apache.qpid.server.virtualhost.VirtualHostImpl; - -public class ServerMethodDispatcherImpl implements MethodDispatcher -{ - private static final Logger _logger = Logger.getLogger(ServerMethodDispatcherImpl.class); - - private final AMQProtocolSession<?> _connection; - - - public static MethodDispatcher createMethodDispatcher(AMQProtocolSession<?> connection) - { - return new ServerMethodDispatcherImpl(connection); - } - - - public ServerMethodDispatcherImpl(AMQProtocolSession<?> connection) - { - _connection = connection; - } - - - protected final AMQProtocolSession<?> getConnection() - { - return _connection; - } - - public boolean dispatchAccessRequest(AccessRequestBody body, int channelId) throws AMQException - { - final AMQChannel channel = _connection.getChannel(channelId); - if (channel == null) - { - throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry()); - } - - MethodRegistry methodRegistry = _connection.getMethodRegistry(); - - if(ProtocolVersion.v0_91.equals(_connection.getProtocolVersion()) ) - { - throw new AMQException(AMQConstant.COMMAND_INVALID, "AccessRequest not present in AMQP versions other than 0-8, 0-9"); - } - - // We don't implement access control class, but to keep clients happy that expect it - // always use the "0" ticket. - AccessRequestOkBody response = methodRegistry.createAccessRequestOkBody(0); - channel.sync(); - _connection.writeFrame(response.generateFrame(channelId)); - return true; - } - - public boolean dispatchBasicAck(BasicAckBody body, int channelId) throws AMQException - { - - if (_logger.isDebugEnabled()) - { - _logger.debug("Ack(Tag:" + body.getDeliveryTag() + ":Mult:" + body.getMultiple() + ") received on channel " + channelId); - } - - final AMQChannel channel = _connection.getChannel(channelId); - - if (channel == null) - { - throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry()); - } - - // this method throws an AMQException if the delivery tag is not known - channel.acknowledgeMessage(body.getDeliveryTag(), body.getMultiple()); - return true; - } - - public boolean dispatchBasicCancel(BasicCancelBody body, int channelId) throws AMQException - { - final AMQChannel channel = _connection.getChannel(channelId); - - - if (channel == null) - { - throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry()); - } - - if (_logger.isDebugEnabled()) - { - _logger.debug("BasicCancel: for:" + body.getConsumerTag() + - " nowait:" + body.getNowait()); - } - - channel.unsubscribeConsumer(body.getConsumerTag()); - if (!body.getNowait()) - { - MethodRegistry methodRegistry = _connection.getMethodRegistry(); - BasicCancelOkBody cancelOkBody = methodRegistry.createBasicCancelOkBody(body.getConsumerTag()); - channel.sync(); - _connection.writeFrame(cancelOkBody.generateFrame(channelId)); - } - return true; - } - - public boolean dispatchBasicConsume(BasicConsumeBody body, int channelId) throws AMQException - { - AMQChannel channel = _connection.getChannel(channelId); - VirtualHostImpl<?,?,?> vHost = _connection.getVirtualHost(); - - if (channel == null) - { - throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry()); - } - else - { - channel.sync(); - String queueName = body.getQueue() == null ? null : body.getQueue().asString(); - if (_logger.isDebugEnabled()) - { - _logger.debug("BasicConsume: from '" + queueName + - "' for:" + body.getConsumerTag() + - " nowait:" + body.getNowait() + - " args:" + body.getArguments()); - } - - MessageSource queue = queueName == null ? channel.getDefaultQueue() : vHost.getQueue(queueName); - final Collection<MessageSource> sources = new HashSet<>(); - if(queue != null) - { - sources.add(queue); - } - else if(vHost.getContextValue(Boolean.class, "qpid.enableMultiQueueConsumers") - && body.getArguments() != null - && body.getArguments().get("x-multiqueue") instanceof Collection) - { - for(Object object : (Collection<Object>) body.getArguments().get("x-multiqueue")) - { - String sourceName = String.valueOf(object); - sourceName = sourceName.trim(); - if(sourceName.length() != 0) - { - MessageSource source = vHost.getMessageSource(sourceName); - if(source == null) - { - sources.clear(); - break; - } - else - { - sources.add(source); - } - } - } - queueName = body.getArguments().get("x-multiqueue").toString(); - } - - if (sources.isEmpty()) - { - if (_logger.isDebugEnabled()) - { - _logger.debug("No queue for '" + queueName + "'"); - } - if (queueName != null) - { - String msg = "No such queue, '" + queueName + "'"; - throw body.getChannelException(AMQConstant.NOT_FOUND, msg, _connection.getMethodRegistry()); - } - else - { - String msg = "No queue name provided, no default queue defined."; - throw body.getConnectionException(AMQConstant.NOT_ALLOWED, msg, _connection.getMethodRegistry()); - } - } - else - { - final AMQShortString consumerTagName; - - if (body.getConsumerTag() != null) - { - consumerTagName = body.getConsumerTag().intern(false); - } - else - { - consumerTagName = null; - } - - try - { - if(consumerTagName == null || channel.getSubscription(consumerTagName) == null) - { - - AMQShortString consumerTag = channel.consumeFromSource(consumerTagName, - sources, - !body.getNoAck(), - body.getArguments(), - body.getExclusive(), - body.getNoLocal()); - if (!body.getNowait()) - { - MethodRegistry methodRegistry = _connection.getMethodRegistry(); - AMQMethodBody responseBody = methodRegistry.createBasicConsumeOkBody(consumerTag); - _connection.writeFrame(responseBody.generateFrame(channelId)); - - } - } - else - { - AMQShortString msg = AMQShortString.validValueOf("Non-unique consumer tag, '" + body.getConsumerTag() + "'"); - - MethodRegistry methodRegistry = _connection.getMethodRegistry(); - AMQMethodBody responseBody = methodRegistry.createConnectionCloseBody(AMQConstant.NOT_ALLOWED.getCode(), // replyCode - msg, // replytext - body.getClazz(), - body.getMethod()); - _connection.writeFrame(responseBody.generateFrame(0)); - } - - } - catch (AMQInvalidArgumentException ise) - { - _logger.debug("Closing connection due to invalid selector"); - - MethodRegistry methodRegistry = _connection.getMethodRegistry(); - AMQMethodBody responseBody = methodRegistry.createChannelCloseBody(AMQConstant.ARGUMENT_INVALID.getCode(), - AMQShortString.validValueOf(ise.getMessage()), - body.getClazz(), - body.getMethod()); - _connection.writeFrame(responseBody.generateFrame(channelId)); - - - } - catch (AMQQueue.ExistingExclusiveConsumer e) - { - throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, - "Cannot subscribe to queue " - + queue.getName() - + " as it already has an existing exclusive consumer", - _connection.getMethodRegistry()); - } - catch (AMQQueue.ExistingConsumerPreventsExclusive e) - { - throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, - "Cannot subscribe to queue " - + queue.getName() - + " exclusively as it already has a consumer", - _connection.getMethodRegistry()); - } - catch (AccessControlException e) - { - throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, - "Cannot subscribe to queue " - + queue.getName() - + " permission denied", _connection.getMethodRegistry()); - } - catch (MessageSource.ConsumerAccessRefused consumerAccessRefused) - { - throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, - "Cannot subscribe to queue " - + queue.getName() - + " as it already has an incompatible exclusivity policy", - _connection.getMethodRegistry()); - } - - } - } - return true; - } - - public boolean dispatchBasicGet(BasicGetBody body, int channelId) throws AMQException - { - - VirtualHostImpl vHost = _connection.getVirtualHost(); - - AMQChannel channel = _connection.getChannel(channelId); - if (channel == null) - { - throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry()); - } - else - { - channel.sync(); - AMQQueue queue = body.getQueue() == null ? channel.getDefaultQueue() : vHost.getQueue(body.getQueue().toString()); - if (queue == null) - { - _logger.info("No queue for '" + body.getQueue() + "'"); - if(body.getQueue()!=null) - { - throw body.getConnectionException(AMQConstant.NOT_FOUND, - "No such queue, '" + body.getQueue() + "'", - _connection.getMethodRegistry()); - } - else - { - throw body.getConnectionException(AMQConstant.NOT_ALLOWED, - "No queue name provided, no default queue defined.", - _connection.getMethodRegistry()); - } - } - else - { - - try - { - if (!performGet(queue, _connection, channel, !body.getNoAck())) - { - MethodRegistry methodRegistry = _connection.getMethodRegistry(); - - BasicGetEmptyBody responseBody = methodRegistry.createBasicGetEmptyBody(null); - - - _connection.writeFrame(responseBody.generateFrame(channelId)); - } - } - catch (AccessControlException e) - { - throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, - e.getMessage(), _connection.getMethodRegistry()); - } - catch (MessageSource.ExistingExclusiveConsumer e) - { - throw body.getConnectionException(AMQConstant.NOT_ALLOWED, - "Queue has an exclusive consumer", - _connection.getMethodRegistry()); - } - catch (MessageSource.ExistingConsumerPreventsExclusive e) - { - throw body.getConnectionException(AMQConstant.INTERNAL_ERROR, - "The GET request has been evaluated as an exclusive consumer, " + - "this is likely due to a programming error in the Qpid broker", - _connection.getMethodRegistry()); - } - catch (MessageSource.ConsumerAccessRefused consumerAccessRefused) - { - throw body.getConnectionException(AMQConstant.NOT_ALLOWED, - "Queue has an incompatible exclusivit policy", - _connection.getMethodRegistry()); - } - } - } - return true; - } - - public static boolean performGet(final AMQQueue queue, - final AMQProtocolSession session, - final AMQChannel channel, - final boolean acks) - throws AMQException, MessageSource.ExistingConsumerPreventsExclusive, - MessageSource.ExistingExclusiveConsumer, MessageSource.ConsumerAccessRefused - { - - final FlowCreditManager singleMessageCredit = new MessageOnlyCreditManager(1L); - - final GetDeliveryMethod getDeliveryMethod = - new GetDeliveryMethod(singleMessageCredit, session, channel, queue); - final RecordDeliveryMethod getRecordMethod = new RecordDeliveryMethod() - { - - public void recordMessageDelivery(final ConsumerImpl sub, final MessageInstance entry, final long deliveryTag) - { - channel.addUnacknowledgedMessage(entry, deliveryTag, null); - } - }; - - ConsumerTarget_0_8 target; - EnumSet<ConsumerImpl.Option> options = EnumSet.of(ConsumerImpl.Option.TRANSIENT, ConsumerImpl.Option.ACQUIRES, - ConsumerImpl.Option.SEES_REQUEUES); - if(acks) - { - - target = ConsumerTarget_0_8.createAckTarget(channel, - AMQShortString.EMPTY_STRING, null, - singleMessageCredit, getDeliveryMethod, getRecordMethod); - } - else - { - target = ConsumerTarget_0_8.createGetNoAckTarget(channel, - AMQShortString.EMPTY_STRING, null, - singleMessageCredit, getDeliveryMethod, getRecordMethod); - } - - ConsumerImpl sub = queue.addConsumer(target, null, AMQMessage.class, "", options); - sub.flush(); - sub.close(); - return(getDeliveryMethod.hasDeliveredMessage()); - - - } - - - private static class GetDeliveryMethod implements ClientDeliveryMethod - { - - private final FlowCreditManager _singleMessageCredit; - private final AMQProtocolSession _session; - private final AMQChannel _channel; - private final AMQQueue _queue; - private boolean _deliveredMessage; - - public GetDeliveryMethod(final FlowCreditManager singleMessageCredit, - final AMQProtocolSession session, - final AMQChannel channel, final AMQQueue queue) - { - _singleMessageCredit = singleMessageCredit; - _session = session; - _channel = channel; - _queue = queue; - } - - @Override - public long deliverToClient(final ConsumerImpl sub, final ServerMessage message, - final InstanceProperties props, final long deliveryTag) - { - _singleMessageCredit.useCreditForMessage(message.getSize()); - long size =_session.getProtocolOutputConverter().writeGetOk(message, - props, - _channel.getChannelId(), - deliveryTag, - _queue.getQueueDepthMessages()); - - _deliveredMessage = true; - return size; - } - - public boolean hasDeliveredMessage() - { - return _deliveredMessage; - } - } - - public boolean dispatchBasicPublish(BasicPublishBody body, int channelId) throws AMQException - { - if (_logger.isDebugEnabled()) - { - _logger.debug("Publish received on channel " + channelId); - } - - AMQShortString exchangeName = body.getExchange(); - VirtualHostImpl vHost = _connection.getVirtualHost(); - - // TODO: check the delivery tag field details - is it unique across the broker or per subscriber? - - MessageDestination destination; - - if (exchangeName == null || AMQShortString.EMPTY_STRING.equals(exchangeName)) - { - destination = vHost.getDefaultDestination(); - } - else - { - destination = vHost.getMessageDestination(exchangeName.toString()); - } - - // if the exchange does not exist we raise a channel exception - if (destination == null) - { - throw body.getChannelException(AMQConstant.NOT_FOUND, "Unknown exchange name", - _connection.getMethodRegistry()); - } - else - { - // The partially populated BasicDeliver frame plus the received route body - // is stored in the channel. Once the final body frame has been received - // it is routed to the exchange. - AMQChannel channel = _connection.getChannel(channelId); - - if (channel == null) - { - throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry()); - } - - MessagePublishInfo info = new MessagePublishInfo(body.getExchange(), - body.getImmediate(), - body.getMandatory(), - body.getRoutingKey()); - info.setExchange(exchangeName); - try - { - channel.setPublishFrame(info, destination); - } - catch (AccessControlException e) - { - throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, - e.getMessage(), - _connection.getMethodRegistry()); - } - } - return true; - } - - public boolean dispatchBasicQos(BasicQosBody body, int channelId) throws AMQException - { - AMQChannel channel = _connection.getChannel(channelId); - if (channel == null) - { - throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry()); - } - channel.sync(); - channel.setCredit(body.getPrefetchSize(), body.getPrefetchCount()); - - - MethodRegistry methodRegistry = _connection.getMethodRegistry(); - AMQMethodBody responseBody = methodRegistry.createBasicQosOkBody(); - _connection.writeFrame(responseBody.generateFrame(channelId)); - - return true; - } - - public boolean dispatchBasicRecover(BasicRecoverBody body, int channelId) throws AMQException - { - _logger.debug("Recover received on protocol session " + _connection - + " and channel " + channelId); - AMQChannel channel = _connection.getChannel(channelId); - - - if (channel == null) - { - throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry()); - } - - channel.resend(); - - // Qpid 0-8 hacks a synchronous -ok onto recover. - // In Qpid 0-9 we create a separate sync-recover, sync-recover-ok pair to be "more" compliant - if(_connection.getProtocolVersion().equals(ProtocolVersion.v8_0)) - { - MethodRegistry methodRegistry = _connection.getMethodRegistry(); - AMQMethodBody recoverOk = methodRegistry.createBasicRecoverSyncOkBody(); - channel.sync(); - _connection.writeFrame(recoverOk.generateFrame(channelId)); - - } - - return true; - } - - public boolean dispatchBasicReject(BasicRejectBody body, int channelId) throws AMQException - { - - AMQChannel channel = _connection.getChannel(channelId); - - if (channel == null) - { - throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry()); - } - - if (_logger.isDebugEnabled()) - { - _logger.debug("Rejecting:" + body.getDeliveryTag() + - ": Requeue:" + body.getRequeue() + - " on channel:" + channel.debugIdentity()); - } - - long deliveryTag = body.getDeliveryTag(); - - MessageInstance message = channel.getUnacknowledgedMessageMap().get(deliveryTag); - - if (message == null) - { - _logger.warn("Dropping reject request as message is null for tag:" + deliveryTag); - } - else - { - - if (message.getMessage() == null) - { - _logger.warn("Message has already been purged, unable to Reject."); - } - else - { - - if (_logger.isDebugEnabled()) - { - _logger.debug("Rejecting: DT:" + deliveryTag + "-" + message.getMessage() + - ": Requeue:" + body.getRequeue() + - " on channel:" + channel.debugIdentity()); - } - - if (body.getRequeue()) - { - //this requeue represents a message rejected from the pre-dispatch queue - //therefore we need to amend the delivery counter. - message.decrementDeliveryCount(); - - channel.requeue(deliveryTag); - } - else - { - // Since the Java client abuses the reject flag for requeing after rollback, we won't set reject here - // as it would prevent redelivery - // message.reject(); - - final boolean maxDeliveryCountEnabled = channel.isMaxDeliveryCountEnabled(deliveryTag); - _logger.debug("maxDeliveryCountEnabled: " - + maxDeliveryCountEnabled - + " deliveryTag " - + deliveryTag); - if (maxDeliveryCountEnabled) - { - final boolean deliveredTooManyTimes = channel.isDeliveredTooManyTimes(deliveryTag); - _logger.debug("deliveredTooManyTimes: " - + deliveredTooManyTimes - + " deliveryTag " - + deliveryTag); - if (deliveredTooManyTimes) - { - channel.deadLetter(body.getDeliveryTag()); - } - else - { - //this requeue represents a message rejected because of a recover/rollback that we - //are not ready to DLQ. We rely on the reject command to resend from the unacked map - //and therefore need to increment the delivery counter so we cancel out the effect - //of the AMQChannel#resend() decrement. - message.incrementDeliveryCount(); - } - } - else - { - channel.requeue(deliveryTag); - } - } - } - } - return true; - } - - public boolean dispatchChannelOpen(ChannelOpenBody body, int channelId) throws AMQException - { - VirtualHostImpl virtualHost = _connection.getVirtualHost(); - - // Protect the broker against out of order frame request. - if (virtualHost == null) - { - throw new AMQException(AMQConstant.COMMAND_INVALID, "Virtualhost has not yet been set. ConnectionOpen has not been called.", null); - } - _logger.info("Connecting to: " + virtualHost.getName()); - - final AMQChannel channel = new AMQChannel(_connection, channelId, virtualHost.getMessageStore()); - - _connection.addChannel(channel); - - ChannelOpenOkBody response; - - - response = _connection.getMethodRegistry().createChannelOpenOkBody(); - - - _connection.writeFrame(response.generateFrame(channelId)); - return true; - } - - - public boolean dispatchAccessRequestOk(AccessRequestOkBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - - public boolean dispatchBasicCancelOk(BasicCancelOkBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchBasicConsumeOk(BasicConsumeOkBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchBasicDeliver(BasicDeliverBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchBasicGetEmpty(BasicGetEmptyBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchBasicGetOk(BasicGetOkBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchBasicQosOk(BasicQosOkBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchBasicReturn(BasicReturnBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchChannelClose(ChannelCloseBody body, int channelId) throws AMQException - { - - if (_logger.isInfoEnabled()) - { - _logger.info("Received channel close for id " + channelId - + " citing class " + body.getClassId() + - " and method " + body.getMethodId()); - } - - - AMQChannel channel = _connection.getChannel(channelId); - - if (channel == null) - { - throw body.getConnectionException(AMQConstant.CHANNEL_ERROR, - "Trying to close unknown channel", - _connection.getMethodRegistry()); - } - channel.sync(); - _connection.closeChannel(channelId); - // Client requested closure so we don't wait for ok we send it - _connection.closeChannelOk(channelId); - - MethodRegistry methodRegistry = _connection.getMethodRegistry(); - ChannelCloseOkBody responseBody = methodRegistry.createChannelCloseOkBody(); - _connection.writeFrame(responseBody.generateFrame(channelId)); - return true; - } - - - public boolean dispatchChannelCloseOk(ChannelCloseOkBody body, int channelId) throws AMQException - { - - _logger.info("Received channel-close-ok for channel-id " + channelId); - - // Let the Protocol Session know the channel is now closed. - _connection.closeChannelOk(channelId); - return true; - } - - - public boolean dispatchChannelFlow(ChannelFlowBody body, int channelId) throws AMQException - { - final AMQProtocolSession<?> connection = getConnection(); - - - AMQChannel channel = connection.getChannel(channelId); - - if (channel == null) - { - throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry()); - } - channel.sync(); - channel.setSuspended(!body.getActive()); - _logger.debug("Channel.Flow for channel " + channelId + ", active=" + body.getActive()); - - MethodRegistry methodRegistry = connection.getMethodRegistry(); - AMQMethodBody responseBody = methodRegistry.createChannelFlowOkBody(body.getActive()); - connection.writeFrame(responseBody.generateFrame(channelId)); - return true; - } - - public boolean dispatchChannelFlowOk(ChannelFlowOkBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchChannelOpenOk(ChannelOpenOkBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - - public boolean dispatchConnectionOpen(ConnectionOpenBody body, int channelId) throws AMQException - { - - //ignore leading '/' - String virtualHostName; - if ((body.getVirtualHost() != null) && body.getVirtualHost().charAt(0) == '/') - { - virtualHostName = new StringBuilder(body.getVirtualHost().subSequence(1, body.getVirtualHost().length())).toString(); - } - else - { - virtualHostName = body.getVirtualHost() == null ? null : String.valueOf(body.getVirtualHost()); - } - - VirtualHostImpl virtualHost = ((AmqpPort) _connection.getPort()).getVirtualHost(virtualHostName); - - if (virtualHost == null) - { - throw body.getConnectionException(AMQConstant.NOT_FOUND, "Unknown virtual host: '" + virtualHostName + "'", - _connection.getMethodRegistry()); - } - else - { - // Check virtualhost access - if (virtualHost.getState() != State.ACTIVE) - { - throw body.getConnectionException(AMQConstant.CONNECTION_FORCED, - "Virtual host '" + virtualHost.getName() + "' is not active", - _connection.getMethodRegistry()); - } - - _connection.setVirtualHost(virtualHost); - try - { - virtualHost.getSecurityManager().authoriseCreateConnection(_connection); - } - catch (AccessControlException e) - { - throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, - e.getMessage(), - _connection.getMethodRegistry()); - } - - // See Spec (0.8.2). Section 3.1.2 Virtual Hosts - if (_connection.getContextKey() == null) - { - _connection.setContextKey(new AMQShortString(Long.toString(System.currentTimeMillis()))); - } - - MethodRegistry methodRegistry = _connection.getMethodRegistry(); - AMQMethodBody responseBody = methodRegistry.createConnectionOpenOkBody(body.getVirtualHost()); - - _connection.writeFrame(responseBody.generateFrame(channelId)); - } - return true; - } - - - public boolean dispatchConnectionClose(ConnectionCloseBody body, int channelId) throws AMQException - { - if (_logger.isInfoEnabled()) - { - _logger.info("ConnectionClose received with reply code/reply text " + body.getReplyCode() + "/" + - body.getReplyText() + " for " + _connection); - } - try - { - _connection.closeSession(); - } - catch (Exception e) - { - _logger.error("Error closing protocol session: " + e, e); - } - - MethodRegistry methodRegistry = _connection.getMethodRegistry(); - ConnectionCloseOkBody responseBody = methodRegistry.createConnectionCloseOkBody(); - _connection.writeFrame(responseBody.generateFrame(channelId)); - - _connection.closeProtocolSession(); - - return true; - } - - - public boolean dispatchConnectionCloseOk(ConnectionCloseOkBody body, int channelId) throws AMQException - { - _logger.info("Received Connection-close-ok"); - - try - { - _connection.closeSession(); - } - catch (Exception e) - { - _logger.error("Error closing protocol session: " + e, e); - } - return true; - } - - public boolean dispatchConnectionOpenOk(ConnectionOpenOkBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchConnectionRedirect(ConnectionRedirectBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchConnectionSecure(ConnectionSecureBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchConnectionStart(ConnectionStartBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchConnectionTune(ConnectionTuneBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - - public boolean dispatchExchangeBoundOk(ExchangeBoundOkBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchExchangeDeclareOk(ExchangeDeclareOkBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchExchangeDeleteOk(ExchangeDeleteOkBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchQueueBindOk(QueueBindOkBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchQueueDeclareOk(QueueDeclareOkBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchQueueDeleteOk(QueueDeleteOkBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchQueuePurgeOk(QueuePurgeOkBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchTxCommitOk(TxCommitOkBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchTxRollbackOk(TxRollbackOkBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - public boolean dispatchTxSelectOk(TxSelectOkBody body, int channelId) throws AMQException - { - throw new UnexpectedMethodException(body); - } - - - public boolean dispatchConnectionSecureOk(ConnectionSecureOkBody body, int channelId) throws AMQException - { - Broker<?> broker = _connection.getBroker(); - - SubjectCreator subjectCreator = _connection.getSubjectCreator(); - - SaslServer ss = _connection.getSaslServer(); - if (ss == null) - { - throw new AMQException("No SASL context set up in session"); - } - MethodRegistry methodRegistry = _connection.getMethodRegistry(); - SubjectAuthenticationResult authResult = subjectCreator.authenticate(ss, body.getResponse()); - switch (authResult.getStatus()) - { - case ERROR: - Exception cause = authResult.getCause(); - - _logger.info("Authentication failed:" + (cause == null ? "" : cause.getMessage())); - - ConnectionCloseBody connectionCloseBody = - methodRegistry.createConnectionCloseBody(AMQConstant.NOT_ALLOWED.getCode(), - AMQConstant.NOT_ALLOWED.getName(), - body.getClazz(), - body.getMethod()); - - _connection.writeFrame(connectionCloseBody.generateFrame(0)); - disposeSaslServer(_connection); - break; - case SUCCESS: - if (_logger.isInfoEnabled()) - { - _logger.info("Connected as: " + authResult.getSubject()); - } - - int frameMax = broker.getContextValue(Integer.class, Broker.BROKER_FRAME_SIZE); - - if(frameMax <= 0) - { - frameMax = Integer.MAX_VALUE; - } - - ConnectionTuneBody tuneBody = - methodRegistry.createConnectionTuneBody(broker.getConnection_sessionCountLimit(), - frameMax, - broker.getConnection_heartBeatDelay()); - _connection.writeFrame(tuneBody.generateFrame(0)); - _connection.setAuthorizedSubject(authResult.getSubject()); - disposeSaslServer(_connection); - break; - case CONTINUE: - - ConnectionSecureBody - secureBody = methodRegistry.createConnectionSecureBody(authResult.getChallenge()); - _connection.writeFrame(secureBody.generateFrame(0)); - } - return true; - } - - private void disposeSaslServer(AMQProtocolSession ps) - { - SaslServer ss = ps.getSaslServer(); - if (ss != null) - { - ps.setSaslServer(null); - try - { - ss.dispose(); - } - catch (SaslException e) - { - _logger.error("Error disposing of Sasl server: " + e); - } - } - } - - public boolean dispatchConnectionStartOk(ConnectionStartOkBody body, int channelId) throws AMQException - { - Broker<?> broker = _connection.getBroker(); - - _logger.info("SASL Mechanism selected: " + body.getMechanism()); - _logger.info("Locale selected: " + body.getLocale()); - - SubjectCreator subjectCreator = _connection.getSubjectCreator(); - SaslServer ss = null; - try - { - ss = subjectCreator.createSaslServer(String.valueOf(body.getMechanism()), - _connection.getLocalFQDN(), - _connection.getPeerPrincipal()); - - if (ss == null) - { - throw body.getConnectionException(AMQConstant.RESOURCE_ERROR, - "Unable to create SASL Server:" + body.getMechanism(), - _connection.getMethodRegistry()); - } - - _connection.setSaslServer(ss); - - final SubjectAuthenticationResult authResult = subjectCreator.authenticate(ss, body.getResponse()); - //save clientProperties - _connection.setClientProperties(body.getClientProperties()); - - MethodRegistry methodRegistry = _connection.getMethodRegistry(); - - switch (authResult.getStatus()) - { - case ERROR: - Exception cause = authResult.getCause(); - - _logger.info("Authentication failed:" + (cause == null ? "" : cause.getMessage())); - - ConnectionCloseBody closeBody = - methodRegistry.createConnectionCloseBody(AMQConstant.NOT_ALLOWED.getCode(), // replyCode - AMQConstant.NOT_ALLOWED.getName(), - body.getClazz(), - body.getMethod()); - - _connection.writeFrame(closeBody.generateFrame(0)); - disposeSaslServer(_connection); - break; - - case SUCCESS: - if (_logger.isInfoEnabled()) - { - _logger.info("Connected as: " + authResult.getSubject()); - } - _connection.setAuthorizedSubject(authResult.getSubject()); - - int frameMax = broker.getContextValue(Integer.class, Broker.BROKER_FRAME_SIZE); - - if(frameMax <= 0) - { - frameMax = Integer.MAX_VALUE; - } - - ConnectionTuneBody - tuneBody = methodRegistry.createConnectionTuneBody(broker.getConnection_sessionCountLimit(), - frameMax, - broker.getConnection_heartBeatDelay()); - _connection.writeFrame(tuneBody.generateFrame(0)); - break; - case CONTINUE: - ConnectionSecureBody - secureBody = methodRegistry.createConnectionSecureBody(authResult.getChallenge()); - _connection.writeFrame(secureBody.generateFrame(0)); - } - } - catch (SaslException e) - { - disposeSaslServer(_connection); - throw new AMQException("SASL error: " + e, e); - } - return true; - } - - public boolean dispatchConnectionTuneOk(ConnectionTuneOkBody body, int channelId) throws AMQException - { - final AMQProtocolSession<?> connection = getConnection(); - - if (_logger.isDebugEnabled()) - { - _logger.debug(body); - } - - connection.initHeartbeats(body.getHeartbeat()); - - int brokerFrameMax = connection.getBroker().getContextValue(Integer.class, Broker.BROKER_FRAME_SIZE); - if(brokerFrameMax <= 0) - { - brokerFrameMax = Integer.MAX_VALUE; - } - - if(body.getFrameMax() > (long) brokerFrameMax) - { - throw new AMQConnectionException(AMQConstant.SYNTAX_ERROR, - "Attempt to set max frame size to " + body.getFrameMax() - + " greater than the broker will allow: " - + brokerFrameMax, - body.getClazz(), body.getMethod(), - connection.getMethodRegistry(),null); - } - else if(body.getFrameMax() > 0 && body.getFrameMax() < AMQConstant.FRAME_MIN_SIZE.getCode()) - { - throw new AMQConnectionException(AMQConstant.SYNTAX_ERROR, - "Attempt to set max frame size to " + body.getFrameMax() - + " which is smaller than the specification definined minimum: " - + AMQConstant.FRAME_MIN_SIZE.getCode(), - body.getClazz(), body.getMethod(), - connection.getMethodRegistry(),null); - } - int frameMax = body.getFrameMax() == 0 ? brokerFrameMax : (int) body.getFrameMax(); - connection.setMaxFrameSize(frameMax); - - long maxChannelNumber = body.getChannelMax(); - //0 means no implied limit, except that forced by protocol limitations (0xFFFF) - connection.setMaximumNumberOfChannels(maxChannelNumber == 0 ? 0xFFFFL : maxChannelNumber); - return true; - } - - public static final int OK = 0; - public static final int EXCHANGE_NOT_FOUND = 1; - public static final int QUEUE_NOT_FOUND = 2; - public static final int NO_BINDINGS = 3; - public static final int QUEUE_NOT_BOUND = 4; - public static final int NO_QUEUE_BOUND_WITH_RK = 5; - public static final int SPECIFIC_QUEUE_NOT_BOUND_WITH_RK = 6; - - public boolean dispatchExchangeBound(ExchangeBoundBody body, int channelId) throws AMQException - { - VirtualHostImpl virtualHost = _connection.getVirtualHost(); - MethodRegistry methodRegistry = _connection.getMethodRegistry(); - - final AMQChannel channel = _connection.getChannel(channelId); - if (channel == null) - { - throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry()); - } - channel.sync(); - - - AMQShortString exchangeName = body.getExchange(); - AMQShortString queueName = body.getQueue(); - AMQShortString routingKey = body.getRoutingKey(); - ExchangeBoundOkBody response; - - if(isDefaultExchange(exchangeName)) - { - if(routingKey == null) - { - if(queueName == null) - { - response = methodRegistry.createExchangeBoundOkBody(virtualHost.getQueues().isEmpty() ? NO_BINDINGS : OK, null); - } - else - { - AMQQueue queue = virtualHost.getQueue(queueName.toString()); - if (queue == null) - { - - response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_FOUND, // replyCode - AMQShortString.validValueOf("Queue '" + queueName + "' not found")); // replyText - } - else - { - response = methodRegistry.createExchangeBoundOkBody(OK, null); - } - } - } - else - { - if(queueName == null) - { - response = methodRegistry.createExchangeBoundOkBody(virtualHost.getQueue(routingKey.toString()) == null ? NO_QUEUE_BOUND_WITH_RK : OK, null); - } - else - { - AMQQueue queue = virtualHost.getQueue(queueName.toString()); - if (queue == null) - { - - response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_FOUND, // replyCode - AMQShortString.validValueOf("Queue '" + queueName + "' not found")); // replyText - } - else - { - response = methodRegistry.createExchangeBoundOkBody(queueName.equals(routingKey) ? OK : SPECIFIC_QUEUE_NOT_BOUND_WITH_RK, null); - } - } - } - } - else - { - ExchangeImpl exchange = virtualHost.getExchange(exchangeName.toString()); - if (exchange == null) - { - - - response = methodRegistry.createExchangeBoundOkBody(EXCHANGE_NOT_FOUND, - AMQShortString.validValueOf("Exchange '" + exchangeName + "' not found")); - } - else if (routingKey == null) - { - if (queueName == null) - { - if (exchange.hasBindings()) - { - response = methodRegistry.createExchangeBoundOkBody(OK, null); - } - else - { - - response = methodRegistry.createExchangeBoundOkBody(NO_BINDINGS, // replyCode - null); // replyText - } - } - else - { - - AMQQueue queue = virtualHost.getQueue(queueName.toString()); - if (queue == null) - { - - response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_FOUND, // replyCode - AMQShortString.validValueOf("Queue '" + queueName + "' not found")); // replyText - } - else - { - if (exchange.isBound(queue)) - { - - response = methodRegistry.createExchangeBoundOkBody(OK, // replyCode - null); // replyText - } - else - { - - response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_BOUND, // replyCode - AMQShortString.validValueOf("Queue '" + queueName + "' not bound to exchange '" + exchangeName + "'")); // replyText - } - } - } - } - else if (queueName != null) - { - AMQQueue queue = virtualHost.getQueue(queueName.toString()); - if (queue == null) - { - - response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_FOUND, // replyCode - AMQShortString.validValueOf("Queue '" + queueName + "' not found")); // replyText - } - else - { - String bindingKey = body.getRoutingKey() == null ? null : body.getRoutingKey().asString(); - if (exchange.isBound(bindingKey, queue)) - { - - response = methodRegistry.createExchangeBoundOkBody(OK, // replyCode - null); // replyText - } - else - { - - String message = "Queue '" + queueName + "' not bound with routing key '" + - body.getRoutingKey() + "' to exchange '" + exchangeName + "'"; - - response = methodRegistry.createExchangeBoundOkBody(SPECIFIC_QUEUE_NOT_BOUND_WITH_RK, // replyCode - AMQShortString.validValueOf(message)); // replyText - } - } - } - else - { - if (exchange.isBound(body.getRoutingKey() == null ? "" : body.getRoutingKey().asString())) - { - - response = methodRegistry.createExchangeBoundOkBody(OK, // replyCode - null); // replyText - } - else - { - - response = methodRegistry.createExchangeBoundOkBody(NO_QUEUE_BOUND_WITH_RK, // replyCode - AMQShortString.validValueOf("No queue bound with routing key '" + body.getRoutingKey() + - "' to exchange '" + exchangeName + "'")); // replyText - } - } - } - _connection.writeFrame(response.generateFrame(channelId)); - return true; - } - - public boolean dispatchExchangeDeclare(ExchangeDeclareBody body, int channelId) throws AMQException - { - VirtualHostImpl virtualHost = _connection.getVirtualHost(); - final AMQChannel channel = _connection.getChannel(channelId); - if (channel == null) - { - throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry()); - } - - final AMQShortString exchangeName = body.getExchange(); - if (_logger.isDebugEnabled()) - { - _logger.debug("Request to declare exchange of type " + body.getType() + " with name " + exchangeName); - } - - ExchangeImpl exchange; - - if(isDefaultExchange(exchangeName)) - { - if(!new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_CLASS).equals(body.getType())) - { - throw new AMQConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare default exchange: " - + " of type " - + ExchangeDefaults.DIRECT_EXCHANGE_CLASS - + " to " + body.getType() +".", - body.getClazz(), body.getMethod(), - _connection.getMethodRegistry(),null); - } - } - else - { - if (body.getPassive()) - { - exchange = virtualHost.getExchange(exchangeName.toString()); - if(exchange == null) - { - throw body.getChannelException(AMQConstant.NOT_FOUND, "Unknown exchange: " + exchangeName, - _connection.getMethodRegistry()); - } - else if (!(body.getType() == null || body.getType().length() ==0) && !exchange.getType().equals(body.getType().asString())) - { - - throw new AMQConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: " + - exchangeName + " of type " + exchange.getType() - + " to " + body.getType() +".", - body.getClazz(), body.getMethod(), - _connection.getMethodRegistry(),null); - } - - } - else - { - try - { - String name = exchangeName == null ? null : exchangeName.intern().toString(); - String type = body.getType() == null ? null : body.getType().intern().toString(); - - Map<String,Object> attributes = new HashMap<String, Object>(); - if(body.getArguments() != null) - { - attributes.putAll(FieldTable.convertToMap(body.getArguments())); - } - attributes.put(org.apache.qpid.server.model.Exchange.ID, null); - attributes.put(org.apache.qpid.server.model.Exchange.NAME,name); - attributes.put(org.apache.qpid.server.model.Exchange.TYPE,type); - attributes.put(org.apache.qpid.server.model.Exchange.DURABLE, body.getDurable()); - attributes.put(org.apache.qpid.server.model.Exchange.LIFETIME_POLICY, - body.getAutoDelete() ? LifetimePolicy.DELETE_ON_NO_LINKS : LifetimePolicy.PERMANENT); - if(!attributes.containsKey(org.apache.qpid.server.model.Exchange.ALTERNATE_EXCHANGE)) - { - attributes.put(org.apache.qpid.server.model.Exchange.ALTERNATE_EXCHANGE, null); - } - exchange = virtualHost.createExchange(attributes); - - } - catch(ReservedExchangeNameException e) - { - throw body.getConnectionException(AMQConstant.NOT_ALLOWED, - "Attempt to declare exchange: " + exchangeName + - " which begins with reserved prefix.", - _connection.getMethodRegistry()); - - } - catch(ExchangeExistsException e) - { - exchange = e.getExistingExchange(); - if(!new AMQShortString(exchange.getType()).equals(body.getType())) - { - throw body.getConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: " - + exchangeName + " of type " - + exchange.getType() - + " to " + body.getType() + ".", - _connection.getMethodRegistry()); - } - } - catch(NoFactoryForTypeException e) - { - throw body.getConnectionException(AMQConstant.COMMAND_INVALID, - "Unknown exchange type '" - + e.getType() - + "' for exchange '" - + exchangeName - + "'", - _connection.getMethodRegistry()); - } - catch (AccessControlException e) - { - throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, - e.getMessage(), - _connection.getMethodRegistry()); - } - catch (UnknownConfiguredObjectException e) - { - // note - since 0-8/9/9-1 can't set the alt. exchange this exception should never occur - throw body.getConnectionException(AMQConstant.NOT_FOUND, - "Unknown alternate exchange " - + (e.getName() != null - ? "name: \"" + e.getName() + "\"" - : "id: " + e.getId()), - _connection.getMethodRegistry()); - } - catch (IllegalArgumentException e) - { - throw body.getConnectionException(AMQConstant.COMMAND_INVALID, - "Error creating exchange '" - + exchangeName - + "': " - + e.getMessage(), - _connection.getMethodRegistry()); - } - } - } - - if(!body.getNowait()) - { - MethodRegistry methodRegistry = _connection.getMethodRegistry(); - AMQMethodBody responseBody = methodRegistry.createExchangeDeclareOkBody(); - channel.sync(); - _connection.writeFrame(responseBody.generateFrame(channelId)); - } - return true; - } - - public boolean dispatchExchangeDelete(ExchangeDeleteBody body, int channelId) throws AMQException - { - VirtualHostImpl virtualHost = _connection.getVirtualHost(); - final AMQChannel channel = _connection.getChannel(channelId); - if (channel == null) - { - throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry()); - } - channel.sync(); - try - { - - if(isDefaultExchange(body.getExchange())) - { - throw body.getConnectionException(AMQConstant.NOT_ALLOWED, - "Default Exchange cannot be deleted", - _connection.getMethodRegistry()); - } - - final String exchangeName = body.getExchange().toString(); - - final ExchangeImpl exchange = virtualHost.getExchange(exchangeName); - if(exchange == null) - { - throw body.getChannelException(AMQConstant.NOT_FOUND, "No such exchange: " + body.getExchange(), - _connection.getMethodRegistry()); - } - - virtualHost.removeExchange(exchange, !body.getIfUnused()); - - ExchangeDeleteOkBody responseBody = _connection.getMethodRegistry().createExchangeDeleteOkBody(); - - _connection.writeFrame(responseBody.generateFrame(channelId)); - } - - catch (ExchangeIsAlternateException e) - { - throw body.getChannelException(AMQConstant.NOT_ALLOWED, "Exchange in use as an alternate exchange", - _connection.getMethodRegistry()); - - } - catch (RequiredExchangeException e) - { - throw body.getChannelException(AMQConstant.NOT_ALLOWED, - "Exchange '" + body.getExchange() + "' cannot be deleted", - _connection.getMethodRegistry()); - } - catch (AccessControlException e) - { - throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, - e.getMessage(), - _connection.getMethodRegistry()); - } - return true; - } - - private boolean isDefaultExchange(final AMQShortString exchangeName) - { - return exchangeName == null || exchangeName.equals(AMQShortString.EMPTY_STRING); - } - - public boolean dispatchQueueBind(QueueBindBody body, int channelId) throws AMQException - { - VirtualHostImpl virtualHost = _connection.getVirtualHost(); - AMQChannel channel = _connection.getChannel(channelId); - - if (channel == null) - { - throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry()); - } - - final AMQQueue queue; - final AMQShortString routingKey; - - final AMQShortString queueName = body.getQueue(); - - if (queueName == null) - { - - queue = channel.getDefaultQueue(); - - if (queue == null) - { - throw body.getChannelException(AMQConstant.NOT_FOUND, - "No default queue defined on channel and queue was null", - _connection.getMethodRegistry()); - } - - if (body.getRoutingKey() == null) - { - routingKey = AMQShortString.valueOf(queue.getName()); - } - else - { - routingKey = body.getRoutingKey().intern(); - } - } - else - { - queue = virtualHost.getQueue(queueName.toString()); - routingKey = body.getRoutingKey() == null ? AMQShortString.EMPTY_STRING : body.getRoutingKey().intern(); - } - - if (queue == null) - { - throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + queueName + " does not exist.", - _connection.getMethodRegistry()); - } - - if(isDefaultExchange(body.getExchange())) - { - throw body.getConnectionException(AMQConstant.NOT_ALLOWED, - "Cannot bind the queue " + queueName + " to the default exchange", - _connection.getMethodRegistry()); - } - - final String exchangeName = body.getExchange().toString(); - - final ExchangeImpl exch = virtualHost.getExchange(exchangeName); - if (exch == null) - { - throw body.getChannelException(AMQConstant.NOT_FOUND, "Exchange " + exchangeName + " does not exist.", - _connection.getMethodRegistry()); - } - - - try - { - - Map<String,Object> arguments = FieldTable.convertToMap(body.getArguments()); - String bindingKey = String.valueOf(routingKey); - - if (!exch.isBound(bindingKey, arguments, queue)) - { - - if(!exch.addBinding(bindingKey, queue, arguments) && ExchangeDefaults.TOPIC_EXCHANGE_CLASS.equals(exch.getType())) - { - exch.replaceBinding(bindingKey, queue, arguments); - } - } - } - catch (AccessControlException e) - { - throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, - e.getMessage(), - _connection.getMethodRegistry()); - } - - if (_logger.isInfoEnabled()) - { - _logger.info("Binding queue " + queue + " to exchange " + exch + " with routing key " + routingKey); - } - if (!body.getNowait()) - { - channel.sync(); - MethodRegistry methodRegistry = _connection.getMethodRegistry(); - AMQMethodBody responseBody = methodRegistry.createQueueBindOkBody(); - _connection.writeFrame(responseBody.generateFrame(channelId)); - - } - return true; - } - - public boolean dispatchQueueDeclare(QueueDeclareBody body, int channelId) throws AMQException - { - final AMQSessionModel session = _connection.getChannel(channelId); - VirtualHostImpl virtualHost = _connection.getVirtualHost(); - - final AMQShortString queueName; - - // if we aren't given a queue name, we create one which we return to the client - if ((body.getQueue() == null) || (body.getQueue().length() == 0)) - { - queueName = new AMQShortString("tmp_" + UUID.randomUUID()); - } - else - { - queueName = body.getQueue().intern(); - } - - AMQQueue queue; - - //TODO: do we need to check that the queue already exists with exactly the same "configuration"? - - AMQChannel channel = _connection.getChannel(channelId); - - if (channel == null) - { - throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry()); - } - - if(body.getPassive()) - { - queue = virtualHost.getQueue(queueName.toString()); - if (queue == null) - { - String msg = "Queue: " + queueName + " not found on VirtualHost(" + virtualHost + ")."; - throw body.getChannelException(AMQConstant.NOT_FOUND, msg, _connection.getMethodRegistry()); - } - else - { - if (!queue.verifySessionAccess(channel)) - { - throw body.getConnectionException(AMQConstant.NOT_ALLOWED, - "Queue " - + queue.getName() - + " is exclusive, but not created on this Connection.", - _connection.getMethodRegistry()); - } - - //set this as the default queue on the channel: - channel.setDefaultQueue(queue); - } - } - else - { - - try - { - - queue = createQueue(channel, queueName, body, virtualHost, _connection); - - } - catch(QueueExistsException qe) - { - - queue = qe.getExistingQueue(); - - if (!queue.verifySessionAccess(channel)) - { - throw body.getConnectionException(AMQConstant.NOT_ALLOWED, - "Queue " - + queue.getName() - + " is exclusive, but not created on this Connection.", - _connection.getMethodRegistry()); - } - else if(queue.isExclusive() != body.getExclusive()) - { - - throw body.getChannelException(AMQConstant.ALREADY_EXISTS, - "Cannot re-declare queue '" - + queue.getName() - + "' with different exclusivity (was: " - + queue.isExclusive() - + " requested " - + body.getExclusive() - + ")", - _connection.getMethodRegistry()); - } - else if((body.getAutoDelete() && queue.getLifetimePolicy() != LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS) - || (!body.getAutoDelete() && queue.getLifetimePolicy() != ((body.getExclusive() && !body.getDurable()) ? LifetimePolicy.DELETE_ON_CONNECTION_CLOSE : LifetimePolicy.PERMANENT))) - { - throw body.getChannelException(AMQConstant.ALREADY_EXISTS, - "Cannot re-declare queue '" - + queue.getName() - + "' with different lifetime policy (was: " - + queue.getLifetimePolicy() - + " requested autodelete: " - + body.getAutoDelete() - + ")", - _connection.getMethodRegistry()); - } - else if(queue.isDurable() != body.getDurable()) - { - throw body.getChannelException(AMQConstant.ALREADY_EXISTS, - "Cannot re-declare queue '" - + queue.getName() - + "' with different durability (was: " - + queue.isDurable() - + " requested " - + body.getDurable() - + ")", - _connection.getMethodRegistry()); - } - - } - catch (AccessControlException e) - { - throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, - e.getMessage(), - _connection.getMethodRegistry()); - } - - //set this as the default queue on the channel: - channel.setDefaultQueue(queue); - } - - if (!body.getNowait()) - { - channel.sync(); - MethodRegistry methodRegistry = _connection.getMethodRegistry(); - QueueDeclareOkBody responseBody = - methodRegistry.createQueueDeclareOkBody(queueName, - queue.getQueueDepthMessages(), - queue.getConsumerCount()); - _connection.writeFrame(responseBody.generateFrame(channelId)); - - _logger.info("Queue " + queueName + " declared successfully"); - } - return true; - } - - protected AMQQueue createQueue(final AMQChannel channel, final AMQShortString queueName, - QueueDeclareBody body, - final VirtualHostImpl virtualHost, - final AMQProtocolSession session) - throws AMQException, QueueExistsException - { - - final boolean durable = body.getDurable(); - final boolean autoDelete = body.getAutoDelete(); - final boolean exclusive = body.getExclusive(); - - - Map<String, Object> attributes = - QueueArgumentsConverter.convertWireArgsToModel(FieldTable.convertToMap(body.getArguments())); - final String queueNameString = AMQShortString.toString(queueName); - attributes.put(Queue.NAME, queueNameString); - attributes.put(Queue.ID, UUID.randomUUID()); - attributes.put(Queue.DURABLE, durable); - - LifetimePolicy lifetimePolicy; - ExclusivityPolicy exclusivityPolicy; - - if(exclusive) - { - lifetimePolicy = autoDelete - ? LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS - : durable ? LifetimePolicy.PERMANENT : LifetimePolicy.DELETE_ON_CONNECTION_CLOSE; - exclusivityPolicy = durable ? ExclusivityPolicy.CONTAINER : ExclusivityPolicy.CONNECTION; - } - else - { - lifetimePolicy = autoDelete ? LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS : LifetimePolicy.PERMANENT; - exclusivityPolicy = ExclusivityPolicy.NONE; - } - - attributes.put(Queue.EXCLUSIVE, exclusivityPolicy); - attributes.put(Queue.LIFETIME_POLICY, lifetimePolicy); - - - final AMQQueue queue = virtualHost.createQueue(attributes); - - return queue; - } - - public boolean dispatchQueueDelete(QueueDeleteBody body, int channelId) throws AMQException - { - VirtualHostImpl virtualHost = _connection.getVirtualHost(); - - AMQChannel channel = _connection.getChannel(channelId); - - if (channel == null) - { - throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry()); - } - channel.sync(); - AMQQueue queue; - if (body.getQueue() == null) - { - - //get the default queue on the channel: - queue = channel.getDefaultQueue(); - } - else - { - queue = virtualHost.getQueue(body.getQueue().toString()); - } - - if (queue == null) - { - throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() + " does not exist.", - _connection.getMethodRegistry()); - - } - else - { - if (body.getIfEmpty() && !queue.isEmpty()) - { - throw body.getChannelException(AMQConstant.IN_USE, "Queue: " + body.getQueue() + " is not empty.", - _connection.getMethodRegistry()); - } - else if (body.getIfUnused() && !queue.isUnused()) - { - // TODO - Error code - throw body.getChannelException(AMQConstant.IN_USE, "Queue: " + body.getQueue() + " is still used.", - _connection.getMethodRegistry()); - } - else - { - if (!queue.verifySessionAccess(channel)) - { - throw body.getConnectionException(AMQConstant.NOT_ALLOWED, - "Queue " - + queue.getName() - + " is exclusive, but not created on this Connection.", - _connection.getMethodRegistry()); - } - - int purged = 0; - try - { - purged = virtualHost.removeQueue(queue); - } - catch (AccessControlException e) - { - throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, - e.getMessage(), - _connection.getMethodRegistry()); - } - - MethodRegistry methodRegistry = _connection.getMethodRegistry(); - QueueDeleteOkBody responseBody = methodRegistry.createQueueDeleteOkBody(purged); - _connection.writeFrame(responseBody.generateFrame(channelId)); - } - } - return true; - } - - public boolean dispatchQueuePurge(QueuePurgeBody body, int channelId) throws AMQException - { - VirtualHostImpl virtualHost = _connection.getVirtualHost(); - - AMQChannel channel = _connection.getChannel(channelId); - if (channel == null) - { - throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry()); - } - AMQQueue queue; - if(body.getQueue() == null) - { - - //get the default queue on the channel: - queue = channel.getDefaultQueue(); - - if(queue == null) - { - throw body.getConnectionException(AMQConstant.NOT_ALLOWED, - "No queue specified.", - _connection.getMethodRegistry()); - } - } - else - { - queue = virtualHost.getQueue(body.getQueue().toString()); - } - - if(queue == null) - { - throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() + " does not exist.", - _connection.getMethodRegistry()); - } - else - { - if (!queue.verifySessionAccess(channel)) - { - throw body.getConnectionException(AMQConstant.NOT_ALLOWED, - "Queue is exclusive, but not created on this Connection.", - _connection.getMethodRegistry()); - } - - long purged = 0; - try - { - purged = queue.clearQueue(); - } - catch (AccessControlException e) - { - throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, - e.getMessage(), - _connection.getMethodRegistry()); - } - - - if(!body.getNowait()) - { - channel.sync(); - MethodRegistry methodRegistry = _connection.getMethodRegistry(); - AMQMethodBody responseBody = methodRegistry.createQueuePurgeOkBody(purged); - _connection.writeFrame(responseBody.generateFrame(channelId)); - - } - } - return true; - } - - - public boolean dispatchTxCommit(TxCommitBody body, final int channelId) throws AMQException - { - try - { - if (_logger.isDebugEnabled()) - { - _logger.debug("Commit received on channel " + channelId); - } - AMQChannel channel = _connection.getChannel(channelId); - - if (channel == null) - { - throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry()); - } - channel.commit(new Runnable() - { - - @Override - public void run() - { - MethodRegistry methodRegistry = _connection.getMethodRegistry(); - AMQMethodBody responseBody = methodRegistry.createTxCommitOkBody(); - _connection.writeFrame(responseBody.generateFrame(channelId)); - } - }, true); - - - - } - catch (AMQException e) - { - throw body.getChannelException(e.getErrorCode(), "Failed to commit: " + e.getMessage(), - _connection.getMethodRegistry()); - } - return true; - } - - public boolean dispatchTxRollback(TxRollbackBody body, final int channelId) throws AMQException - { - try - {
[... 2456 lines stripped ...] --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org