This is an automated email from the ASF dual-hosted git repository. vavrtom pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git
The following commit(s) were added to refs/heads/main by this push: new 1bc1974012 QPID-8618: [Broker-J] ACL check on link stealing (#169) 1bc1974012 is described below commit 1bc1974012e11204dea12f3f675785bb61797537 Author: Daniil Kirilyuk <daniel.kiril...@gmail.com> AuthorDate: Fri Feb 3 08:28:08 2023 +0100 QPID-8618: [Broker-J] ACL check on link stealing (#169) --- .../apache/qpid/server/protocol/v1_0/LinkImpl.java | 16 +- .../qpid/server/protocol/v1_0/LinkImplTest.java | 235 +++++++++++++++++++++ 2 files changed, 250 insertions(+), 1 deletion(-) diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkImpl.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkImpl.java index 692faa6f97..cabcf9b133 100644 --- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkImpl.java +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkImpl.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.protocol.v1_0; import java.util.Iterator; import java.util.LinkedList; +import java.util.Objects; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutionException; @@ -34,6 +35,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.qpid.server.model.AbstractConfiguredObject; +import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.protocol.LinkModel; import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException; import org.apache.qpid.server.protocol.v1_0.type.BaseSource; @@ -47,6 +49,7 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Attach; import org.apache.qpid.server.protocol.v1_0.type.transport.Error; import org.apache.qpid.server.protocol.v1_0.type.transport.LinkError; import org.apache.qpid.server.protocol.v1_0.type.transport.Role; +import org.apache.qpid.server.security.access.Operation; import org.apache.qpid.server.util.Action; public class LinkImpl<S extends BaseSource, T extends BaseTarget> implements Link_1_0<S, T> @@ -94,7 +97,18 @@ public class LinkImpl<S extends BaseSource, T extends BaseTarget> implements Lin if (_linkEndpoint != null && !session.equals(_linkEndpoint.getSession())) { - SettableFuture<LinkEndpoint<S, T>> future = SettableFuture.create(); + if (!Objects.equals(_linkEndpoint.getSession().getConnection().getPrincipal(), + session.getConnection().getPrincipal())) + { + final Operation operation = attach.getRole() == Role.SENDER + ? Operation.PERFORM_ACTION("publish") + : Operation.PERFORM_ACTION("consume"); + final ConfiguredObject<?> targetObject = _linkEndpoint instanceof SendingLinkEndpoint + ? (ConfiguredObject<?>) ((SendingLinkEndpoint) _linkEndpoint).getDestination().getMessageSource() + : (ConfiguredObject<?>) session.getReceivingDestination(this, (Target) getTarget()).getMessageDestination(); + targetObject.authorise(operation); + } + final SettableFuture<LinkEndpoint<S, T>> future = SettableFuture.create(); _thiefQueue.add(new ThiefInformation(session, attach, future)); startLinkStealingIfNecessary(); return future; diff --git a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/LinkImplTest.java b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/LinkImplTest.java new file mode 100644 index 0000000000..96cd1d3db7 --- /dev/null +++ b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/LinkImplTest.java @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.server.protocol.v1_0; + +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.google.common.util.concurrent.ListenableFuture; +import org.junit.Before; +import org.junit.Test; + +import org.apache.qpid.server.message.MessageDestination; +import org.apache.qpid.server.message.MessageInstanceConsumer; +import org.apache.qpid.server.model.Consumer; +import org.apache.qpid.server.model.Exchange; +import org.apache.qpid.server.model.NamedAddressSpace; +import org.apache.qpid.server.model.Queue; +import org.apache.qpid.server.protocol.v1_0.codec.SectionDecoderRegistry; +import org.apache.qpid.server.protocol.v1_0.delivery.DeliveryRegistry; +import org.apache.qpid.server.protocol.v1_0.type.Symbol; +import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger; +import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry; +import org.apache.qpid.server.protocol.v1_0.type.messaging.Source; +import org.apache.qpid.server.protocol.v1_0.type.messaging.StdDistMode; +import org.apache.qpid.server.protocol.v1_0.type.messaging.Target; +import org.apache.qpid.server.protocol.v1_0.type.transport.Attach; +import org.apache.qpid.server.protocol.v1_0.type.transport.Role; +import org.apache.qpid.server.security.access.Operation; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.transport.AMQPConnection; + +@SuppressWarnings({"rawtypes", "unchecked"}) +public class LinkImplTest +{ + private final static String REMOTE_CONTAINER_ID = "remote-container-id"; + private final static String LINK_NAME = "link-name"; + + private LinkRegistry<Source, Target> _linkRegistry; + + @Before + public void setUp() + { + _linkRegistry = mock(LinkRegistry.class); + } + + @Test + public void linkStealing_PublishToQueue() throws Exception + { + final Queue<?> queue = mock(Queue.class); + final Session_1_0 session1 = createSession("principal1", queue); + final Session_1_0 session2 = createSession("principal2", queue); + + final Attach attach1 = createAttach(Role.SENDER); + final Attach attach2 = createAttach(Role.SENDER); + + final LinkImpl<Source, Target> link = new LinkImpl<>(REMOTE_CONTAINER_ID, LINK_NAME, Role.RECEIVER, _linkRegistry); + + final ListenableFuture<? extends LinkEndpoint<?, ?>> future = link.attach(session1, attach1); + final LinkEndpoint<?, ?> linkEndpoint = future.get(); + assertTrue(linkEndpoint instanceof StandardReceivingLinkEndpoint); + + link.attach(session2, attach2); + verify(queue, times(1)).authorise(eq(Operation.PERFORM_ACTION("publish"))); + } + + @Test + public void linkStealing_ConsumeQueue() throws Exception + { + final Queue<?> queue = mock(Queue.class); + final Session_1_0 session1 = createSession("principal1", queue); + final Session_1_0 session2 = createSession("principal2", queue); + + final Attach attach1 = createAttach(Role.RECEIVER); + final Attach attach2 = createAttach(Role.RECEIVER); + + final LinkImpl<Source, Target> link = new LinkImpl<>(REMOTE_CONTAINER_ID, LINK_NAME, Role.SENDER, _linkRegistry); + + final ListenableFuture<? extends LinkEndpoint<?, ?>> future = link.attach(session1, attach1); + final LinkEndpoint<?, ?> linkEndpoint = future.get(); + assertTrue(linkEndpoint instanceof SendingLinkEndpoint); + + link.attach(session2, attach2); + verify(queue, times(1)).authorise(eq(Operation.PERFORM_ACTION("consume"))); + } + + @Test + public void linkStealing_PublishToExchange() throws Exception + { + final Exchange<?> exchange = mock(Exchange.class); + final Queue<?> queue = mock(Queue.class); + final Session_1_0 session1 = createSession("principal1", exchange, queue); + final Session_1_0 session2 = createSession("principal2", exchange, queue); + + final Attach attach1 = createAttach(Role.SENDER); + final Attach attach2 = createAttach(Role.SENDER); + + final LinkImpl<Source, Target> link = new LinkImpl<>(REMOTE_CONTAINER_ID, LINK_NAME, Role.RECEIVER, _linkRegistry); + + final ListenableFuture<? extends LinkEndpoint<?, ?>> future = link.attach(session1, attach1); + final LinkEndpoint<?, ?> linkEndpoint = future.get(); + assertTrue(linkEndpoint instanceof StandardReceivingLinkEndpoint); + + link.attach(session2, attach2); + verify(exchange, times(1)).authorise(eq(Operation.PERFORM_ACTION("publish"))); + } + + @Test + public void linkStealing_ConsumeExchange() throws Exception + { + final Exchange<?> exchange = mock(Exchange.class); + final Queue<?> queue = mock(Queue.class); + final Session_1_0 session1 = createSession("principal1", exchange, queue); + final Session_1_0 session2 = createSession("principal2", exchange, queue); + + final Attach attach1 = createAttach(Role.RECEIVER); + final Attach attach2 = createAttach(Role.RECEIVER); + + final LinkImpl<Source, Target> link = new LinkImpl<>(REMOTE_CONTAINER_ID, LINK_NAME, Role.SENDER, _linkRegistry); + + final ListenableFuture<? extends LinkEndpoint<?, ?>> future = link.attach(session1, attach1); + final LinkEndpoint<?, ?> linkEndpoint = future.get(); + assertTrue(linkEndpoint instanceof SendingLinkEndpoint); + + link.attach(session2, attach2); + verify(queue, times(1)).authorise(eq(Operation.PERFORM_ACTION("consume"))); + } + + private Session_1_0 createSession(final String principal, + final MessageDestination messageDestination) throws Exception + { + return createSession(principal, messageDestination, null); + } + + private Session_1_0 createSession(final String principal, + final MessageDestination messageDestination, + final MessageDestination backup) throws Exception + { + final MessageStore messageStore = mock(MessageStore.class); + + final NamedAddressSpace addressSpace = mock(NamedAddressSpace.class); + when(addressSpace.getMessageStore()).thenReturn(messageStore); + + final SectionDecoderRegistry sectionDecoderRegistry = mock(SectionDecoderRegistry.class); + + final AMQPDescribedTypeRegistry amqpDescribedTypeRegistry = mock(AMQPDescribedTypeRegistry.class); + when(amqpDescribedTypeRegistry.getSectionDecoderRegistry()).thenReturn(sectionDecoderRegistry); + + final AMQPConnection<?> amqpConnection = mock(AMQPConnection.class); + when(amqpConnection.getContextValue(eq(Long.class), eq(Consumer.SUSPEND_NOTIFICATION_PERIOD))) + .thenReturn(10_000L); + + final AMQPConnection_1_0<?> connection = mock(AMQPConnection_1_0.class); + when(connection.getDescribedTypeRegistry()).thenReturn(amqpDescribedTypeRegistry); + when(connection.getPrincipal()).thenReturn(principal); + when(connection.getAddressSpace()).thenReturn(addressSpace); + + final DeliveryRegistry deliveryRegistry = mock(DeliveryRegistry.class); + + final ReceivingDestination receivingDestination = mock(ReceivingDestination.class); + when(receivingDestination.getCapabilities()).thenReturn(new Symbol[] { }); + when(receivingDestination.getMessageDestination()).thenReturn(messageDestination); + + final SendingDestination sendingDestination = messageDestination instanceof Exchange + ? mock(ExchangeSendingDestination.class) + : mock(StandardSendingDestination.class); + when(sendingDestination.getCapabilities()).thenReturn(new Symbol[] { }); + + final MessageInstanceConsumer consumer = mock(MessageInstanceConsumer.class); + + if (messageDestination instanceof Exchange) + { + final Queue<?> queue = (Queue<?>) backup; + when(queue.addConsumer(any(),any(), eq(Message_1_0.class), any(), any(), any())) + .thenReturn(consumer); + + when(sendingDestination.getMessageSource()).thenReturn(queue); + } + + if (messageDestination instanceof Queue) + { + final Queue<?> queue = (Queue<?>) messageDestination; + when(queue.addConsumer(any(),any(), eq(Message_1_0.class), any(), any(), any())) + .thenReturn(consumer); + when(sendingDestination.getMessageSource()).thenReturn(queue); + } + + final Session_1_0 session = mock(Session_1_0.class); + doReturn(connection).when(session).getConnection(); + doReturn(amqpConnection).when(session).getAMQPConnection(); + when(session.getIncomingDeliveryRegistry()).thenReturn(deliveryRegistry); + when(session.getOutgoingDeliveryRegistry()).thenReturn(deliveryRegistry); + when(session.getReceivingDestination(any(LinkImpl.class), any(Target.class))).thenReturn(receivingDestination); + when(session.getSendingDestination(any(LinkImpl.class), any(Source.class))).thenReturn(sendingDestination); + + return session; + } + + private Attach createAttach(final Role role) + { + final Source source = mock(Source.class); + when(source.getDistributionMode()).thenReturn(StdDistMode.COPY); + + final Target target = mock(Target.class); + + final Attach attach = mock(Attach.class); + when(attach.getRole()).thenReturn(role); + when(attach.getSource()).thenReturn(source); + when(attach.getTarget()).thenReturn(target); + when(attach.getInitialDeliveryCount()).thenReturn(UnsignedInteger.ZERO); + + return attach; + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org