This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 6de711d4008 [improve][test] Add operation authentication test for namespace API (#22398) 6de711d4008 is described below commit 6de711d4008338a875c5c145e856c90dcb041f8f Author: Cong Zhao <zhaoc...@apache.org> AuthorDate: Tue Apr 9 16:38:18 2024 +0800 [improve][test] Add operation authentication test for namespace API (#22398) --- .../pulsar/broker/admin/NamespaceAuthZTest.java | 882 ++++++++++++++++++++- 1 file changed, 875 insertions(+), 7 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespaceAuthZTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespaceAuthZTest.java index ce0b925614c..d5a0468f340 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespaceAuthZTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespaceAuthZTest.java @@ -19,23 +19,47 @@ package org.apache.pulsar.broker.admin; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertTrue; import io.jsonwebtoken.Jwts; +import java.io.File; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; import lombok.Cleanup; import lombok.SneakyThrows; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.reflect.FieldUtils; +import org.apache.pulsar.broker.authorization.AuthorizationService; +import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.MessageRoutingMode; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.impl.auth.AuthenticationToken; import org.apache.pulsar.common.policies.data.AuthAction; +import org.apache.pulsar.common.policies.data.BookieAffinityGroupData; +import org.apache.pulsar.common.policies.data.BundlesData; +import org.apache.pulsar.common.policies.data.NamespaceOperation; +import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.data.TenantInfo; +import org.apache.pulsar.packages.management.core.MockedPackagesStorageProvider; +import org.apache.pulsar.packages.management.core.common.PackageMetadata; import org.apache.pulsar.security.MockedPulsarStandalone; +import org.mockito.Mockito; import org.testng.Assert; import org.testng.annotations.AfterClass; +import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeClass; +import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; -import java.util.HashMap; -import java.util.Map; -import java.util.Set; -import java.util.UUID; @Test(groups = "broker-admin") public class NamespaceAuthZTest extends MockedPulsarStandalone { @@ -44,17 +68,27 @@ public class NamespaceAuthZTest extends MockedPulsarStandalone { private PulsarAdmin tenantManagerAdmin; + private PulsarClient pulsarClient; + + private AuthorizationService authorizationService; + + private AuthorizationService orignalAuthorizationService; + private static final String TENANT_ADMIN_SUBJECT = UUID.randomUUID().toString(); private static final String TENANT_ADMIN_TOKEN = Jwts.builder() .claim("sub", TENANT_ADMIN_SUBJECT).signWith(SECRET_KEY).compact(); @SneakyThrows @BeforeClass - public void before() { + public void setup() { + getServiceConfiguration().setEnablePackagesManagement(true); + getServiceConfiguration().setPackagesManagementStorageProvider(MockedPackagesStorageProvider.class.getName()); + getServiceConfiguration().setDefaultNumberOfNamespaceBundles(1); + getServiceConfiguration().setForceDeleteNamespaceAllowed(true); configureTokenAuthentication(); configureDefaultAuthorization(); start(); - this.superUserAdmin =PulsarAdmin.builder() + this.superUserAdmin = PulsarAdmin.builder() .serviceHttpUrl(getPulsarService().getWebServiceAddress()) .authentication(new AuthenticationToken(SUPER_USER_TOKEN)) .build(); @@ -65,12 +99,13 @@ public class NamespaceAuthZTest extends MockedPulsarStandalone { .serviceHttpUrl(getPulsarService().getWebServiceAddress()) .authentication(new AuthenticationToken(TENANT_ADMIN_TOKEN)) .build(); + this.pulsarClient = super.getPulsarService().getClient(); } @SneakyThrows @AfterClass - public void after() { + public void cleanup() { if (superUserAdmin != null) { superUserAdmin.close(); } @@ -80,6 +115,33 @@ public class NamespaceAuthZTest extends MockedPulsarStandalone { close(); } + @BeforeMethod + public void before() throws IllegalAccessException { + orignalAuthorizationService = getPulsarService().getBrokerService().getAuthorizationService(); + authorizationService = Mockito.spy(orignalAuthorizationService); + FieldUtils.writeField(getPulsarService().getBrokerService(), "authorizationService", + authorizationService, true); + } + + @AfterMethod + public void after() throws IllegalAccessException, PulsarAdminException { + FieldUtils.writeField(getPulsarService().getBrokerService(), "authorizationService", + orignalAuthorizationService, true); + superUserAdmin.namespaces().deleteNamespace("public/default", true); + superUserAdmin.namespaces().createNamespace("public/default"); + } + + private void setAuthorizationOperationChecker(String role, NamespaceOperation operation) { + Mockito.doAnswer(invocationOnMock -> { + String role_ = invocationOnMock.getArgument(2); + if (role.equals(role_)) { + NamespaceOperation operation_ = invocationOnMock.getArgument(1); + Assert.assertEquals(operation_, operation); + } + return invocationOnMock.callRealMethod(); + }).when(authorizationService).allowNamespaceOperationAsync(Mockito.any(), Mockito.any(), Mockito.any(), + Mockito.any()); + } @SneakyThrows @Test @@ -160,4 +222,810 @@ public class NamespaceAuthZTest extends MockedPulsarStandalone { } superUserAdmin.topics().delete(topic, true); } + + @Test + public void testTopics() throws Exception { + final String random = UUID.randomUUID().toString(); + final String namespace = "public/default"; + final String topic = "persistent://" + namespace + "/" + random; + final String subject = UUID.randomUUID().toString(); + final String token = Jwts.builder() + .claim("sub", subject).signWith(SECRET_KEY).compact(); + superUserAdmin.topics().createNonPartitionedTopic(topic); + + @Cleanup + final PulsarAdmin subAdmin = PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(token)) + .build(); + + // test super admin + superUserAdmin.namespaces().getTopics(namespace); + + // test tenant manager + tenantManagerAdmin.namespaces().getTopics(namespace); + + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().getTopics(namespace)); + + setAuthorizationOperationChecker(subject, NamespaceOperation.GET_TOPICS); + + for (AuthAction action : AuthAction.values()) { + superUserAdmin.namespaces().grantPermissionOnNamespace(namespace, subject, Set.of(action)); + if (AuthAction.consume == action || AuthAction.produce == action) { + subAdmin.namespaces().getTopics(namespace); + } else { + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().getTopics(namespace)); + } + superUserAdmin.namespaces().revokePermissionsOnNamespace(namespace, subject); + } + + superUserAdmin.topics().delete(topic, true); + } + + @Test + public void testBookieAffinityGroup() throws Exception { + final String random = UUID.randomUUID().toString(); + final String namespace = "public/default"; + final String topic = "persistent://" + namespace + "/" + random; + final String subject = UUID.randomUUID().toString(); + final String token = Jwts.builder() + .claim("sub", subject).signWith(SECRET_KEY).compact(); + superUserAdmin.topics().createNonPartitionedTopic(topic); + + @Cleanup + final PulsarAdmin subAdmin = PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(token)) + .build(); + + // test super admin + BookieAffinityGroupData bookieAffinityGroupData = BookieAffinityGroupData.builder() + .bookkeeperAffinityGroupPrimary("aaa") + .bookkeeperAffinityGroupSecondary("bbb") + .build(); + superUserAdmin.namespaces().setBookieAffinityGroup(namespace, bookieAffinityGroupData); + BookieAffinityGroupData bookieAffinityGroup = superUserAdmin.namespaces().getBookieAffinityGroup(namespace); + Assert.assertEquals(bookieAffinityGroupData, bookieAffinityGroup); + superUserAdmin.namespaces().deleteBookieAffinityGroup(namespace); + bookieAffinityGroup = superUserAdmin.namespaces().getBookieAffinityGroup(namespace); + Assert.assertNull(bookieAffinityGroup); + + // test tenant manager + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> tenantManagerAdmin.namespaces().setBookieAffinityGroup(namespace, bookieAffinityGroupData)); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> tenantManagerAdmin.namespaces().getBookieAffinityGroup(namespace)); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> tenantManagerAdmin.namespaces().deleteBookieAffinityGroup(namespace)); + + // test nobody + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().setBookieAffinityGroup(namespace, bookieAffinityGroupData)); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().getBookieAffinityGroup(namespace)); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().deleteBookieAffinityGroup(namespace)); + + for (AuthAction action : AuthAction.values()) { + superUserAdmin.namespaces().grantPermissionOnNamespace(namespace, subject, Set.of(action)); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().setBookieAffinityGroup(namespace, bookieAffinityGroupData)); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().getBookieAffinityGroup(namespace)); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().deleteBookieAffinityGroup(namespace)); + superUserAdmin.namespaces().revokePermissionsOnNamespace(namespace, subject); + } + + superUserAdmin.topics().delete(topic, true); + } + + + @Test + public void testGetBundles() throws Exception { + final String random = UUID.randomUUID().toString(); + final String namespace = "public/default"; + final String topic = "persistent://" + namespace + "/" + random; + final String subject = UUID.randomUUID().toString(); + final String token = Jwts.builder() + .claim("sub", subject).signWith(SECRET_KEY).compact(); + superUserAdmin.topics().createNonPartitionedTopic(topic); + + Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES) + .topic(topic) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); + producer.send("message".getBytes()); + + @Cleanup + final PulsarAdmin subAdmin = PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(token)) + .build(); + // test super admin + superUserAdmin.namespaces().getBundles(namespace); + + // test tenant manager + tenantManagerAdmin.namespaces().getBundles(namespace); + + // test nobody + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().getBundles(namespace)); + + setAuthorizationOperationChecker(subject, NamespaceOperation.GET_BUNDLE); + + for (AuthAction action : AuthAction.values()) { + superUserAdmin.namespaces().grantPermissionOnNamespace(namespace, subject, Set.of(action)); + if (AuthAction.consume == action || AuthAction.produce == action) { + subAdmin.namespaces().getBundles(namespace); + } else { + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().getBundles(namespace)); + } + superUserAdmin.namespaces().revokePermissionsOnNamespace(namespace, subject); + } + + superUserAdmin.topics().delete(topic, true); + } + + @Test + public void testUnloadBundles() throws Exception { + final String random = UUID.randomUUID().toString(); + final String namespace = "public/default"; + final String topic = "persistent://" + namespace + "/" + random; + final String subject = UUID.randomUUID().toString(); + final String token = Jwts.builder() + .claim("sub", subject).signWith(SECRET_KEY).compact(); + superUserAdmin.topics().createNonPartitionedTopic(topic); + + Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES) + .topic(topic) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); + producer.send("message".getBytes()); + + @Cleanup + final PulsarAdmin subAdmin = PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(token)) + .build(); + + final String defaultBundle = "0x00000000_0xffffffff"; + + // test super admin + superUserAdmin.namespaces().unloadNamespaceBundle(namespace, defaultBundle); + + // test tenant manager + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> tenantManagerAdmin.namespaces().unloadNamespaceBundle(namespace, defaultBundle)); + + // test nobody + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().unloadNamespaceBundle(namespace, defaultBundle)); + + for (AuthAction action : AuthAction.values()) { + superUserAdmin.namespaces().grantPermissionOnNamespace(namespace, subject, Set.of(action)); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().unloadNamespaceBundle(namespace, defaultBundle)); + superUserAdmin.namespaces().revokePermissionsOnNamespace(namespace, subject); + } + + superUserAdmin.topics().delete(topic, true); + } + + @Test + public void testSplitBundles() throws Exception { + final String random = UUID.randomUUID().toString(); + final String namespace = "public/default"; + final String topic = "persistent://" + namespace + "/" + random; + final String subject = UUID.randomUUID().toString(); + final String token = Jwts.builder() + .claim("sub", subject).signWith(SECRET_KEY).compact(); + superUserAdmin.topics().createNonPartitionedTopic(topic); + + Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES) + .topic(topic) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); + producer.send("message".getBytes()); + + @Cleanup + final PulsarAdmin subAdmin = PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(token)) + .build(); + + final String defaultBundle = "0x00000000_0xffffffff"; + + // test super admin + superUserAdmin.namespaces().splitNamespaceBundle(namespace, defaultBundle, false, null); + + // test tenant manager + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> tenantManagerAdmin.namespaces().splitNamespaceBundle(namespace, defaultBundle, false, null)); + + // test nobody + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().splitNamespaceBundle(namespace, defaultBundle, false, null)); + + for (AuthAction action : AuthAction.values()) { + superUserAdmin.namespaces().grantPermissionOnNamespace(namespace, subject, Set.of(action)); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().splitNamespaceBundle(namespace, defaultBundle, false, null)); + superUserAdmin.namespaces().revokePermissionsOnNamespace(namespace, subject); + } + + superUserAdmin.topics().delete(topic, true); + } + + @Test + public void testDeleteBundles() throws Exception { + final String random = UUID.randomUUID().toString(); + final String namespace = "public/default"; + final String topic = "persistent://" + namespace + "/" + random; + final String subject = UUID.randomUUID().toString(); + final String token = Jwts.builder() + .claim("sub", subject).signWith(SECRET_KEY).compact(); + superUserAdmin.topics().createNonPartitionedTopic(topic); + + @Cleanup + final PulsarAdmin subAdmin = PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(token)) + .build(); + + Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES) + .topic(topic) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); + producer.send("message".getBytes()); + + for (int i = 0; i < 3; i++) { + superUserAdmin.namespaces().splitNamespaceBundle(namespace, Policies.BundleType.LARGEST.toString(), false, null); + } + + BundlesData bundles = superUserAdmin.namespaces().getBundles(namespace); + Assert.assertEquals(bundles.getNumBundles(), 4); + List<String> boundaries = bundles.getBoundaries(); + Assert.assertEquals(boundaries.size(), 5); + + List<String> bundleRanges = new ArrayList<>(); + for (int i = 0; i < boundaries.size() - 1; i++) { + String bundleRange = boundaries.get(i) + "_" + boundaries.get(i + 1); + List<Topic> allTopicsFromNamespaceBundle = getPulsarService().getBrokerService() + .getAllTopicsFromNamespaceBundle(namespace, namespace + "/" + bundleRange); + System.out.println(StringUtils.join(allTopicsFromNamespaceBundle)); + if (allTopicsFromNamespaceBundle.isEmpty()) { + bundleRanges.add(bundleRange); + } + } + + // test super admin + superUserAdmin.namespaces().deleteNamespaceBundle(namespace, bundleRanges.get(0)); + + // test tenant manager + tenantManagerAdmin.namespaces().deleteNamespaceBundle(namespace, bundleRanges.get(1)); + + // test nobody + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().deleteNamespaceBundle(namespace, bundleRanges.get(1))); + + setAuthorizationOperationChecker(subject, NamespaceOperation.DELETE_BUNDLE); + + for (AuthAction action : AuthAction.values()) { + superUserAdmin.namespaces().grantPermissionOnNamespace(namespace, subject, Set.of(action)); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().deleteNamespaceBundle(namespace, bundleRanges.get(1))); + superUserAdmin.namespaces().revokePermissionsOnNamespace(namespace, subject); + } + } + + @Test + public void testPermission() throws Exception { + final String random = UUID.randomUUID().toString(); + final String namespace = "public/default"; + final String topic = "persistent://" + namespace + "/" + random; + final String subject = UUID.randomUUID().toString(); + final String token = Jwts.builder() + .claim("sub", subject).signWith(SECRET_KEY).compact(); + superUserAdmin.topics().createNonPartitionedTopic(topic); + + final String role = "sub"; + final AuthAction testAction = AuthAction.consume; + + @Cleanup + final PulsarAdmin subAdmin = PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(token)) + .build(); + + + // test super admin + superUserAdmin.namespaces().grantPermissionOnNamespace(namespace, role, Set.of(testAction)); + Map<String, Set<AuthAction>> permissions = superUserAdmin.namespaces().getPermissions(namespace); + Assert.assertEquals(permissions.get(role), Set.of(testAction)); + superUserAdmin.namespaces().revokePermissionsOnNamespace(namespace, role); + permissions = superUserAdmin.namespaces().getPermissions(namespace); + Assert.assertTrue(permissions.isEmpty()); + + // test tenant manager + tenantManagerAdmin.namespaces().grantPermissionOnNamespace(namespace, role, Set.of(testAction)); + permissions = tenantManagerAdmin.namespaces().getPermissions(namespace); + Assert.assertEquals(permissions.get(role), Set.of(testAction)); + tenantManagerAdmin.namespaces().revokePermissionsOnNamespace(namespace, role); + permissions = tenantManagerAdmin.namespaces().getPermissions(namespace); + Assert.assertTrue(permissions.isEmpty()); + + // test nobody + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().grantPermissionOnNamespace(namespace, role, Set.of(testAction))); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().getPermissions(namespace)); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().revokePermissionsOnNamespace(namespace, role)); + + + for (AuthAction action : AuthAction.values()) { + superUserAdmin.namespaces().grantPermissionOnNamespace(namespace, subject, Set.of(action)); + setAuthorizationOperationChecker(subject, NamespaceOperation.GRANT_PERMISSION); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().grantPermissionOnNamespace(namespace, role, Set.of(testAction))); + setAuthorizationOperationChecker(subject, NamespaceOperation.GET_PERMISSION); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().getPermissions(namespace)); + setAuthorizationOperationChecker(subject, NamespaceOperation.REVOKE_PERMISSION); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().revokePermissionsOnNamespace(namespace, role)); + superUserAdmin.namespaces().revokePermissionsOnNamespace(namespace, subject); + } + + superUserAdmin.topics().delete(topic, true); + } + + @Test + public void testPermissionOnSubscription() throws Exception { + final String random = UUID.randomUUID().toString(); + final String namespace = "public/default"; + final String topic = "persistent://" + namespace + "/" + random; + final String subject = UUID.randomUUID().toString(); + final String token = Jwts.builder() + .claim("sub", subject).signWith(SECRET_KEY).compact(); + superUserAdmin.topics().createNonPartitionedTopic(topic); + + @Cleanup + final PulsarAdmin subAdmin = PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(token)) + .build(); + + final String subscription = "my-sub"; + final String role = "sub"; + pulsarClient.newConsumer().topic(topic) + .subscriptionName(subscription) + .subscribe().close(); + + + // test super admin + superUserAdmin.namespaces().grantPermissionOnSubscription(namespace, subscription, Set.of(role)); + Map<String, Set<String>> permissionOnSubscription = superUserAdmin.namespaces().getPermissionOnSubscription(namespace); + Assert.assertEquals(permissionOnSubscription.get(subscription), Set.of(role)); + superUserAdmin.namespaces().revokePermissionOnSubscription(namespace, subscription, role); + permissionOnSubscription = superUserAdmin.namespaces().getPermissionOnSubscription(namespace); + Assert.assertTrue(permissionOnSubscription.isEmpty()); + + // test tenant manager + tenantManagerAdmin.namespaces().grantPermissionOnSubscription(namespace, subscription, Set.of(role)); + permissionOnSubscription = tenantManagerAdmin.namespaces().getPermissionOnSubscription(namespace); + Assert.assertEquals(permissionOnSubscription.get(subscription), Set.of(role)); + tenantManagerAdmin.namespaces().revokePermissionOnSubscription(namespace, subscription, role); + permissionOnSubscription = tenantManagerAdmin.namespaces().getPermissionOnSubscription(namespace); + Assert.assertTrue(permissionOnSubscription.isEmpty()); + + // test nobody + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().grantPermissionOnSubscription(namespace, subscription, Set.of(role))); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().getPermissionOnSubscription(namespace)); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().revokePermissionOnSubscription(namespace, subscription, role)); + + + for (AuthAction action : AuthAction.values()) { + superUserAdmin.namespaces().grantPermissionOnNamespace(namespace, subject, Set.of(action)); + setAuthorizationOperationChecker(subject, NamespaceOperation.GRANT_PERMISSION); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().grantPermissionOnSubscription(namespace, subscription, Set.of(role))); + setAuthorizationOperationChecker(subject, NamespaceOperation.GET_PERMISSION); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().getPermissionOnSubscription(namespace)); + setAuthorizationOperationChecker(subject, NamespaceOperation.REVOKE_PERMISSION); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().revokePermissionOnSubscription(namespace, subscription, role)); + superUserAdmin.namespaces().revokePermissionsOnNamespace(namespace, subject); + } + + superUserAdmin.topics().delete(topic, true); + } + + @Test + public void testClearBacklog() throws Exception { + final String random = UUID.randomUUID().toString(); + final String namespace = "public/default"; + final String topic = "persistent://" + namespace + "/" + random; + final String subject = UUID.randomUUID().toString(); + final String token = Jwts.builder() + .claim("sub", subject).signWith(SECRET_KEY).compact(); + superUserAdmin.topics().createNonPartitionedTopic(topic); + @Cleanup + final PulsarAdmin subAdmin = PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(token)) + .build(); + // test super admin + superUserAdmin.namespaces().clearNamespaceBacklog(namespace); + + // test tenant manager + tenantManagerAdmin.namespaces().clearNamespaceBacklog(namespace); + + // test nobody + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().clearNamespaceBacklog(namespace)); + + setAuthorizationOperationChecker(subject, NamespaceOperation.CLEAR_BACKLOG); + + for (AuthAction action : AuthAction.values()) { + superUserAdmin.namespaces().grantPermissionOnNamespace(namespace, subject, Set.of(action)); + if (AuthAction.consume == action) { + subAdmin.namespaces().clearNamespaceBacklog(namespace); + } else { + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().clearNamespaceBacklog(namespace)); + } + superUserAdmin.namespaces().revokePermissionsOnNamespace(namespace, subject); + } + + superUserAdmin.topics().delete(topic, true); + } + + @Test + public void testClearNamespaceBundleBacklog() throws Exception { + final String random = UUID.randomUUID().toString(); + final String namespace = "public/default"; + final String topic = "persistent://" + namespace + "/" + random; + final String subject = UUID.randomUUID().toString(); + final String token = Jwts.builder() + .claim("sub", subject).signWith(SECRET_KEY).compact(); + superUserAdmin.topics().createNonPartitionedTopic(topic); + @Cleanup + final PulsarAdmin subAdmin = PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(token)) + .build(); + + @Cleanup + Producer<byte[]> batchProducer = pulsarClient.newProducer().topic(topic) + .enableBatching(false) + .create(); + + final String defaultBundle = "0x00000000_0xffffffff"; + + // test super admin + superUserAdmin.namespaces().clearNamespaceBundleBacklog(namespace, defaultBundle); + + // test tenant manager + tenantManagerAdmin.namespaces().clearNamespaceBundleBacklog(namespace, defaultBundle); + + // test nobody + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().clearNamespaceBundleBacklog(namespace, defaultBundle)); + + setAuthorizationOperationChecker(subject, NamespaceOperation.CLEAR_BACKLOG); + + for (AuthAction action : AuthAction.values()) { + superUserAdmin.namespaces().grantPermissionOnNamespace(namespace, subject, Set.of(action)); + if (AuthAction.consume == action) { + subAdmin.namespaces().clearNamespaceBundleBacklog(namespace, defaultBundle); + } else { + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().clearNamespaceBundleBacklog(namespace, defaultBundle)); + } + superUserAdmin.namespaces().revokePermissionsOnNamespace(namespace, subject); + } + + superUserAdmin.topics().delete(topic, true); + } + + @Test + public void testUnsubscribeNamespace() throws Exception { + final String random = UUID.randomUUID().toString(); + final String namespace = "public/default"; + final String topic = "persistent://" + namespace + "/" + random; + final String subject = UUID.randomUUID().toString(); + final String token = Jwts.builder() + .claim("sub", subject).signWith(SECRET_KEY).compact(); + superUserAdmin.topics().createNonPartitionedTopic(topic); + @Cleanup + final PulsarAdmin subAdmin = PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(token)) + .build(); + + @Cleanup + Producer<byte[]> batchProducer = pulsarClient.newProducer().topic(topic) + .enableBatching(false) + .create(); + + pulsarClient.newConsumer().topic(topic) + .subscriptionName("sub") + .subscribe().close(); + + // test super admin + superUserAdmin.namespaces().unsubscribeNamespace(namespace, "sub"); + + // test tenant manager + tenantManagerAdmin.namespaces().unsubscribeNamespace(namespace, "sub"); + + // test nobody + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().unsubscribeNamespace(namespace, "sub")); + + setAuthorizationOperationChecker(subject, NamespaceOperation.UNSUBSCRIBE); + + for (AuthAction action : AuthAction.values()) { + superUserAdmin.namespaces().grantPermissionOnNamespace(namespace, subject, Set.of(action)); + if (AuthAction.consume == action) { + subAdmin.namespaces().unsubscribeNamespace(namespace, "sub"); + } else { + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().unsubscribeNamespace(namespace, "sub")); + } + superUserAdmin.namespaces().revokePermissionsOnNamespace(namespace, subject); + } + + superUserAdmin.topics().delete(topic, true); + } + + @Test + public void testUnsubscribeNamespaceBundle() throws Exception { + final String random = UUID.randomUUID().toString(); + final String namespace = "public/default"; + final String topic = "persistent://" + namespace + "/" + random; + final String subject = UUID.randomUUID().toString(); + final String token = Jwts.builder() + .claim("sub", subject).signWith(SECRET_KEY).compact(); + superUserAdmin.topics().createNonPartitionedTopic(topic); + @Cleanup + final PulsarAdmin subAdmin = PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(token)) + .build(); + + @Cleanup + Producer<byte[]> batchProducer = pulsarClient.newProducer().topic(topic) + .enableBatching(false) + .create(); + + pulsarClient.newConsumer().topic(topic) + .subscriptionName("sub") + .subscribe().close(); + + final String defaultBundle = "0x00000000_0xffffffff"; + + // test super admin + superUserAdmin.namespaces().unsubscribeNamespaceBundle(namespace, defaultBundle, "sub"); + + // test tenant manager + tenantManagerAdmin.namespaces().unsubscribeNamespaceBundle(namespace, defaultBundle, "sub"); + + // test nobody + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().unsubscribeNamespaceBundle(namespace, defaultBundle, "sub")); + + setAuthorizationOperationChecker(subject, NamespaceOperation.UNSUBSCRIBE); + + for (AuthAction action : AuthAction.values()) { + superUserAdmin.namespaces().grantPermissionOnNamespace(namespace, subject, Set.of(action)); + if (AuthAction.consume == action) { + subAdmin.namespaces().unsubscribeNamespaceBundle(namespace, defaultBundle, "sub"); + } else { + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.namespaces().unsubscribeNamespaceBundle(namespace, defaultBundle, "sub")); + } + superUserAdmin.namespaces().revokePermissionsOnNamespace(namespace, subject); + } + + superUserAdmin.topics().delete(topic, true); + } + + @Test + public void testPackageAPI() throws Exception { + final String namespace = "public/default"; + + final String subject = UUID.randomUUID().toString(); + final String token = Jwts.builder() + .claim("sub", subject).signWith(SECRET_KEY).compact(); + @Cleanup final PulsarAdmin subAdmin = PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(token)) + .build(); + + + File file = File.createTempFile("package-api-test", ".package"); + + // testing upload api + String packageName = "function://public/default/test@v1"; + PackageMetadata originalMetadata = PackageMetadata.builder().description("test").build(); + superUserAdmin.packages().upload(originalMetadata, packageName, file.getPath()); + + // testing download api + String downloadPath = new File(file.getParentFile(), "package-api-test-download.package").getPath(); + superUserAdmin.packages().download(packageName, downloadPath); + File downloadFile = new File(downloadPath); + assertTrue(downloadFile.exists()); + downloadFile.delete(); + + // testing list packages api + List<String> packages = superUserAdmin.packages().listPackages("function", "public/default"); + assertEquals(packages.size(), 1); + assertEquals(packages.get(0), "test"); + + // testing list versions api + List<String> versions = superUserAdmin.packages().listPackageVersions(packageName); + assertEquals(versions.size(), 1); + assertEquals(versions.get(0), "v1"); + + // testing get packages api + PackageMetadata metadata = superUserAdmin.packages().getMetadata(packageName); + assertEquals(metadata.getDescription(), originalMetadata.getDescription()); + assertNull(metadata.getContact()); + assertTrue(metadata.getModificationTime() > 0); + assertTrue(metadata.getCreateTime() > 0); + assertNull(metadata.getProperties()); + + // testing update package metadata api + PackageMetadata updatedMetadata = originalMetadata; + updatedMetadata.setContact("t...@apache.org"); + updatedMetadata.setProperties(Collections.singletonMap("key", "value")); + superUserAdmin.packages().updateMetadata(packageName, updatedMetadata); + + superUserAdmin.packages().getMetadata(packageName); + + // ---- test tenant manager --- + + file = File.createTempFile("package-api-test", ".package"); + + // test tenant manager + packageName = "function://public/default/test@v2"; + originalMetadata = PackageMetadata.builder().description("test").build(); + tenantManagerAdmin.packages().upload(originalMetadata, packageName, file.getPath()); + + // testing download api + downloadPath = new File(file.getParentFile(), "package-api-test-download.package").getPath(); + tenantManagerAdmin.packages().download(packageName, downloadPath); + downloadFile = new File(downloadPath); + assertTrue(downloadFile.exists()); + downloadFile.delete(); + + // testing list packages api + packages = tenantManagerAdmin.packages().listPackages("function", "public/default"); + assertEquals(packages.size(), 1); + assertEquals(packages.get(0), "test"); + + // testing list versions api + tenantManagerAdmin.packages().listPackageVersions(packageName); + + // testing get packages api + tenantManagerAdmin.packages().getMetadata(packageName); + + // testing update package metadata api + updatedMetadata = originalMetadata; + updatedMetadata.setContact("t...@apache.org"); + updatedMetadata.setProperties(Collections.singletonMap("key", "value")); + tenantManagerAdmin.packages().updateMetadata(packageName, updatedMetadata); + + // ---- test nobody --- + + File file3 = File.createTempFile("package-api-test", ".package"); + + // test tenant manager + String packageName3 = "function://public/default/test@v3"; + PackageMetadata originalMetadata3 = PackageMetadata.builder().description("test").build(); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.packages().upload(originalMetadata3, packageName3, file3.getPath())); + + + // testing download api + String downloadPath3 = new File(file3.getParentFile(), "package-api-test-download.package").getPath(); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.packages().download(packageName3, downloadPath3)); + + // testing list packages api + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.packages().listPackages("function", "public/default")); + + // testing list versions api + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.packages().listPackageVersions(packageName3)); + + // testing get packages api + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.packages().getMetadata(packageName3)); + + // testing update package metadata api + PackageMetadata updatedMetadata3 = originalMetadata; + updatedMetadata3.setContact("t...@apache.org"); + updatedMetadata3.setProperties(Collections.singletonMap("key", "value")); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.packages().updateMetadata(packageName3, updatedMetadata3)); + + + setAuthorizationOperationChecker(subject, NamespaceOperation.PACKAGES); + + for (AuthAction action : AuthAction.values()) { + superUserAdmin.namespaces().grantPermissionOnNamespace(namespace, subject, Set.of(action)); + File file4 = File.createTempFile("package-api-test", ".package"); + String packageName4 = "function://public/default/test@v4"; + PackageMetadata originalMetadata4 = PackageMetadata.builder().description("test").build(); + String downloadPath4 = new File(file3.getParentFile(), "package-api-test-download.package").getPath(); + if (AuthAction.packages == action) { + subAdmin.packages().upload(originalMetadata4, packageName4, file.getPath()); + + // testing download api + subAdmin.packages().download(packageName4, downloadPath4); + downloadFile = new File(downloadPath4); + assertTrue(downloadFile.exists()); + downloadFile.delete(); + + // testing list packages api + packages = subAdmin.packages().listPackages("function", "public/default"); + assertEquals(packages.size(), 1); + assertEquals(packages.get(0), "test"); + + // testing list versions api + subAdmin.packages().listPackageVersions(packageName4); + + // testing get packages api + subAdmin.packages().getMetadata(packageName4); + + // testing update package metadata api + PackageMetadata updatedMetadata4 = originalMetadata; + updatedMetadata4.setContact("t...@apache.org"); + updatedMetadata4.setProperties(Collections.singletonMap("key", "value")); + subAdmin.packages().updateMetadata(packageName, updatedMetadata4); + } else { + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.packages().upload(originalMetadata4, packageName4, file4.getPath())); + + // testing download api + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.packages().download(packageName4, downloadPath4)); + + // testing list packages api + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.packages().listPackages("function", "public/default")); + + // testing list versions api + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.packages().listPackageVersions(packageName4)); + + // testing get packages api + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.packages().getMetadata(packageName4)); + + // testing update package metadata api + PackageMetadata updatedMetadata4 = originalMetadata; + updatedMetadata4.setContact("t...@apache.org"); + updatedMetadata4.setProperties(Collections.singletonMap("key", "value")); + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> subAdmin.packages().updateMetadata(packageName4, updatedMetadata4)); + } + superUserAdmin.namespaces().revokePermissionsOnNamespace(namespace, subject); + } + } }