This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 69a600e86bb [improve][admin] Check if the topic existed before the permission operations (#22547) 69a600e86bb is described below commit 69a600e86bb5110a118d836125411e941b83764d Author: Jiwei Guo <techno...@apache.org> AuthorDate: Fri Apr 26 14:05:30 2024 +0800 [improve][admin] Check if the topic existed before the permission operations (#22547) --- .../pulsar/broker/admin/impl/PersistentTopicsBase.java | 9 ++++++--- .../pulsar/broker/admin/AdminApiSchemaWithAuthTest.java | 1 + .../java/org/apache/pulsar/broker/admin/AdminApiTest.java | 12 ++++++++++++ .../apache/pulsar/broker/admin/PersistentTopicsTest.java | 10 ++++++++-- .../org/apache/pulsar/broker/auth/AuthorizationTest.java | 13 ++++++++----- .../client/api/AuthenticatedProducerConsumerTest.java | 4 +++- .../client/api/AuthorizationProducerConsumerTest.java | 2 ++ .../pulsar/websocket/proxy/ProxyAuthorizationTest.java | 8 +++++--- 8 files changed, 45 insertions(+), 14 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 63ea987bb07..682f41dcdb6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -205,6 +205,7 @@ public class PersistentTopicsBase extends AdminResource { protected CompletableFuture<Map<String, Set<AuthAction>>> internalGetPermissionsOnTopic() { // This operation should be reading from zookeeper and it should be allowed without having admin privileges return validateAdminAccessForTenantAsync(namespaceName.getTenant()) + .thenCompose(__ -> internalCheckTopicExists(topicName)) .thenCompose(__ -> getAuthorizationService().getPermissionsAsync(topicName)); } @@ -256,9 +257,10 @@ public class PersistentTopicsBase extends AdminResource { Set<AuthAction> actions) { // This operation should be reading from zookeeper and it should be allowed without having admin privileges validateAdminAccessForTenantAsync(namespaceName.getTenant()) - .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync().thenCompose(unused1 -> - grantPermissionsAsync(topicName, role, actions) - .thenAccept(unused -> asyncResponse.resume(Response.noContent().build())))) + .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()) + .thenCompose(__ -> internalCheckTopicExists(topicName)) + .thenCompose(unused1 -> grantPermissionsAsync(topicName, role, actions)) + .thenAccept(unused -> asyncResponse.resume(Response.noContent().build())) .exceptionally(ex -> { Throwable realCause = FutureUtil.unwrapCompletionException(ex); log.error("[{}] Failed to get permissions for topic {}", clientAppId(), topicName, realCause); @@ -270,6 +272,7 @@ public class PersistentTopicsBase extends AdminResource { protected void internalRevokePermissionsOnTopic(AsyncResponse asyncResponse, String role) { // This operation should be reading from zookeeper and it should be allowed without having admin privileges validateAdminAccessForTenantAsync(namespaceName.getTenant()) + .thenCompose(__ -> internalCheckTopicExists(topicName)) .thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName, true, false) .thenCompose(metadata -> { int numPartitions = metadata.partitions; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java index e89b4ff5e83..2dcb930fbe7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java @@ -120,6 +120,7 @@ public class AdminApiSchemaWithAuthTest extends MockedPulsarServiceBaseTest { .serviceHttpUrl(brokerUrl != null ? brokerUrl.toString() : brokerUrlTls.toString()) .authentication(AuthenticationToken.class.getName(), PRODUCE_TOKEN) .build(); + admin.topics().createNonPartitionedTopic(topicName); admin.topics().grantPermission(topicName, "consumer", EnumSet.of(AuthAction.consume)); admin.topics().grantPermission(topicName, "producer", EnumSet.of(AuthAction.produce)); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java index b28cfc98fdb..635b2c25bc1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java @@ -3698,4 +3698,16 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest { }); } + + @Test + @SneakyThrows + public void testPermissions() { + String namespace = "prop-xyz/ns1/"; + final String random = UUID.randomUUID().toString(); + final String topic = "persistent://" + namespace + random; + final String subject = UUID.randomUUID().toString(); + assertThrows(NotFoundException.class, () -> admin.topics().getPermissions(topic)); + assertThrows(NotFoundException.class, () -> admin.topics().grantPermission(topic, subject, Set.of(AuthAction.produce))); + assertThrows(NotFoundException.class, () -> admin.topics().revokePermissions(topic, subject)); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java index c588051a0fe..55b4c6e1c6f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java @@ -890,12 +890,15 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest { public void testGrantNonPartitionedTopic() { final String topicName = "non-partitioned-topic"; AsyncResponse response = mock(AsyncResponse.class); + ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class); persistentTopics.createNonPartitionedTopic(response, testTenant, testNamespace, topicName, true, null); + verify(response, timeout(5000).times(1)).resume(responseCaptor.capture()); + Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode()); String role = "role"; Set<AuthAction> expectActions = new HashSet<>(); expectActions.add(AuthAction.produce); response = mock(AsyncResponse.class); - ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class); + responseCaptor = ArgumentCaptor.forClass(Response.class); persistentTopics.grantPermissionsOnTopic(response, testTenant, testNamespace, topicName, role, expectActions); verify(response, timeout(5000).times(1)).resume(responseCaptor.capture()); Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode()); @@ -957,12 +960,15 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest { public void testRevokeNonPartitionedTopic() { final String topicName = "non-partitioned-topic"; AsyncResponse response = mock(AsyncResponse.class); + ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class); persistentTopics.createNonPartitionedTopic(response, testTenant, testNamespace, topicName, true, null); + verify(response, timeout(5000).times(1)).resume(responseCaptor.capture()); + Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode()); String role = "role"; Set<AuthAction> expectActions = new HashSet<>(); expectActions.add(AuthAction.produce); response = mock(AsyncResponse.class); - ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class); + responseCaptor = ArgumentCaptor.forClass(Response.class); persistentTopics.grantPermissionsOnTopic(response, testTenant, testNamespace, topicName, role, expectActions); verify(response, timeout(5000).times(1)).resume(responseCaptor.capture()); Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java index f59f9d480b8..6c913d42908 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationTest.java @@ -58,6 +58,7 @@ public class AuthorizationTest extends MockedPulsarServiceBaseTest { public void setup() throws Exception { conf.setClusterName("c1"); conf.setSystemTopicEnabled(false); + conf.setForceDeleteNamespaceAllowed(true); conf.setAuthenticationEnabled(true); conf.setForceDeleteNamespaceAllowed(true); conf.setForceDeleteTenantAllowed(true); @@ -107,8 +108,9 @@ public class AuthorizationTest extends MockedPulsarServiceBaseTest { assertTrue(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds1"), "my-role", null)); assertTrue(auth.canProduce(TopicName.get("persistent://p1/c1/ns1/ds1"), "my-role", null)); - admin.topics().grantPermission("persistent://p1/c1/ns1/ds2", "other-role", - EnumSet.of(AuthAction.consume)); + String topic = "persistent://p1/c1/ns1/ds2"; + admin.topics().createNonPartitionedTopic(topic); + admin.topics().grantPermission(topic, "other-role", EnumSet.of(AuthAction.consume)); waitForChange(); assertTrue(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds2"), "other-role", null)); @@ -178,8 +180,9 @@ public class AuthorizationTest extends MockedPulsarServiceBaseTest { assertFalse(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds2"), "my.role.1", null)); assertFalse(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds2"), "my.role.2", null)); - admin.topics().grantPermission("persistent://p1/c1/ns1/ds1", "my.*", - EnumSet.of(AuthAction.produce)); + String topic1 = "persistent://p1/c1/ns1/ds1"; + admin.topics().createNonPartitionedTopic(topic1); + admin.topics().grantPermission(topic1, "my.*", EnumSet.of(AuthAction.produce)); waitForChange(); assertTrue(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds1"), "my.role.1", null)); @@ -242,7 +245,7 @@ public class AuthorizationTest extends MockedPulsarServiceBaseTest { assertTrue(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds1"), "role2", null, "role2-sub2")); assertTrue(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds1"), "pulsar.super_user", null, "role3-sub1")); - admin.namespaces().deleteNamespace("p1/c1/ns1"); + admin.namespaces().deleteNamespace("p1/c1/ns1", true); admin.tenants().deleteTenant("p1"); admin.clusters().deleteCluster("c1"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java index f9aa17ea3c4..c46f4744cd5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java @@ -263,7 +263,9 @@ public class AuthenticatedProducerConsumerTest extends ProducerConsumerBase { closeAdmin(); admin = spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString()).build()); admin.namespaces().createNamespace("my-property/my-ns", Sets.newHashSet("test")); - admin.topics().grantPermission("persistent://my-property/my-ns/my-topic", "anonymousUser", + String topic = "persistent://my-property/my-ns/my-topic"; + admin.topics().createNonPartitionedTopic(topic); + admin.topics().grantPermission(topic, "anonymousUser", EnumSet.allOf(AuthAction.class)); // setup the client diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java index 769486054ab..3ead51ad7fc 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java @@ -234,6 +234,7 @@ public class AuthorizationProducerConsumerTest extends ProducerConsumerBase { } // grant topic consume authorization to the subscriptionRole + tenantAdmin.topics().createNonPartitionedTopic(topicName); tenantAdmin.topics().grantPermission(topicName, subscriptionRole, Collections.singleton(AuthAction.consume)); @@ -773,6 +774,7 @@ public class AuthorizationProducerConsumerTest extends ProducerConsumerBase { admin.tenants().createTenant("my-property", new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("test"))); admin.namespaces().createNamespace("my-property/my-ns", Sets.newHashSet("test")); + admin.topics().createNonPartitionedTopic(topic); admin.topics().grantPermission(topic, invalidRole, Collections.singleton(AuthAction.produce)); admin.topics().grantPermission(topic, producerRole, Sets.newHashSet(AuthAction.produce, AuthAction.consume)); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java index d4f7c72bed0..2d00e15a13f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java @@ -55,6 +55,7 @@ public class ProxyAuthorizationTest extends MockedPulsarServiceBaseTest { @Override protected void setup() throws Exception { conf.setClusterName(configClusterName); + conf.setForceDeleteNamespaceAllowed(true); internalSetup(); WebSocketProxyConfiguration config = new WebSocketProxyConfiguration(); @@ -99,8 +100,9 @@ public class ProxyAuthorizationTest extends MockedPulsarServiceBaseTest { assertTrue(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds1"), "my-role", null)); assertTrue(auth.canProduce(TopicName.get("persistent://p1/c1/ns1/ds1"), "my-role", null)); - admin.topics().grantPermission("persistent://p1/c1/ns1/ds2", "other-role", - EnumSet.of(AuthAction.consume)); + String topic = "persistent://p1/c1/ns1/ds2"; + admin.topics().createNonPartitionedTopic(topic); + admin.topics().grantPermission(topic, "other-role", EnumSet.of(AuthAction.consume)); waitForChange(); assertTrue(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds2"), "other-role", null)); @@ -117,7 +119,7 @@ public class ProxyAuthorizationTest extends MockedPulsarServiceBaseTest { assertTrue(auth.canProduce(TopicName.get("persistent://p1/c1/ns1/ds1"), "my-role", null)); assertTrue(auth.canConsume(TopicName.get("persistent://p1/c1/ns1/ds1"), "my-role", null, null)); - admin.namespaces().deleteNamespace("p1/c1/ns1"); + admin.namespaces().deleteNamespace("p1/c1/ns1", true); admin.tenants().deleteTenant("p1"); admin.clusters().deleteCluster("c1"); }