This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new fc67fe2  Introduce admin api to get broker and namespace-isolation policy map (#1565)
fc67fe2 is described below

commit fc67fe2c98eeb7c47023ff2dd29144972a4a693f
Author: Rajan Dhabalia <rdhaba...@apache.org>
AuthorDate: Fri Apr 13 13:17:20 2018 -0700

    Introduce admin api to get broker and namespace-isolation policy map (#1565)
    
    * Introduce admin api to get broker and namespace-isolation policy map
    
    * add missed commit
---
 .../pulsar/broker/admin/impl/BrokersBase.java      |   2 +-
 .../pulsar/broker/admin/impl/ClustersBase.java     | 113 ++++++++++++++++++++-
 .../pulsar/broker/loadbalance/LoadManager.java     |  10 ++
 .../broker/loadbalance/ModularLoadManager.java     |   8 ++
 .../loadbalance/impl/ModularLoadManagerImpl.java   |   1 +
 .../impl/ModularLoadManagerWrapper.java            |   6 ++
 .../loadbalance/impl/SimpleLoadManagerImpl.java    |   5 +
 .../impl/SimpleResourceAllocationPolicies.java     |   3 +-
 .../pulsar/broker/namespace/NamespaceService.java  |   4 +-
 .../apache/pulsar/broker/admin/AdminApiTest2.java  |  50 +++++++++
 .../org/apache/pulsar/client/admin/Clusters.java   |  23 +++++
 .../pulsar/client/admin/internal/ClustersImpl.java |  25 +++++
 .../pulsar/admin/cli/PulsarAdminToolTest.java      |  21 +++-
 .../admin/cli/CmdNamespaceIsolationPolicy.java     |  36 +++++++
 .../data/BrokerNamespaceIsolationData.java         |  39 +++++++
 15 files changed, 335 insertions(+), 11 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
index 60329d7..59027c2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
@@ -95,7 +95,7 @@ public class BrokersBase extends AdminResource {
             throw new RestException(e);
         }
     }
-    
+
     @POST
     @Path("/configuration/{configName}/{configValue}")
     @ApiOperation(value = "Update dynamic serviceconfiguration into zk only. 
This operation requires Pulsar super-user privileges.")
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
index de670c1..c05ea77 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
@@ -40,12 +40,15 @@ import javax.ws.rs.core.Response.Status;
 import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.cache.ConfigurationCacheService;
+import static 
org.apache.pulsar.broker.namespace.NamespaceService.NAMESPACE_ISOLATION_POLICIES;
 import org.apache.pulsar.broker.web.RestException;
 import org.apache.pulsar.common.naming.NamedEntity;
+import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationData;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.FailureDomain;
 import org.apache.pulsar.common.policies.data.NamespaceIsolationData;
 import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies;
+import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicyImpl;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
@@ -56,6 +59,7 @@ import org.slf4j.LoggerFactory;
 
 import com.fasterxml.jackson.core.JsonGenerationException;
 import com.fasterxml.jackson.databind.JsonMappingException;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
 import io.swagger.annotations.ApiOperation;
@@ -262,7 +266,7 @@ public class ClustersBase extends AdminResource {
             }
 
             // check the namespaceIsolationPolicies associated with the cluster
-            String path = path("clusters", cluster, 
"namespaceIsolationPolicies");
+            String path = path("clusters", cluster, 
NAMESPACE_ISOLATION_POLICIES);
             Optional<NamespaceIsolationPolicies> nsIsolationPolicies = 
namespaceIsolationPoliciesCache().get(path);
 
             // Need to delete the isolation policies if present
@@ -332,7 +336,7 @@ public class ClustersBase extends AdminResource {
 
         try {
             NamespaceIsolationPolicies nsIsolationPolicies = 
namespaceIsolationPoliciesCache()
-                    .get(path("clusters", cluster, 
"namespaceIsolationPolicies"))
+                    .get(path("clusters", cluster, 
NAMESPACE_ISOLATION_POLICIES))
                     .orElseThrow(() -> new RestException(Status.NOT_FOUND,
                             "NamespaceIsolationPolicies for cluster " + 
cluster + " does not exist"));
             // construct the response to NamespaceisolationData map
@@ -356,7 +360,7 @@ public class ClustersBase extends AdminResource {
 
         try {
             NamespaceIsolationPolicies nsIsolationPolicies = 
namespaceIsolationPoliciesCache()
-                    .get(path("clusters", cluster, 
"namespaceIsolationPolicies"))
+                    .get(path("clusters", cluster, 
NAMESPACE_ISOLATION_POLICIES))
                     .orElseThrow(() -> new RestException(Status.NOT_FOUND,
                             "NamespaceIsolationPolicies for cluster " + 
cluster + " does not exist"));
             // construct the response to NamespaceisolationData map
@@ -374,6 +378,105 @@ public class ClustersBase extends AdminResource {
         }
     }
 
+    @GET
+    @Path("/{cluster}/namespaceIsolationPolicies/brokers")
+    @ApiOperation(value = "Get list of brokers with namespace-isolation 
policies attached to them", response = BrokerNamespaceIsolationData.class)
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
+            @ApiResponse(code = 404, message = "Namespace-isolation policies 
not found"),
+            @ApiResponse(code = 412, message = "Cluster doesn't exist") })
+    public List<BrokerNamespaceIsolationData> 
getBrokersWithNamespaceIsolationPolicy(
+            @PathParam("cluster") String cluster) {
+        validateSuperUserAccess();
+        validateClusterExists(cluster);
+
+        Set<String> availableBrokers;
+        final String nsIsolationPoliciesPath = AdminResource.path("clusters", 
cluster, NAMESPACE_ISOLATION_POLICIES);
+        Map<String, NamespaceIsolationData> nsPolicies;
+        try {
+            availableBrokers = 
pulsar().getLoadManager().get().getAvailableBrokers();
+        } catch (Exception e) {
+            log.error("[{}] Failed to get list of brokers in cluster {}", 
clientAppId(), cluster, e);
+            throw new RestException(e);
+        }
+        try {
+            Optional<NamespaceIsolationPolicies> nsPoliciesResult = 
namespaceIsolationPoliciesCache()
+                    .get(nsIsolationPoliciesPath);
+            if (!nsPoliciesResult.isPresent()) {
+                throw new RestException(Status.NOT_FOUND, "namespace-isolation 
policies not found for " + cluster);
+            }
+            nsPolicies = nsPoliciesResult.get().getPolicies();
+        } catch (Exception e) {
+            log.error("[{}] Failed to get namespace isolation-policies {}", 
clientAppId(), cluster, e);
+            throw new RestException(e);
+        }
+        return availableBrokers.stream().map(broker -> {
+            BrokerNamespaceIsolationData brokerIsolationData = new 
BrokerNamespaceIsolationData();
+            brokerIsolationData.brokerName = broker;
+            if (nsPolicies != null) {
+                nsPolicies.forEach((name, policyData) -> {
+                    NamespaceIsolationPolicyImpl nsPolicyImpl = new 
NamespaceIsolationPolicyImpl(policyData);
+                    if (nsPolicyImpl.isPrimaryBroker(broker) || 
nsPolicyImpl.isSecondaryBroker(broker)) {
+                        if (brokerIsolationData.namespaceRegex == null) {
+                            brokerIsolationData.namespaceRegex = 
Lists.newArrayList();
+                        }
+                        
brokerIsolationData.namespaceRegex.addAll(policyData.namespaces);
+                    }
+                });
+            }
+            return brokerIsolationData;
+        }).collect(Collectors.toList());
+    }
+
+    @GET
+    @Path("/{cluster}/namespaceIsolationPolicies/brokers/{broker}")
+    @ApiOperation(value = "Get a broker with namespace-isolation policies 
attached to it", response = BrokerNamespaceIsolationData.class)
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
+            @ApiResponse(code = 404, message = "Namespace-isolation policies/ 
Broker not found"),
+            @ApiResponse(code = 412, message = "Cluster doesn't exist") })
+    public BrokerNamespaceIsolationData 
getBrokerWithNamespaceIsolationPolicy(@PathParam("cluster") String cluster,
+            @PathParam("broker") String broker) {
+        validateSuperUserAccess();
+        validateClusterExists(cluster);
+
+        Set<String> availableBrokers;
+        final String nsIsolationPoliciesPath = AdminResource.path("clusters", 
cluster, NAMESPACE_ISOLATION_POLICIES);
+        Map<String, NamespaceIsolationData> nsPolicies;
+        try {
+            availableBrokers = 
pulsar().getLoadManager().get().getAvailableBrokers();
+        } catch (Exception e) {
+            log.error("[{}] Failed to get list of brokers in cluster {}", 
clientAppId(), cluster, e);
+            throw new RestException(e);
+        }
+        if (availableBrokers == null || !availableBrokers.contains(broker)) {
+            throw new RestException(Status.NOT_FOUND, "Broker is not part of 
active broker list " + broker);
+        }
+        try {
+            Optional<NamespaceIsolationPolicies> nsPoliciesResult = 
namespaceIsolationPoliciesCache()
+                    .get(nsIsolationPoliciesPath);
+            if (!nsPoliciesResult.isPresent()) {
+                throw new RestException(Status.NOT_FOUND, "namespace-isolation 
policies not found for " + cluster);
+            }
+            nsPolicies = nsPoliciesResult.get().getPolicies();
+        } catch (Exception e) {
+            log.error("[{}] Failed to get namespace isolation-policies {}", 
clientAppId(), cluster, e);
+            throw new RestException(e);
+        }
+        BrokerNamespaceIsolationData brokerIsolationData = new 
BrokerNamespaceIsolationData();
+        brokerIsolationData.brokerName = broker;
+        if (nsPolicies != null) {
+            nsPolicies.forEach((name, policyData) -> {
+                NamespaceIsolationPolicyImpl nsPolicyImpl = new 
NamespaceIsolationPolicyImpl(policyData);
+                if (nsPolicyImpl.isPrimaryBroker(broker) || 
nsPolicyImpl.isSecondaryBroker(broker)) {
+                    if (brokerIsolationData.namespaceRegex == null) {
+                        brokerIsolationData.namespaceRegex = 
Lists.newArrayList();
+                    }
+                    
brokerIsolationData.namespaceRegex.addAll(policyData.namespaces);
+                }
+            });
+        }
+        return brokerIsolationData;
+    }
+    
     @POST
     @Path("/{cluster}/namespaceIsolationPolicies/{policyName}")
     @ApiOperation(value = "Set namespace isolation policy")
@@ -389,7 +492,7 @@ public class ClustersBase extends AdminResource {
             // validate the policy data before creating the node
             policyData.validate();
 
-            String nsIsolationPolicyPath = path("clusters", cluster, 
"namespaceIsolationPolicies");
+            String nsIsolationPolicyPath = path("clusters", cluster, 
NAMESPACE_ISOLATION_POLICIES);
             NamespaceIsolationPolicies nsIsolationPolicies = 
namespaceIsolationPoliciesCache()
                     .get(nsIsolationPolicyPath).orElseGet(() -> {
                         try {
@@ -458,7 +561,7 @@ public class ClustersBase extends AdminResource {
 
         try {
 
-            String nsIsolationPolicyPath = path("clusters", cluster, 
"namespaceIsolationPolicies");
+            String nsIsolationPolicyPath = path("clusters", cluster, 
NAMESPACE_ISOLATION_POLICIES);
             NamespaceIsolationPolicies nsIsolationPolicies = 
namespaceIsolationPoliciesCache()
                     .get(nsIsolationPolicyPath).orElseGet(() -> {
                         try {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java
index 3a7409e..6a9dc38 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker.loadbalance;
 
 import java.util.List;
 import java.util.Optional;
+import java.util.Set;
 
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService;
@@ -107,6 +108,14 @@ public interface LoadManager {
      * @throws Exception
      */
     public void disableBroker() throws Exception;
+    
+    /**
+     * Get list of available brokers in cluster
+     * 
+     * @return
+     * @throws Exception 
+     */
+    Set<String> getAvailableBrokers() throws Exception;
 
     public void stop() throws PulsarServerException;
 
@@ -139,4 +148,5 @@ public interface LoadManager {
         // If we failed to create a load manager, default to 
SimpleLoadManagerImpl.
         return new SimpleLoadManagerImpl(pulsar);
     }
+
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/ModularLoadManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/ModularLoadManager.java
index 9d5603d..369bf5a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/ModularLoadManager.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/ModularLoadManager.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.broker.loadbalance;
 
 import java.util.Optional;
+import java.util.Set;
 
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService;
@@ -103,4 +104,11 @@ public interface ModularLoadManager {
      * @return
      */
     Deserializer<? extends ServiceLookupData> getLoadReportDeserializer();
+
+    /**
+     * Get available broker list in cluster
+     * 
+     * @return
+     */
+    Set<String> getAvailableBrokers();
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index 23f651e..714389f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -335,6 +335,7 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
         }
     }
 
+    @Override
     public Set<String> getAvailableBrokers() {
         try {
             return availableActiveBrokers.get();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java
index 54b9d56..82d99b6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerWrapper.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.loadbalance.impl;
 import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
+import java.util.Set;
 
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService;
@@ -122,4 +123,9 @@ public class ModularLoadManagerWrapper implements LoadManager {
     public ModularLoadManager getLoadManager() {
         return loadManager;
     }
+
+    @Override
+    public Set<String> getAvailableBrokers() throws Exception {
+        return loadManager.getAvailableBrokers();
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
index bbaf8bb..da4c0e3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
@@ -351,6 +351,11 @@ public class SimpleLoadManagerImpl implements LoadManager, ZooKeeperCacheListene
         return this.availableActiveBrokers;
     }
 
+    @Override
+    public Set<String> getAvailableBrokers() throws Exception {
+        return this.availableActiveBrokers.get();
+    }
+    
     public ZooKeeperDataCache<LoadReport> getLoadReportCache() {
         return this.loadReportCacheZk;
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleResourceAllocationPolicies.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleResourceAllocationPolicies.java
index fa67c0d..2c1aa99 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleResourceAllocationPolicies.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleResourceAllocationPolicies.java
@@ -26,6 +26,7 @@ import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.loadbalance.LoadReport;
 import org.apache.pulsar.broker.loadbalance.ResourceUnit;
 import org.apache.pulsar.broker.loadbalance.ServiceUnit;
+import static 
org.apache.pulsar.broker.namespace.NamespaceService.NAMESPACE_ISOLATION_POLICIES;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.policies.NamespaceIsolationPolicy;
 import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies;
@@ -51,7 +52,7 @@ public class SimpleResourceAllocationPolicies {
     private Optional<NamespaceIsolationPolicies> getIsolationPolicies(String 
clusterName) {
         try {
             return namespaceIsolationPolicies
-                    .get(AdminResource.path("clusters", clusterName, 
"namespaceIsolationPolicies"));
+                    .get(AdminResource.path("clusters", clusterName, 
NAMESPACE_ISOLATION_POLICIES));
         } catch (Exception e) {
             LOG.warn("GetIsolationPolicies: Unable to get the 
namespaceIsolationPolicies [{}]", e);
             return Optional.empty();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 2668f95..a712aec 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -119,6 +119,8 @@ public class NamespaceService {
     public static final Pattern SLA_NAMESPACE_PATTERN = 
Pattern.compile(SLA_NAMESPACE_PROPERTY + "/[^/]+/([^:]+:\\d+)");
     public static final String HEARTBEAT_NAMESPACE_FMT = "pulsar/%s/%s:%s";
     public static final String SLA_NAMESPACE_FMT = SLA_NAMESPACE_PROPERTY + 
"/%s/%s:%s";
+    
+    public static final String NAMESPACE_ISOLATION_POLICIES = 
"namespaceIsolationPolicies";
 
     /**
      * Default constructor.
@@ -519,7 +521,7 @@ public class NamespaceService {
     private NamespaceIsolationPolicies getLocalNamespaceIsolationPolicies() 
throws Exception {
         String localCluster = pulsar.getConfiguration().getClusterName();
         return pulsar.getConfigurationCache().namespaceIsolationPoliciesCache()
-                .get(AdminResource.path("clusters", localCluster, 
"namespaceIsolationPolicies")).orElseGet(() -> {
+                .get(AdminResource.path("clusters", localCluster, 
NAMESPACE_ISOLATION_POLICIES)).orElseGet(() -> {
                     // the namespace isolation policies are empty/undefined = 
an empty object
                     return new NamespaceIsolationPolicies();
                 });
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
index c8c3b75..7e4262f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
@@ -27,6 +27,8 @@ import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
 import java.net.URL;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -54,9 +56,13 @@ import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.AutoFailoverPolicyData;
+import org.apache.pulsar.common.policies.data.AutoFailoverPolicyType;
+import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationData;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.ConsumerStats;
 import org.apache.pulsar.common.policies.data.FailureDomain;
+import org.apache.pulsar.common.policies.data.NamespaceIsolationData;
 import org.apache.pulsar.common.policies.data.NonPersistentTopicStats;
 import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
 import org.apache.pulsar.common.policies.data.PersistencePolicies;
@@ -65,6 +71,7 @@ import org.apache.pulsar.common.policies.data.PersistentTopicStats;
 import org.apache.pulsar.common.policies.data.PropertyAdmin;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.SubscriptionStats;
+import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.DataProvider;
@@ -805,4 +812,47 @@ public class AdminApiTest2 extends MockedPulsarServiceBaseTest {
             // Expected
         }
     }
+    
+    @Test
+    public void brokerNamespaceIsolationPolicies() throws Exception {
+
+        // create
+        String policyName1 = "policy-1";
+        String namespaceRegex = "other/use/other.*";
+        String cluster = "use";
+        String brokerName = pulsar.getAdvertisedAddress();
+        String brokerAddress = brokerName + ":" + 
pulsar.getConfiguration().getWebServicePort();
+        NamespaceIsolationData nsPolicyData1 = new NamespaceIsolationData();
+        nsPolicyData1.namespaces = new ArrayList<String>();
+        nsPolicyData1.namespaces.add(namespaceRegex);
+        nsPolicyData1.primary = new ArrayList<String>();
+        nsPolicyData1.primary.add(brokerName + ":[0-9]*");
+        nsPolicyData1.secondary = new ArrayList<String>();
+        nsPolicyData1.secondary.add(brokerName + ".*");
+        nsPolicyData1.auto_failover_policy = new AutoFailoverPolicyData();
+        nsPolicyData1.auto_failover_policy.policy_type = 
AutoFailoverPolicyType.min_available;
+        nsPolicyData1.auto_failover_policy.parameters = new HashMap<String, 
String>();
+        nsPolicyData1.auto_failover_policy.parameters.put("min_limit", "1");
+        nsPolicyData1.auto_failover_policy.parameters.put("usage_threshold", 
"100");
+        admin.clusters().createNamespaceIsolationPolicy(cluster, policyName1, 
nsPolicyData1);
+
+        List<BrokerNamespaceIsolationData> brokerIsolationDataList = 
admin.clusters()
+                .getBrokersWithNamespaceIsolationPolicy(cluster);
+        assertEquals(brokerIsolationDataList.size(), 1);
+        assertEquals(brokerIsolationDataList.get(0).brokerName, brokerAddress);
+        assertEquals(brokerIsolationDataList.get(0).namespaceRegex.size(), 1);
+        assertEquals(brokerIsolationDataList.get(0).namespaceRegex.get(0), 
namespaceRegex);
+
+        BrokerNamespaceIsolationData brokerIsolationData = admin.clusters()
+                .getBrokerWithNamespaceIsolationPolicy(cluster, brokerAddress);
+        assertEquals(brokerIsolationData.brokerName, brokerAddress);
+        assertEquals(brokerIsolationData.namespaceRegex.size(), 1);
+        assertEquals(brokerIsolationData.namespaceRegex.get(0), 
namespaceRegex);
+
+        try {
+            admin.clusters().getBrokerWithNamespaceIsolationPolicy(cluster, 
"invalid-broker");
+            Assert.fail("should have failed due to invalid broker address");
+        } catch (PulsarAdminException.NotFoundException e) {// expected
+        }
+    }
 }
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Clusters.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Clusters.java
index 4048367..54c9c86 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Clusters.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Clusters.java
@@ -27,6 +27,7 @@ import org.apache.pulsar.client.admin.PulsarAdminException.ConflictException;
 import 
org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedException;
 import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
 import 
org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException;
+import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationData;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.FailureDomain;
 import org.apache.pulsar.common.policies.data.NamespaceIsolationData;
@@ -227,6 +228,28 @@ public interface Clusters {
     void createNamespaceIsolationPolicy(String cluster, String policyName, 
NamespaceIsolationData namespaceIsolationData)
             throws PulsarAdminException;
 
+    
+    /**
+     * Returns list of active brokers with namespace-isolation policies 
attached to it.
+     * 
+     * @param cluster
+     * @return
+     * @throws PulsarAdminException
+     */
+    List<BrokerNamespaceIsolationData> 
getBrokersWithNamespaceIsolationPolicy(String cluster)
+            throws PulsarAdminException;
+
+    /**
+     * Returns active broker with namespace-isolation policies attached to it.
+     * 
+     * @param cluster
+     * @param broker
+     * @return
+     * @throws PulsarAdminException
+     */
+    BrokerNamespaceIsolationData getBrokerWithNamespaceIsolationPolicy(String 
cluster, String broker)
+            throws PulsarAdminException;
+    
 
     /**
      * Update a namespace isolation policy for a cluster
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java
index 562fbee..32cb5c0 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java
@@ -31,6 +31,7 @@ import javax.ws.rs.core.MediaType;
 import org.apache.pulsar.client.admin.Clusters;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationData;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.FailureDomain;
 import org.apache.pulsar.common.policies.data.ErrorData;
@@ -124,6 +125,30 @@ public class ClustersImpl extends BaseResource implements Clusters {
         }
     }
 
+    
+    @Override
+    public List<BrokerNamespaceIsolationData> 
getBrokersWithNamespaceIsolationPolicy(String cluster)
+            throws PulsarAdminException {
+        try {
+            return 
request(adminClusters.path(cluster).path("namespaceIsolationPolicies").path("brokers"))
+                    .get(new GenericType<List<BrokerNamespaceIsolationData>>() 
{
+                    });
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public BrokerNamespaceIsolationData 
getBrokerWithNamespaceIsolationPolicy(String cluster, String broker)
+            throws PulsarAdminException {
+        try {
+            return 
request(adminClusters.path(cluster).path("namespaceIsolationPolicies").path("brokers").path(broker))
+                    .get(BrokerNamespaceIsolationData.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
     @Override
     public void createNamespaceIsolationPolicy(String cluster, String 
policyName,
             NamespaceIsolationData namespaceIsolationData) throws 
PulsarAdminException {
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index e70f20b..d081ccf 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -23,9 +23,6 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
 import java.util.EnumSet;
 
 import org.apache.pulsar.client.admin.BrokerStats;
@@ -53,6 +50,9 @@ import org.mockito.Matchers;
 import org.mockito.Mockito;
 import org.testng.annotations.Test;
 
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
 @Test
 public class PulsarAdminToolTest {
 
@@ -427,6 +427,21 @@ public class PulsarAdminToolTest {
     }
 
     @Test
+    void namespaceIsolationPolicy() throws Exception {
+        PulsarAdmin admin = Mockito.mock(PulsarAdmin.class);
+        Clusters mockClusters = mock(Clusters.class);
+        when(admin.clusters()).thenReturn(mockClusters);
+
+        CmdNamespaceIsolationPolicy nsIsolationPoliciesCmd = new 
CmdNamespaceIsolationPolicy(admin);
+
+        nsIsolationPoliciesCmd.run(split("brokers use"));
+        verify(mockClusters).getBrokersWithNamespaceIsolationPolicy("use");
+
+        nsIsolationPoliciesCmd.run(split("broker use --broker my-broker"));
+        verify(mockClusters).getBrokerWithNamespaceIsolationPolicy("use", 
"my-broker");
+    }
+    
+    @Test
     void persistentTopics() throws Exception {
         PulsarAdmin admin = Mockito.mock(PulsarAdmin.class);
         PersistentTopics mockTopics = mock(PersistentTopics.class);
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaceIsolationPolicy.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaceIsolationPolicy.java
index f215462..ed3bda6 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaceIsolationPolicy.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaceIsolationPolicy.java
@@ -27,6 +27,7 @@ import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.common.policies.data.AutoFailoverPolicyData;
 import org.apache.pulsar.common.policies.data.AutoFailoverPolicyType;
+import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationData;
 import org.apache.pulsar.common.policies.data.NamespaceIsolationData;
 
 import com.beust.jcommander.Parameter;
@@ -82,6 +83,39 @@ public class CmdNamespaceIsolationPolicy extends CmdBase {
         }
     }
 
+    @Parameters(commandDescription = "List all brokers with namespace-isolation policies attached to it. This operation requires Pulsar super-user privileges")
+    private class GetAllBrokersWithPolicies extends CliCommand {
+        @Parameter(description = "cluster-name\n", required = true)
+        private List<String> params;
+
+        void run() throws PulsarAdminException {
+            String clusterName = getOneArgument(params);
+
+            List<BrokerNamespaceIsolationData> brokers = admin.clusters()
+                    .getBrokersWithNamespaceIsolationPolicy(clusterName);
+
+            print(brokers);
+        }
+    }
+
+    @Parameters(commandDescription = "Get broker with namespace-isolation policies attached to it. This operation requires Pulsar super-user privileges")
+    private class GetBrokerWithPolicies extends CliCommand {
+        @Parameter(description = "cluster-name\n", required = true)
+        private List<String> params;
+
+        @Parameter(names = "--broker", description = "Broker-name to get namespace-isolation policies attached to it", required = true)
+        private String broker;
+
+        void run() throws PulsarAdminException {
+            String clusterName = getOneArgument(params);
+
+            BrokerNamespaceIsolationData brokerData = admin.clusters()
+                    .getBrokerWithNamespaceIsolationPolicy(clusterName, broker);
+
+            print(brokerData);
+        }
+    }
+
     @Parameters(commandDescription = "Get namespace isolation policy of a cluster. This operation requires Pulsar super-user privileges")
     private class GetPolicy extends CliCommand {
         @Parameter(description = "cluster-name policy-name\n", required = true)
@@ -195,6 +229,8 @@ public class CmdNamespaceIsolationPolicy extends CmdBase {
         jcommander.addCommand("get", new GetPolicy());
         jcommander.addCommand("list", new GetAllPolicies());
         jcommander.addCommand("delete", new DeletePolicy());
+        jcommander.addCommand("brokers", new GetAllBrokersWithPolicies());
+        jcommander.addCommand("broker", new GetBrokerWithPolicies());
     }
 
 }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/BrokerNamespaceIsolationData.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/BrokerNamespaceIsolationData.java
new file mode 100644
index 0000000..35fc73c
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/BrokerNamespaceIsolationData.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.policies.data;
+
+import java.util.List;
+
+import com.google.common.base.Objects;
+
+public class BrokerNamespaceIsolationData {
+
+    public String brokerName;
+    public List<String> namespaceRegex; //isolated namespace regex
+
+    @Override
+    public boolean equals(Object obj) {
+        if (obj instanceof BrokerNamespaceIsolationData) {
+            BrokerNamespaceIsolationData other = (BrokerNamespaceIsolationData) obj;
+            return Objects.equal(brokerName, other.brokerName) && Objects.equal(namespaceRegex, other.namespaceRegex);
+        }
+        return false;
+    }
+
+}

-- 
To stop receiving notification emails like this one, please contact
mme...@apache.org.

Reply via email to