This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 2f4af65c1de [feature][broker] Allow to configure the entry filters per 
namespace and per topic (#17153)
2f4af65c1de is described below

commit 2f4af65c1de662211645552036ff06d5689fc716
Author: gaozhangmin <zhangmin...@apache.org>
AuthorDate: Tue Aug 30 20:41:42 2022 +0800

    [feature][broker] Allow to configure the entry filters per namespace and 
per topic (#17153)
---
 conf/broker.conf                                   |   3 +
 .../apache/pulsar/broker/ServiceConfiguration.java |   7 ++
 .../pulsar/broker/admin/impl/NamespacesBase.java   |  10 ++
 .../broker/admin/impl/PersistentTopicsBase.java    |  47 +++++++++
 .../apache/pulsar/broker/admin/v2/Namespaces.java  |  62 ++++++++++++
 .../broker/admin/v2/NonPersistentTopics.java       |  92 +++++++++++++++++
 .../pulsar/broker/admin/v2/PersistentTopics.java   |  91 +++++++++++++++++
 .../pulsar/broker/service/AbstractTopic.java       |  16 +++
 .../pulsar/broker/service/BrokerService.java       |  31 ++++++
 .../pulsar/broker/service/EntryFilterSupport.java  |  19 +++-
 .../org/apache/pulsar/broker/service/Topic.java    |   7 ++
 .../service/nonpersistent/NonPersistentTopic.java  |  10 ++
 .../broker/service/persistent/PersistentTopic.java |  11 +++
 .../broker/service/plugin/EntryFilterProvider.java |  23 +++++
 .../apache/pulsar/broker/admin/AdminApi2Test.java  |  34 +++++++
 .../broker/service/AbstractBaseDispatcherTest.java |   5 +-
 .../broker/service/plugin/FilterEntryTest.java     |  85 +++++++++++++++-
 .../pulsar/broker/stats/ConsumerStatsTest.java     |  10 +-
 .../org/apache/pulsar/client/admin/Namespaces.java |  48 ++++++++-
 .../apache/pulsar/client/admin/TopicPolicies.java  |  51 ++++++++++
 .../pulsar/common/policies/data/EntryFilters.java  |  47 +++------
 .../pulsar/common/policies/data/Policies.java      |   8 +-
 .../client/admin/internal/NamespacesImpl.java      |  38 +++++++
 .../client/admin/internal/TopicPoliciesImpl.java   |  39 ++++++++
 .../org/apache/pulsar/admin/cli/CmdNamespaces.java |  45 +++++++++
 .../apache/pulsar/admin/cli/CmdTopicPolicies.java  |  61 ++++++++++++
 .../policies/data/HierarchyTopicPolicies.java      |   2 +
 .../pulsar/common/policies/data/PolicyName.java    |   3 +-
 .../pulsar/common/policies/data/TopicPolicies.java |   1 +
 site2/docs/admin-api-namespaces.md                 | 109 +++++++++++++++++++++
 30 files changed, 965 insertions(+), 50 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index 957b594338d..6f51c96a008 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -453,6 +453,9 @@ entryFilterNames=
 # The directory for all the entry filter implementations
 entryFiltersDirectory=
 
+# Whether to allow topic level entry filter policies to override the broker 
configuration.
+allowOverrideEntryFilters=false
+
 # Max number of concurrent lookup request broker allows to throttle heavy 
incoming lookup traffic
 maxConcurrentLookupRequest=50000
 
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index df9ac8c7762..fffa57ff4ab 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -1103,6 +1103,13 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
     )
     private String entryFiltersDirectory = "";
 
+    @FieldContext(
+            category = CATEGORY_SERVER,
+            dynamic = true,
+            doc = "Whether allow topic level entry filters policies overrides 
broker configuration."
+    )
+    private boolean allowOverrideEntryFilters = false;
+
     @FieldContext(
         category = CATEGORY_SERVER,
         doc = "Whether to use streaming read dispatcher. Currently is in 
preview and can be changed "
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 08c968a2d5d..0b25599c126 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -83,6 +83,7 @@ import org.apache.pulsar.common.policies.data.BundlesData;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
 import org.apache.pulsar.common.policies.data.DispatchRate;
+import org.apache.pulsar.common.policies.data.EntryFilters;
 import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
 import org.apache.pulsar.common.policies.data.LocalPolicies;
 import org.apache.pulsar.common.policies.data.NamespaceOperation;
@@ -2722,5 +2723,14 @@ public abstract class NamespacesBase extends 
AdminResource {
 
     }
 
+    protected CompletableFuture<Void> 
internalSetEntryFiltersPerTopicAsync(EntryFilters entryFilters) {
+        return validateNamespacePolicyOperationAsync(namespaceName, 
PolicyName.ENTRY_FILTERS, PolicyOperation.WRITE)
+                .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
+                .thenCompose(__ -> updatePoliciesAsync(namespaceName, policies 
-> {
+                    policies.entryFilters = entryFilters;
+                    return policies;
+                }));
+    }
+
     private static final Logger log = 
LoggerFactory.getLogger(NamespacesBase.class);
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 3db8f3a302f..6ef6f662029 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -110,6 +110,7 @@ import org.apache.pulsar.common.policies.data.AuthPolicies;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
 import org.apache.pulsar.common.policies.data.DispatchRate;
+import org.apache.pulsar.common.policies.data.EntryFilters;
 import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
 import org.apache.pulsar.common.policies.data.NamespaceOperation;
 import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
@@ -5335,4 +5336,50 @@ public class PersistentTopicsBase extends AdminResource {
                     return 
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, 
topicPolicies);
                 });
     }
+
+    protected CompletableFuture<EntryFilters> internalGetEntryFilters(boolean 
applied, boolean isGlobal) {
+        return validateTopicPolicyOperationAsync(topicName, 
PolicyName.ENTRY_FILTERS, PolicyOperation.READ)
+                .thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName, 
isGlobal)
+                .thenApply(op -> op.map(TopicPolicies::getEntryFilters)
+                        .orElseGet(() -> {
+                            if (applied) {
+                                EntryFilters entryFilters = 
getNamespacePolicies(namespaceName).entryFilters;
+                                if (entryFilters == null) {
+                                    return new EntryFilters(String.join(",",
+                                            
pulsar().getConfiguration().getEntryFilterNames()));
+                                }
+                                return entryFilters;
+                            }
+                            return null;
+                        })));
+
+    }
+
+    protected CompletableFuture<Void> internalSetEntryFilters(EntryFilters 
entryFilters,
+                                                              boolean 
isGlobal) {
+
+        return validateTopicPolicyOperationAsync(topicName, 
PolicyName.ENTRY_FILTERS, PolicyOperation.WRITE)
+                .thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName, 
isGlobal)
+                        .thenCompose(op -> {
+                            TopicPolicies topicPolicies = 
op.orElseGet(TopicPolicies::new);
+                            topicPolicies.setEntryFilters(entryFilters);
+                            topicPolicies.setIsGlobal(isGlobal);
+                            return pulsar().getTopicPoliciesService()
+                                    .updateTopicPoliciesAsync(topicName, 
topicPolicies);
+                        }));
+    }
+
+    protected CompletableFuture<Void> internalRemoveEntryFilters(boolean 
isGlobal) {
+        return validateTopicPolicyOperationAsync(topicName, 
PolicyName.ENTRY_FILTERS, PolicyOperation.WRITE)
+                .thenCompose(__ ->
+                        getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
+                        .thenCompose(op -> {
+                            if (!op.isPresent()) {
+                                return CompletableFuture.completedFuture(null);
+                            }
+                            op.get().setEntryFilters(null);
+                            op.get().setIsGlobal(isGlobal);
+                            return 
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, 
op.get());
+                        }));
+    }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index e43e01ced93..7fae0a5709e 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -60,6 +60,7 @@ import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
 import org.apache.pulsar.common.policies.data.BookieAffinityGroupData;
 import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
+import org.apache.pulsar.common.policies.data.EntryFilters;
 import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
 import org.apache.pulsar.common.policies.data.NamespaceOperation;
 import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
@@ -2658,5 +2659,66 @@ public class Namespaces extends NamespacesBase {
         }
     }
 
+    @GET
+    @Path("/{tenant}/{namespace}/entryFilters")
+    @ApiOperation(value = "Get maxConsumersPerSubscription config on a 
namespace.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
+            @ApiResponse(code = 404, message = "Namespace does not exist") })
+    public void getEntryFiltersPerTopic(
+            @Suspended final AsyncResponse asyncResponse,
+            @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace) {
+        validateNamespaceName(tenant, namespace);
+        validateNamespacePolicyOperationAsync(namespaceName, 
PolicyName.ENTRY_FILTERS, PolicyOperation.READ)
+                .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+                .thenAccept(polices -> 
asyncResponse.resume(polices.entryFilters))
+                .exceptionally(ex -> {
+                    log.error("[{}] Failed to get entry filters config on 
namespace {}: {} ",
+                            clientAppId(), namespaceName, 
ex.getCause().getMessage(), ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
+    }
+
+    @POST
+    @Path("/{tenant}/{namespace}/entryFilters")
+    @ApiOperation(value = "Set entry filters for namespace")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
+            @ApiResponse(code = 404, message = "Tenant or cluster or namespace 
doesn't exist")})
+    public void setEntryFiltersPerTopic(@Suspended AsyncResponse 
asyncResponse, @PathParam("tenant") String tenant,
+                                       @PathParam("namespace") String 
namespace,
+                                       @ApiParam(value = "entry filters", 
required = true)
+                                               EntryFilters entryFilters) {
+        validateNamespaceName(tenant, namespace);
+        internalSetEntryFiltersPerTopicAsync(entryFilters)
+                .thenAccept(__ -> 
asyncResponse.resume(Response.noContent().build()))
+                .exceptionally(ex -> {
+                    log.error("Failed to set entry filters for namespace {}", 
namespaceName, ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
+    }
+
+    @DELETE
+    @Path("/{tenant}/{namespace}/entryFilters")
+    @ApiOperation(value = "Remove entry filters for namespace")
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have 
admin permission"),
+            @ApiResponse(code = 404, message = "Tenant or cluster or namespace 
doesn't exist"),
+            @ApiResponse(code = 412, message = "Invalid TTL")})
+    public void removeNamespaceEntryFilters(@Suspended AsyncResponse 
asyncResponse,
+                                          @PathParam("tenant") String tenant,
+                                          @PathParam("namespace") String 
namespace) {
+        validateNamespaceName(tenant, namespace);
+        internalSetEntryFiltersPerTopicAsync(null)
+                .thenAccept(__ -> 
asyncResponse.resume(Response.noContent().build()))
+                .exceptionally(ex -> {
+                    log.error("Failed to remove entry filters for namespace 
{}", namespaceName, ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
+    }
+
+
+
     private static final Logger log = 
LoggerFactory.getLogger(Namespaces.class);
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
index fa8bcde26ba..82da5f91263 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
@@ -36,6 +36,7 @@ import javax.ws.rs.DELETE;
 import javax.ws.rs.DefaultValue;
 import javax.ws.rs.Encoded;
 import javax.ws.rs.GET;
+import javax.ws.rs.POST;
 import javax.ws.rs.PUT;
 import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
@@ -53,6 +54,7 @@ import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.web.RestException;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.EntryFilters;
 import org.apache.pulsar.common.policies.data.NamespaceOperation;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.TopicOperation;
@@ -509,6 +511,96 @@ public class NonPersistentTopics extends PersistentTopics {
         validateTopicOwnership(topicName, authoritative);
     }
 
+    @GET
+    @Path("/{tenant}/{namespace}/{topic}/entryFilters")
+    @ApiOperation(value = "Get entry filters for a topic.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
+            @ApiResponse(code = 404, message = "Tenants or Namespace doesn't 
exist") })
+    public void getEntryFilters(@Suspended AsyncResponse asyncResponse,
+                                @ApiParam(value = "Specify the tenant", 
required = true)
+                                @PathParam("tenant") String tenant,
+                                @ApiParam(value = "Specify the namespace", 
required = true)
+                                @PathParam("namespace") String namespace,
+                                @ApiParam(value = "Specify topic name", 
required = true)
+                                @PathParam("topic") @Encoded String 
encodedTopic,
+                                @QueryParam("applied") @DefaultValue("false") 
boolean applied,
+                                @QueryParam("isGlobal") @DefaultValue("false") 
boolean isGlobal,
+                                @ApiParam(value = "Whether leader broker 
redirected this call to this "
+                                        + "broker. For internal use.")
+                                @QueryParam("authoritative") 
@DefaultValue("false") boolean authoritative) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        preValidation(authoritative)
+                .thenCompose(__ -> internalGetEntryFilters(applied, isGlobal))
+                .thenAccept(asyncResponse::resume)
+                .exceptionally(ex -> {
+                    handleTopicPolicyException("getEntryFilters", ex, 
asyncResponse);
+                    return null;
+                });
+    }
+
+    @POST
+    @Path("/{tenant}/{namespace}/{topic}/entryFilters")
+    @ApiOperation(value = "Set entry filters for specified topic")
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have 
admin permission"),
+            @ApiResponse(code = 404, message = "Topic does not exist"),
+            @ApiResponse(code = 405,
+                    message = "Topic level policy is disabled, please enable 
the topic level policy and retry"),
+            @ApiResponse(code = 409, message = "Concurrent modification")})
+    public void setEntryFilters(@Suspended final AsyncResponse asyncResponse,
+                                @PathParam("tenant") String tenant,
+                                @PathParam("namespace") String namespace,
+                                @PathParam("topic") @Encoded String 
encodedTopic,
+                                @QueryParam("isGlobal") @DefaultValue("false") 
boolean isGlobal,
+                                @ApiParam(value = "Whether leader broker 
redirected this "
+                                        + "call to this broker. For internal 
use.")
+                                @QueryParam("authoritative") 
@DefaultValue("false") boolean authoritative,
+                                @ApiParam(value = "Enable sub types for the 
specified topic")
+                                        EntryFilters entryFilters) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        preValidation(authoritative)
+                .thenCompose(__ -> internalSetEntryFilters(entryFilters, 
isGlobal))
+                .thenAccept(__ -> 
asyncResponse.resume(Response.noContent().build()))
+                .exceptionally(ex -> {
+                    handleTopicPolicyException("setEntryFilters", ex, 
asyncResponse);
+                    return null;
+                });
+    }
+
+    @DELETE
+    @Path("/{tenant}/{namespace}/{topic}/entryFilters")
+    @ApiOperation(value = "Remove entry filters for specified topic.")
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have 
admin permission"),
+            @ApiResponse(code = 404, message = "Topic does not exist"),
+            @ApiResponse(code = 405,
+                    message = "Topic level policy is disabled, please enable 
the topic level policy and retry"),
+            @ApiResponse(code = 409, message = "Concurrent modification")})
+    public void removeEntryFilters(@Suspended final AsyncResponse 
asyncResponse,
+                                   @PathParam("tenant") String tenant,
+                                   @PathParam("namespace") String namespace,
+                                   @PathParam("topic") @Encoded String 
encodedTopic,
+                                   @QueryParam("isGlobal") 
@DefaultValue("false") boolean isGlobal,
+                                   @ApiParam(value = "Whether leader broker 
redirected this"
+                                           + "call to this broker. For 
internal use.")
+                                   @QueryParam("authoritative") 
@DefaultValue("false") boolean authoritative) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        preValidation(authoritative)
+                .thenCompose(__ -> internalRemoveEntryFilters(isGlobal))
+                .thenRun(() -> {
+                    log.info(
+                            "[{}] Successfully remove entry filters: 
tenant={}, namespace={}, topic={}, isGlobal={}",
+                            clientAppId(),
+                            tenant,
+                            namespace,
+                            topicName.getLocalName(),
+                            isGlobal);
+                    asyncResponse.resume(Response.noContent().build());
+                })
+                .exceptionally(ex -> {
+                    handleTopicPolicyException("removeEntryFilters", ex, 
asyncResponse);
+                    return null;
+                });
+    }
+
     private Topic getTopicReference(TopicName topicName) {
         try {
             return 
pulsar().getBrokerService().getTopicIfExists(topicName.toString())
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 56131a1d5fc..976368f10fa 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -58,6 +58,7 @@ import 
org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.AuthAction;
 import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
 import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
+import org.apache.pulsar.common.policies.data.EntryFilters;
 import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
 import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
 import org.apache.pulsar.common.policies.data.PersistencePolicies;
@@ -4272,5 +4273,95 @@ public class PersistentTopics extends 
PersistentTopicsBase {
                 });
     }
 
+    @GET
+    @Path("/{tenant}/{namespace}/{topic}/entryFilters")
+    @ApiOperation(value = "Get entry filters for a topic.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
+            @ApiResponse(code = 404, message = "Tenants or Namespace doesn't 
exist") })
+    public void getEntryFilters(@Suspended AsyncResponse asyncResponse,
+                                @ApiParam(value = "Specify the tenant", 
required = true)
+                                @PathParam("tenant") String tenant,
+                                @ApiParam(value = "Specify the namespace", 
required = true)
+                                @PathParam("namespace") String namespace,
+                                @ApiParam(value = "Specify topic name", 
required = true)
+                                @PathParam("topic") @Encoded String 
encodedTopic,
+                                @QueryParam("applied") @DefaultValue("false") 
boolean applied,
+                                @QueryParam("isGlobal") @DefaultValue("false") 
boolean isGlobal,
+                                @ApiParam(value = "Whether leader broker 
redirected this call to this "
+                                        + "broker. For internal use.")
+                                @QueryParam("authoritative") 
@DefaultValue("false") boolean authoritative) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        preValidation(authoritative)
+                .thenCompose(__ -> internalGetEntryFilters(applied, isGlobal))
+                .thenAccept(asyncResponse::resume)
+                .exceptionally(ex -> {
+                    handleTopicPolicyException("getEntryFilters", ex, 
asyncResponse);
+                    return null;
+                });
+    }
+
+    @POST
+    @Path("/{tenant}/{namespace}/{topic}/entryFilters")
+    @ApiOperation(value = "Set entry filters for specified topic")
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have 
admin permission"),
+            @ApiResponse(code = 404, message = "Topic does not exist"),
+            @ApiResponse(code = 405,
+                    message = "Topic level policy is disabled, please enable 
the topic level policy and retry"),
+            @ApiResponse(code = 409, message = "Concurrent modification")})
+    public void setEntryFilters(@Suspended final AsyncResponse asyncResponse,
+                                            @PathParam("tenant") String tenant,
+                                            @PathParam("namespace") String 
namespace,
+                                            @PathParam("topic") @Encoded 
String encodedTopic,
+                                            @QueryParam("isGlobal") 
@DefaultValue("false") boolean isGlobal,
+                                            @ApiParam(value = "Whether leader 
broker redirected this"
+                                                    + "call to this broker. 
For internal use.")
+                                            @QueryParam("authoritative") 
@DefaultValue("false") boolean authoritative,
+                                            @ApiParam(value = "Entry filters 
for the specified topic")
+                                        EntryFilters entryFilters) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        preValidation(authoritative)
+                .thenCompose(__ -> internalSetEntryFilters(entryFilters, 
isGlobal))
+                .thenAccept(__ -> 
asyncResponse.resume(Response.noContent().build()))
+                .exceptionally(ex -> {
+                    handleTopicPolicyException("setEntryFilters", ex, 
asyncResponse);
+                    return null;
+                });
+    }
+
+    @DELETE
+    @Path("/{tenant}/{namespace}/{topic}/entryFilters")
+    @ApiOperation(value = "Remove entry filters for specified topic.")
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have 
admin permission"),
+            @ApiResponse(code = 404, message = "Topic does not exist"),
+            @ApiResponse(code = 405,
+                    message = "Topic level policy is disabled, please enable 
the topic level policy and retry"),
+            @ApiResponse(code = 409, message = "Concurrent modification")})
+    public void removeEntryFilters(@Suspended final AsyncResponse 
asyncResponse,
+                                    @PathParam("tenant") String tenant,
+                                    @PathParam("namespace") String namespace,
+                                    @PathParam("topic") @Encoded String 
encodedTopic,
+                                    @QueryParam("isGlobal") 
@DefaultValue("false") boolean isGlobal,
+                                    @ApiParam(value = "Whether leader broker 
redirected this"
+                                            + "call to this broker. For 
internal use.")
+                                    @QueryParam("authoritative") 
@DefaultValue("false") boolean authoritative) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        preValidation(authoritative)
+                .thenCompose(__ -> internalRemoveEntryFilters(isGlobal))
+                .thenRun(() -> {
+                    log.info(
+                            "[{}] Successfully remove entry filters: 
tenant={}, namespace={}, topic={}, isGlobal={}",
+                            clientAppId(),
+                            tenant,
+                            namespace,
+                            topicName.getLocalName(),
+                            isGlobal);
+                    asyncResponse.resume(Response.noContent().build());
+                })
+                .exceptionally(ex -> {
+                    handleTopicPolicyException("removeEntryFilters", ex, 
asyncResponse);
+                    return null;
+                });
+    }
+
     private static final Logger log = 
LoggerFactory.getLogger(PersistentTopics.class);
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 0010c9bbd8a..a3df1498698 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.service;
 import static com.google.common.base.Preconditions.checkArgument;
 import static 
org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl.ENTRY_LATENCY_BUCKETS_USEC;
 import com.google.common.base.MoreObjects;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -52,6 +53,7 @@ import 
org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyExcep
 import 
org.apache.pulsar.broker.service.BrokerServiceException.ProducerBusyException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.ProducerFencedException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.TopicTerminatedException;
+import org.apache.pulsar.broker.service.plugin.EntryFilterWithClassLoader;
 import org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage;
 import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
 import 
org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
@@ -60,6 +62,7 @@ import 
org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
+import org.apache.pulsar.common.policies.data.EntryFilters;
 import org.apache.pulsar.common.policies.data.HierarchyTopicPolicies;
 import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
 import org.apache.pulsar.common.policies.data.Policies;
@@ -144,6 +147,7 @@ public abstract class AbstractTopic implements Topic, 
TopicPolicyListener<TopicP
 
     protected final LongAdder msgOutFromRemovedSubscriptions = new LongAdder();
     protected final LongAdder bytesOutFromRemovedSubscriptions = new 
LongAdder();
+    protected ImmutableMap<String, EntryFilterWithClassLoader> entryFilters;
 
     public AbstractTopic(String topic, BrokerService brokerService) {
         this.topic = topic;
@@ -179,6 +183,14 @@ public abstract class AbstractTopic implements Topic, 
TopicPolicyListener<TopicP
         return this.topicPolicies.getDispatchRate().get();
     }
 
+    public EntryFilters getEntryFiltersPolicy() {
+        return this.topicPolicies.getEntryFilters().get();
+    }
+
+    public ImmutableMap<String, EntryFilterWithClassLoader> getEntryFilters() {
+        return this.entryFilters;
+    }
+
     public DispatchRateImpl getReplicatorDispatchRate() {
         return this.topicPolicies.getReplicatorDispatchRate().get();
     }
@@ -225,6 +237,7 @@ public abstract class AbstractTopic implements Topic, 
TopicPolicyListener<TopicP
         
topicPolicies.getCompactionThreshold().updateTopicValue(data.getCompactionThreshold());
         
topicPolicies.getDispatchRate().updateTopicValue(DispatchRateImpl.normalize(data.getDispatchRate()));
         
topicPolicies.getSchemaValidationEnforced().updateTopicValue(data.getSchemaValidationEnforced());
+        
topicPolicies.getEntryFilters().updateTopicValue(data.getEntryFilters());
         this.subscriptionPolicies = data.getSubscriptionPolicies();
     }
 
@@ -273,6 +286,7 @@ public abstract class AbstractTopic implements Topic, 
TopicPolicyListener<TopicP
         updateSchemaCompatibilityStrategyNamespaceValue(namespacePolicies);
         updateNamespaceDispatchRate(namespacePolicies, 
brokerService.getPulsar().getConfig().getClusterName());
         
topicPolicies.getSchemaValidationEnforced().updateNamespaceValue(namespacePolicies.schema_validation_enforced);
+        
topicPolicies.getEntryFilters().updateNamespaceValue(namespacePolicies.entryFilters);
     }
 
     private void updateNamespaceDispatchRate(Policies namespacePolicies, 
String cluster) {
@@ -367,6 +381,8 @@ public abstract class AbstractTopic implements Topic, 
TopicPolicyListener<TopicP
                 
.updateBrokerValue(formatSchemaCompatibilityStrategy(schemaCompatibilityStrategy));
         
topicPolicies.getDispatchRate().updateBrokerValue(dispatchRateInBroker(config));
         
topicPolicies.getSchemaValidationEnforced().updateBrokerValue(config.isSchemaValidationEnforced());
+        topicPolicies.getEntryFilters().updateBrokerValue(new 
EntryFilters(String.join(",",
+                config.getEntryFilterNames())));
     }
 
     private DispatchRateImpl dispatchRateInBroker(ServiceConfiguration config) 
{
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index f19be58f958..8985cb9ec07 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -146,6 +146,7 @@ import 
org.apache.pulsar.common.policies.data.AutoSubscriptionCreationOverride;
 import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.EntryFilters;
 import org.apache.pulsar.common.policies.data.LocalPolicies;
 import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
 import org.apache.pulsar.common.policies.data.PersistencePolicies;
@@ -1139,6 +1140,20 @@ public class BrokerService implements Closeable {
         CompletableFuture<Void> isOwner = checkTopicNsOwnership(topic);
         isOwner.thenRun(() -> {
             nonPersistentTopic.initialize()
+                    .thenAccept(__ -> {
+                        EntryFilters entryFiltersPolicy = 
nonPersistentTopic.getEntryFiltersPolicy();
+                        if 
(!entryFiltersPolicy.getEntryFilterNames().isEmpty()) {
+                            try {
+                                nonPersistentTopic.entryFilters =
+                                        
EntryFilterProvider.createEntryFilters(pulsar.getConfig(),
+                                                entryFiltersPolicy);
+                            } catch (IOException e) {
+                                log.warn("Failed to set entry filters on topic 
{}-{}", topic, e.getMessage());
+                                pulsar.getExecutor().execute(() -> 
topics.remove(topic, topicFuture));
+                                topicFuture.completeExceptionally(e);
+                            }
+                        }
+                    })
                     .thenCompose(__ -> nonPersistentTopic.checkReplication())
                     .thenRun(() -> {
                         log.info("Created topic {}", nonPersistentTopic);
@@ -1431,6 +1446,22 @@ public class BrokerService implements Closeable {
                                         : newTopic(topic, ledger, 
BrokerService.this, PersistentTopic.class);
                                 persistentTopic
                                         .initialize()
+                                        .thenAccept(__ -> {
+                                            EntryFilters entryFiltersPolicy = 
persistentTopic.getEntryFiltersPolicy();
+                                            if 
(!entryFiltersPolicy.getEntryFilterNames().isEmpty()) {
+                                                try {
+                                                    
persistentTopic.entryFilters =
+                                                            
EntryFilterProvider.createEntryFilters(pulsar.getConfig(),
+                                                                    
entryFiltersPolicy);
+                                                } catch (IOException e) {
+                                                    log.warn("Failed to set 
entry filters on topic {}-{}", topic,
+                                                            e.getMessage());
+                                                    
pulsar.getExecutor().execute(() ->
+                                                            
topics.remove(topic, topicFuture));
+                                                    
topicFuture.completeExceptionally(e);
+                                                }
+                                            }
+                                        })
                                         .thenCompose(__ -> 
persistentTopic.preCreateSubscriptionForCompactionIfNeeded())
                                         .thenCompose(__ -> 
persistentTopic.checkReplication())
                                         .thenCompose(v -> {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryFilterSupport.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryFilterSupport.java
index b0c7385a28f..7a4700a90a6 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryFilterSupport.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryFilterSupport.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.broker.service;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.collections4.MapUtils;
@@ -39,9 +40,21 @@ public class EntryFilterSupport {
 
     public EntryFilterSupport(Subscription subscription) {
         this.subscription = subscription;
-        if (subscription != null && subscription.getTopic() != null && 
MapUtils.isNotEmpty(subscription.getTopic()
-                .getBrokerService().getEntryFilters())) {
-            this.entryFilters = 
subscription.getTopic().getBrokerService().getEntryFilters().values().asList();
+        if (subscription != null && subscription.getTopic() != null) {
+            if (MapUtils.isNotEmpty(subscription.getTopic()
+                    .getBrokerService().getEntryFilters())
+                    && !subscription.getTopic().getBrokerService().pulsar()
+                    .getConfiguration().isAllowOverrideEntryFilters()) {
+                this.entryFilters = 
subscription.getTopic().getBrokerService().getEntryFilters().values().asList();
+            } else {
+                ImmutableMap<String, EntryFilterWithClassLoader>  
entryFiltersMap =
+                        subscription.getTopic().getEntryFilters();
+                if (entryFiltersMap != null) {
+                    this.entryFilters = 
subscription.getTopic().getEntryFilters().values().asList();
+                } else {
+                    this.entryFilters = ImmutableList.of();
+                }
+            }
             this.filterContext = new FilterContext();
         } else {
             this.entryFilters = ImmutableList.of();
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
index 038f209fc1f..b4f27adcc4a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.service;
 
+import com.google.common.collect.ImmutableMap;
 import io.netty.buffer.ByteBuf;
 import java.util.Map;
 import java.util.Optional;
@@ -26,6 +27,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
 import org.apache.pulsar.broker.service.persistent.SubscribeRateLimiter;
+import org.apache.pulsar.broker.service.plugin.EntryFilterWithClassLoader;
 import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
 import org.apache.pulsar.broker.stats.NamespaceStats;
 import org.apache.pulsar.client.api.MessageId;
@@ -35,6 +37,7 @@ import 
org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
 import org.apache.pulsar.common.api.proto.KeySharedMeta;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
+import org.apache.pulsar.common.policies.data.EntryFilters;
 import org.apache.pulsar.common.policies.data.HierarchyTopicPolicies;
 import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
 import org.apache.pulsar.common.policies.data.Policies;
@@ -231,6 +234,10 @@ public interface Topic {
 
     boolean isReplicated();
 
+    EntryFilters getEntryFiltersPolicy();
+
+    ImmutableMap<String, EntryFilterWithClassLoader> getEntryFilters();
+
     BacklogQuota getBacklogQuota(BacklogQuotaType backlogQuotaType);
 
     void updateRates(NamespaceStats nsStats, NamespaceBundleStats 
currentBundleStats,
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index db77d12e713..0b5a1ed9731 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -497,6 +497,16 @@ public class NonPersistentTopic extends AbstractTopic 
implements Topic, TopicPol
             
this.resourceGroupPublishLimiter.unregisterRateLimitFunction(this.getName());
         }
 
+        if (entryFilters != null) {
+            entryFilters.forEach((name, filter) -> {
+                try {
+                    filter.close();
+                } catch (Exception e) {
+                    log.warn("Error shutting down entry filter {}", name, e);
+                }
+            });
+        }
+
         CompletableFuture<Void> clientCloseFuture =
                 closeWithoutWaitingClientDisconnect ? 
CompletableFuture.completedFuture(null)
                         : FutureUtil.waitForAll(futures);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 4777546bc84..c8cd487c5ee 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1280,6 +1280,17 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
             
this.resourceGroupPublishLimiter.unregisterRateLimitFunction(this.getName());
         }
 
+        //close entry filters
+        if (entryFilters != null) {
+            entryFilters.forEach((name, filter) -> {
+                try {
+                    filter.close();
+                } catch (Exception e) {
+                    log.warn("Error shutting down entry filter {}", name, e);
+                }
+            });
+        }
+
         CompletableFuture<Void> clientCloseFuture = 
closeWithoutWaitingClientDisconnect
                 ? CompletableFuture.completedFuture(null)
                 : FutureUtil.waitForAll(futures);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterProvider.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterProvider.java
index 48c8d9a7683..048b053464e 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterProvider.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterProvider.java
@@ -33,6 +33,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.common.nar.NarClassLoader;
 import org.apache.pulsar.common.nar.NarClassLoaderBuilder;
+import org.apache.pulsar.common.policies.data.EntryFilters;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 
 @Slf4j
@@ -44,6 +45,28 @@ public class EntryFilterProvider {
     /**
      * create entry filter instance.
      */
+    public static ImmutableMap<String, EntryFilterWithClassLoader> 
createEntryFilters(ServiceConfiguration conf,
+                                                                               
       EntryFilters entryFilters)
+            throws IOException {
+        EntryFilterDefinitions definitions = 
searchForEntryFilters(conf.getEntryFiltersDirectory(),
+                conf.getNarExtractionDirectory());
+        ImmutableMap.Builder<String, EntryFilterWithClassLoader> builder = 
ImmutableMap.builder();
+        for (String filterName : 
entryFilters.getEntryFilterNames().split(",")) {
+            EntryFilterMetaData metaData = 
definitions.getFilters().get(filterName);
+            if (null == metaData) {
+                throw new RuntimeException("No entry filter is found for name 
`" + filterName
+                        + "`. Available entry filters are : " + 
definitions.getFilters());
+            }
+            EntryFilterWithClassLoader filter;
+            filter = load(metaData, conf.getNarExtractionDirectory());
+            if (filter != null) {
+                builder.put(filterName, filter);
+            }
+            log.info("Successfully loaded entry filter for name `{}` from 
topic policy", filterName);
+        }
+        return builder.build();
+    }
+
     public static ImmutableMap<String, EntryFilterWithClassLoader> 
createEntryFilters(
             ServiceConfiguration conf) throws IOException {
         EntryFilterDefinitions definitions = 
searchForEntryFilters(conf.getEntryFiltersDirectory(),
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index 33527e18da9..99ad1b99abd 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -98,6 +98,7 @@ import 
org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationDataImpl;
 import org.apache.pulsar.common.policies.data.BundlesData;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.ConsumerStats;
+import org.apache.pulsar.common.policies.data.EntryFilters;
 import org.apache.pulsar.common.policies.data.FailureDomain;
 import org.apache.pulsar.common.policies.data.NamespaceIsolationData;
 import org.apache.pulsar.common.policies.data.NonPersistentTopicStats;
@@ -2132,6 +2133,39 @@ public class AdminApi2Test extends 
MockedPulsarServiceBaseTest {
         }
     }
 
+    @Test(timeOut = 30000)
+    public void testSetNamespaceEntryFilters() throws Exception {
+        EntryFilters entryFilters = new EntryFilters(
+                "org.apache.pulsar.broker.service.plugin.EntryFilterTest");
+
+        final String myNamespace = "prop-xyz/ns" + UUID.randomUUID();
+        admin.namespaces().createNamespace(myNamespace, 
Sets.newHashSet("test"));
+
+        assertNull(admin.namespaces().getNamespaceEntryFilters(myNamespace));
+
+        admin.namespaces().setNamespaceEntryFilters(myNamespace, entryFilters);
+        assertEquals(admin.namespaces().getNamespaceEntryFilters(myNamespace), 
entryFilters);
+        admin.namespaces().removeNamespaceEntryFilters(myNamespace);
+        assertNull(admin.namespaces().getNamespaceEntryFilters(myNamespace));
+    }
+
+    @Test(dataProvider = "topicType")
+    public void testSetTopicLevelEntryFilters(String topicType) throws 
Exception {
+        EntryFilters entryFilters = new 
EntryFilters("org.apache.pulsar.broker.service.plugin.EntryFilterTest");
+        final String topic = topicType + 
"://prop-xyz/ns1/test-schema-validation-enforced";
+        admin.topics().createPartitionedTopic(topic, 1);
+        @Cleanup
+        Producer<byte[]> producer1 = pulsarClient.newProducer()
+                .topic(topic + TopicName.PARTITIONED_TOPIC_SUFFIX + 0)
+                .create();
+        assertNull(admin.topicPolicies().getEntryFiltersPerTopic(topic, 
false));
+        admin.topicPolicies().setEntryFiltersPerTopic(topic, entryFilters);
+        Awaitility.await().untilAsserted(() -> 
assertEquals(admin.topicPolicies().getEntryFiltersPerTopic(topic,
+                false), entryFilters));
+        admin.topicPolicies().removeEntryFiltersPerTopic(topic);
+        assertNull(admin.topicPolicies().getEntryFiltersPerTopic(topic, 
false));
+    }
+
     @Test(timeOut = 30000)
     public void testMaxSubPerTopic() throws Exception {
         pulsar.getConfiguration().setMaxSubscriptionsPerTopic(0);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractBaseDispatcherTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractBaseDispatcherTest.java
index d89738869a8..cc1558fed27 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractBaseDispatcherTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractBaseDispatcherTest.java
@@ -78,18 +78,17 @@ public class AbstractBaseDispatcherTest {
 
 
     @Test
-    public void testFilterEntriesForConsumerOfEntryFilter() {
+    public void testFilterEntriesForConsumerOfEntryFilter() throws Exception {
         Topic mockTopic = mock(Topic.class);
         when(this.subscriptionMock.getTopic()).thenReturn(mockTopic);
 
         BrokerService mockBrokerService = mock(BrokerService.class);
         when(mockTopic.getBrokerService()).thenReturn(mockBrokerService);
-
         EntryFilterWithClassLoader mockFilter = 
mock(EntryFilterWithClassLoader.class);
         when(mockFilter.filterEntry(any(Entry.class), 
any(FilterContext.class))).thenReturn(
                 EntryFilter.FilterResult.REJECT);
         ImmutableMap<String, EntryFilterWithClassLoader> entryFilters = 
ImmutableMap.of("key", mockFilter);
-        when(mockBrokerService.getEntryFilters()).thenReturn(entryFilters);
+        when(mockTopic.getEntryFilters()).thenReturn(entryFilters);
 
         this.helper = new 
AbstractBaseDispatcherTestHelper(this.subscriptionMock, this.svcConfig);
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java
index ee100f3fdee..a5f8b5ab38f 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java
@@ -19,9 +19,12 @@
 package org.apache.pulsar.broker.service.plugin;
 
 import static 
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
+import static 
org.apache.pulsar.client.api.SubscriptionInitialPosition.Earliest;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 import static org.testng.AssertJUnit.assertEquals;
 import static org.testng.AssertJUnit.assertNotNull;
 import com.google.common.collect.ImmutableList;
@@ -35,13 +38,14 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
 import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.service.AbstractTopic;
-import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.BrokerTestBase;
 import org.apache.pulsar.broker.service.Dispatcher;
 import org.apache.pulsar.broker.service.EntryFilterSupport;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Producer;
@@ -71,6 +75,77 @@ public class FilterEntryTest extends BrokerTestBase {
         internalCleanup();
     }
 
+    @Test
+    public void testOverride() throws Exception {
+        conf.setAllowOverrideEntryFilters(true);
+        String topic = "persistent://prop/ns-abc/topic" + UUID.randomUUID();
+        String subName = "sub";
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .enableBatching(false).topic(topic).create();
+        for (int i = 0; i < 10; i++) {
+            producer.send("test");
+        }
+
+        PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService()
+                .getTopicReference(topic).get();
+
+        // set topic level entry filters
+        EntryFilterWithClassLoader mockFilter = 
mock(EntryFilterWithClassLoader.class);
+        when(mockFilter.filterEntry(any(Entry.class), 
any(FilterContext.class))).thenReturn(
+                EntryFilter.FilterResult.REJECT);
+        ImmutableMap<String, EntryFilterWithClassLoader> entryFilters = 
ImmutableMap.of("key", mockFilter);
+
+        Field field = 
topicRef.getClass().getSuperclass().getDeclaredField("entryFilters");
+        field.setAccessible(true);
+        field.set(topicRef, entryFilters);
+
+        EntryFilterWithClassLoader mockFilter1 = 
mock(EntryFilterWithClassLoader.class);
+        when(mockFilter1.filterEntry(any(Entry.class), 
any(FilterContext.class))).thenReturn(
+                EntryFilter.FilterResult.ACCEPT);
+        ImmutableMap<String, EntryFilterWithClassLoader> entryFilters1 = 
ImmutableMap.of("key2", mockFilter1);
+        Field field2 = 
pulsar.getBrokerService().getClass().getDeclaredField("entryFilters");
+        field2.setAccessible(true);
+        field2.set(pulsar.getBrokerService(), entryFilters1);
+
+        Consumer<String> consumer = 
pulsarClient.newConsumer(Schema.STRING).topic(topic)
+                .subscriptionInitialPosition(Earliest)
+                .subscriptionName(subName).subscribe();
+
+        int counter = 0;
+        while (true) {
+            Message<String> message = consumer.receive(1, TimeUnit.SECONDS);
+            if (message != null) {
+                counter++;
+                consumer.acknowledge(message);
+            } else {
+                break;
+            }
+        }
+        // Override enabled: the topic-level REJECT filter applies, so nothing is received
+        assertEquals(0, counter);
+
+
+        conf.setAllowOverrideEntryFilters(false);
+        consumer.close();
+        consumer = pulsarClient.newConsumer(Schema.STRING).topic(topic)
+                .subscriptionInitialPosition(Earliest)
+                .subscriptionName(subName + "1").subscribe();
+        int counter1 = 0;
+        while (true) {
+            Message<String> message = consumer.receive(1, TimeUnit.SECONDS);
+            if (message != null) {
+                counter1++;
+                consumer.acknowledge(message);
+            } else {
+                break;
+            }
+        }
+        // Override disabled: the broker-level ACCEPT filter applies, so all 10 messages are received
+        assertEquals(10, counter1);
+        conf.setAllowOverrideEntryFilters(false);
+        consumer.close();
+    }
+
     @Test
     public void testFilter() throws Exception {
         Map<String, String> map = new HashMap<>();
@@ -184,10 +259,12 @@ public class FilterEntryTest extends BrokerTestBase {
         producer.close();
         consumer.close();
 
-        BrokerService brokerService = pulsar.getBrokerService();
-        Field field1 = BrokerService.class.getDeclaredField("entryFilters");
+        PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService()
+                .getTopicReference(topic).get();
+        Field field1 = 
topicRef.getClass().getSuperclass().getDeclaredField("entryFilters");
         field1.setAccessible(true);
-        field1.set(brokerService, ImmutableMap.of("1", loader1, "2", loader2));
+        field1.set(topicRef, ImmutableMap.of("1", loader1, "2", loader2));
+
         cleanup();
         verify(loader1, times(1)).close();
         verify(loader2, times(1)).close();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
index e845e6e71fd..801a55ad419 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
@@ -19,7 +19,6 @@
 package org.apache.pulsar.broker.stats;
 
 import static 
org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
-import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.testng.Assert.assertNotEquals;
@@ -30,6 +29,7 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
 import java.io.ByteArrayOutputStream;
+import java.lang.reflect.Field;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -410,8 +410,12 @@ public class ConsumerStatsTest extends 
ProducerConsumerBase {
                 loader = 
spyWithClassAndConstructorArgs(EntryFilterWithClassLoader.class, filter,
                 narClassLoader);
         ImmutableMap<String, EntryFilterWithClassLoader> entryFilters = 
ImmutableMap.of("filter", loader);
-        BrokerService brokerService = pulsar.getBrokerService();
-        doReturn(entryFilters).when(brokerService).getEntryFilters();
+
+        PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService()
+                .getTopicReference(topic).get();
+        Field field1 = 
topicRef.getClass().getSuperclass().getDeclaredField("entryFilters");
+        field1.setAccessible(true);
+        field1.set(topicRef, entryFilters);
 
         Map<String, String> metadataConsumer = new HashMap<>();
         metadataConsumer.put("matchValueAccept", "producer1");
diff --git 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
index 39cf49cf961..4fc50bf5b2e 100644
--- 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
+++ 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
@@ -36,6 +36,7 @@ import 
org.apache.pulsar.common.policies.data.BookieAffinityGroupData;
 import org.apache.pulsar.common.policies.data.BundlesData;
 import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
 import org.apache.pulsar.common.policies.data.DispatchRate;
+import org.apache.pulsar.common.policies.data.EntryFilters;
 import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
 import org.apache.pulsar.common.policies.data.OffloadPolicies;
 import org.apache.pulsar.common.policies.data.PersistencePolicies;
@@ -4167,7 +4168,6 @@ public interface Namespaces {
     CompletableFuture<Void> removeMaxTopicsPerNamespaceAsync(String namespace);
 
     /**
-<<<<<<< HEAD
      * Set key value pair property for a namespace.
      * If the property absents, a new property will added. Otherwise, the new 
value will overwrite.
      *
@@ -4467,4 +4467,50 @@ public interface Namespaces {
      * @return
      */
     CompletableFuture<Void> removeNamespaceResourceGroupAsync(String 
namespace);
+
+    /**
+     * Get entry filters for a namespace.
+     * @param namespace
+     * @return entry filters classes info.
+     * @throws PulsarAdminException
+     */
+    EntryFilters getNamespaceEntryFilters(String namespace) throws 
PulsarAdminException;
+
+    /**
+     * Get entry filters for a namespace asynchronously.
+     *
+     * @param namespace
+     * @return entry filters classes info.
+     */
+    CompletableFuture<EntryFilters> getNamespaceEntryFiltersAsync(String 
namespace);
+
+    /**
+     * Set entry filters on a namespace.
+     *
+     * @param namespace    Namespace name
+     * @param entryFilters The entry filters
+     */
+    void setNamespaceEntryFilters(String namespace, EntryFilters entryFilters) 
throws PulsarAdminException;
+
+    /**
+     * Set entry filters on a namespace asynchronously.
+     *
+     * @param namespace    Namespace name
+     * @param entryFilters The entry filters
+     */
+    CompletableFuture<Void> setNamespaceEntryFiltersAsync(String namespace, 
EntryFilters entryFilters);
+
+    /**
+     * Remove the entry filters of a namespace.
+     * @param namespace    Namespace name
+     * @throws PulsarAdminException
+     */
+    void removeNamespaceEntryFilters(String namespace) throws 
PulsarAdminException;
+
+    /**
+     * Remove the entry filters of a namespace asynchronously.
+     * @param namespace     Namespace name
+     * @return
+     */
+    CompletableFuture<Void> removeNamespaceEntryFiltersAsync(String namespace);
 }
diff --git 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/TopicPolicies.java
 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/TopicPolicies.java
index dabe69301cf..827bb99132b 100644
--- 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/TopicPolicies.java
+++ 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/TopicPolicies.java
@@ -25,6 +25,7 @@ import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
 import org.apache.pulsar.common.policies.data.DispatchRate;
+import org.apache.pulsar.common.policies.data.EntryFilters;
 import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
 import org.apache.pulsar.common.policies.data.OffloadPolicies;
 import org.apache.pulsar.common.policies.data.PersistencePolicies;
@@ -1763,4 +1764,54 @@ public interface TopicPolicies {
      * @param topic The topic in whose policy should be removed
      */
     CompletableFuture<Void> removeSchemaCompatibilityStrategyAsync(String 
topic);
+
+    /**
+     * Get applied entry filters for a topic.
+     * @param topic
+     * @param applied
+     * @return entry filters classes info.
+     * @throws PulsarAdminException
+     */
+    EntryFilters getEntryFiltersPerTopic(String topic, boolean applied) throws 
PulsarAdminException;
+
+    /**
+     * Get applied entry filters for a topic asynchronously.
+     *
+     * @param topic
+     * @param applied
+     * @return
+     */
+    CompletableFuture<EntryFilters> getEntryFiltersPerTopicAsync(String topic, 
boolean applied);
+
+    /**
+     * Set entry filters on a topic.
+     *
+     * @param topic    The topic whose policy should be set
+     * @param entryFilters The entry filters
+     */
+    void setEntryFiltersPerTopic(String topic, EntryFilters entryFilters) 
throws PulsarAdminException;
+
+    /**
+     * Set entry filters on a topic asynchronously.
+     *
+     * @param topic    The topic whose policy should be set
+     * @param entryFilters The entry filters
+     */
+    CompletableFuture<Void> setEntryFiltersPerTopicAsync(String topic, 
EntryFilters entryFilters);
+
+    /**
+     * Remove the entry filters of a topic.
+     * @param topic
+     * @throws PulsarAdminException
+     */
+    void removeEntryFiltersPerTopic(String topic) throws PulsarAdminException;
+
+    /**
+     * Remove the entry filters of a topic asynchronously.
+     * @param topic
+     * @return
+     */
+    CompletableFuture<Void> removeEntryFiltersPerTopicAsync(String topic);
+
+
 }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java
 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/EntryFilters.java
similarity index 58%
copy from 
pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java
copy to 
pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/EntryFilters.java
index 2b115023f9d..785fbce39e1 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java
+++ 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/EntryFilters.java
@@ -16,39 +16,20 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
 package org.apache.pulsar.common.policies.data;
 
-/**
- * PolicyName authorization operations.
- */
-public enum PolicyName {
-    ALL,
-    ANTI_AFFINITY,
-    AUTO_SUBSCRIPTION_CREATION,
-    AUTO_TOPIC_CREATION,
-    BACKLOG,
-    COMPACTION,
-    DELAYED_DELIVERY,
-    INACTIVE_TOPIC,
-    DEDUPLICATION,
-    MAX_CONSUMERS,
-    MAX_PRODUCERS,
-    DEDUPLICATION_SNAPSHOT,
-    MAX_UNACKED,
-    MAX_SUBSCRIPTIONS,
-    OFFLOAD,
-    PARTITION,
-    PERSISTENCE,
-    RATE,
-    RETENTION,
-    REPLICATION,
-    REPLICATION_RATE,
-    SCHEMA_COMPATIBILITY_STRATEGY,
-    SUBSCRIPTION_AUTH_MODE,
-    SUBSCRIPTION_EXPIRATION_TIME,
-    ENCRYPTION,
-    TTL,
-    MAX_TOPICS,
-    RESOURCEGROUP
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+public class EntryFilters {
+
+    /**
+     * The class name for the entry filter.
+     */
+    private String entryFilterNames;
+
 }
diff --git 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
index ebcd7fc09c1..e2b186a5a16 100644
--- 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
+++ 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
@@ -128,6 +128,9 @@ public class Policies {
         LARGEST, HOT;
     }
 
+    @SuppressWarnings("checkstyle:MemberName")
+    public EntryFilters entryFilters = null;
+
     @Override
     public int hashCode() {
         return Objects.hash(auth_policies, replication_clusters,
@@ -151,7 +154,7 @@ public class Policies {
                 offload_policies,
                 subscription_types_enabled,
                 properties,
-                resource_group_name);
+                resource_group_name, entryFilters);
     }
 
     @Override
@@ -196,7 +199,8 @@ public class Policies {
                     && Objects.equals(offload_policies, other.offload_policies)
                     && Objects.equals(subscription_types_enabled, 
other.subscription_types_enabled)
                     && Objects.equals(properties, other.properties)
-                    && Objects.equals(resource_group_name, 
other.resource_group_name);
+                    && Objects.equals(resource_group_name, 
other.resource_group_name)
+                    && Objects.equals(entryFilters, other.entryFilters);
         }
 
         return false;
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
index bfb060596b1..d62312ee7d7 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
@@ -44,6 +44,7 @@ import 
org.apache.pulsar.common.policies.data.BookieAffinityGroupData;
 import org.apache.pulsar.common.policies.data.BundlesData;
 import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
 import org.apache.pulsar.common.policies.data.DispatchRate;
+import org.apache.pulsar.common.policies.data.EntryFilters;
 import org.apache.pulsar.common.policies.data.ErrorData;
 import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
 import org.apache.pulsar.common.policies.data.OffloadPolicies;
@@ -1875,4 +1876,41 @@ public class NamespacesImpl extends BaseResource 
implements Namespaces {
         WebTarget path = namespacePath(ns, parts);
         return asyncGetRequest(path, callback);
     }
+
+    @Override
+    public EntryFilters getNamespaceEntryFilters(String namespace) throws 
PulsarAdminException {
+        return sync(() -> getNamespaceEntryFiltersAsync(namespace));
+    }
+
+    @Override
+    public CompletableFuture<EntryFilters> 
getNamespaceEntryFiltersAsync(String namespace) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, "entryFilters");
+        return asyncGetRequest(path, new FutureCallback<EntryFilters>(){});
+    }
+
+    @Override
+    public void setNamespaceEntryFilters(String namespace, EntryFilters 
entryFilters)
+            throws PulsarAdminException {
+        sync(() -> setNamespaceEntryFiltersAsync(namespace, entryFilters));
+    }
+
+    @Override
+    public CompletableFuture<Void> setNamespaceEntryFiltersAsync(String 
namespace, EntryFilters entryFilters) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, "entryFilters");
+        return asyncPostRequest(path, Entity.entity(entryFilters, 
MediaType.APPLICATION_JSON));
+    }
+
+    @Override
+    public void removeNamespaceEntryFilters(String namespace) throws 
PulsarAdminException {
+        sync(() -> removeNamespaceEntryFiltersAsync(namespace));
+    }
+
+    @Override
+    public CompletableFuture<Void> removeNamespaceEntryFiltersAsync(String 
namespace) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, "entryFilters");
+        return asyncDeleteRequest(path);
+    }
 }
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicPoliciesImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicPoliciesImpl.java
index 29226567216..821a4e13e18 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicPoliciesImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicPoliciesImpl.java
@@ -34,6 +34,7 @@ import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
 import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
 import org.apache.pulsar.common.policies.data.DispatchRate;
+import org.apache.pulsar.common.policies.data.EntryFilters;
 import org.apache.pulsar.common.policies.data.ErrorData;
 import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
 import org.apache.pulsar.common.policies.data.OffloadPolicies;
@@ -1178,6 +1179,44 @@ public class TopicPoliciesImpl extends BaseResource 
implements TopicPolicies {
         return asyncDeleteRequest(path);
     }
 
+    @Override
+    public EntryFilters getEntryFiltersPerTopic(String topic, boolean applied) 
throws PulsarAdminException {
+        return sync(() -> getEntryFiltersPerTopicAsync(topic, applied));
+    }
+
+    @Override
+    public CompletableFuture<EntryFilters> getEntryFiltersPerTopicAsync(String 
topic, boolean applied) {
+        TopicName topicName = validateTopic(topic);
+        WebTarget path = topicPath(topicName, "entryFilters");
+        path = path.queryParam("applied", applied);
+        return asyncGetRequest(path, new FutureCallback<EntryFilters>(){});
+    }
+
+    @Override
+    public void setEntryFiltersPerTopic(String topic, EntryFilters 
entryFilters)
+            throws PulsarAdminException {
+        sync(() -> setEntryFiltersPerTopicAsync(topic, entryFilters));
+    }
+
+    @Override
+    public CompletableFuture<Void> setEntryFiltersPerTopicAsync(String topic, 
EntryFilters entryFilters) {
+        TopicName topicName = validateTopic(topic);
+        WebTarget path = topicPath(topicName, "entryFilters");
+        return asyncPostRequest(path, Entity.entity(entryFilters, 
MediaType.APPLICATION_JSON));
+    }
+
+    @Override
+    public void removeEntryFiltersPerTopic(String topic) throws 
PulsarAdminException {
+        sync(() -> removeEntryFiltersPerTopicAsync(topic));
+    }
+
+    @Override
+    public CompletableFuture<Void> removeEntryFiltersPerTopicAsync(String 
topic) {
+        TopicName tn = validateTopic(topic);
+        WebTarget path = topicPath(tn, "entryFilters");
+        return asyncDeleteRequest(path);
+    }
+
     /*
      * returns topic name with encoded Local Name
      */
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
index 8b4679e43c5..c89bb152fc1 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
@@ -50,6 +50,7 @@ import 
org.apache.pulsar.common.policies.data.BookieAffinityGroupData;
 import org.apache.pulsar.common.policies.data.BundlesData;
 import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
 import org.apache.pulsar.common.policies.data.DispatchRate;
+import org.apache.pulsar.common.policies.data.EntryFilters;
 import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
 import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
 import org.apache.pulsar.common.policies.data.OffloadPolicies;
@@ -2588,6 +2589,46 @@ public class CmdNamespaces extends CmdBase {
         }
     }
 
+    @Parameters(commandDescription = "Get entry filters for a namespace")
+    private class GetEntryFiltersPerTopic extends CliCommand {
+        @Parameter(description = "tenant/namespace", required = true)
+        private java.util.List<String> params;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String namespace = validateNamespace(params);
+            print(getAdmin().namespaces().getNamespaceEntryFilters(namespace));
+        }
+    }
+
+    @Parameters(commandDescription = "Set entry filters for a namespace")
+    private class SetEntryFiltersPerTopic extends CliCommand {
+        @Parameter(description = "tenant/namespace", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = { "--entry-filters-name", "-efn" },
+                description = "The class name for the entry filter.", required 
= true)
+        private String  entryFiltersName = "";
+
+        @Override
+        void run() throws PulsarAdminException {
+            String namespace = validateNamespace(params);
+            getAdmin().namespaces().setNamespaceEntryFilters(namespace, new 
EntryFilters(entryFiltersName));
+        }
+    }
+
+    @Parameters(commandDescription = "Remove entry filters for a namespace")
+    private class RemoveEntryFiltersPerTopic extends CliCommand {
+        @Parameter(description = "tenant/namespace", required = true)
+        private java.util.List<String> params;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String namespace = validateNamespace(params);
+            getAdmin().namespaces().removeNamespaceEntryFilters(namespace);
+        }
+    }
+
     public CmdNamespaces(Supplier<PulsarAdmin> admin) {
         super("namespaces", admin);
         jcommander.addCommand("list", new GetNamespacesPerProperty());
@@ -2768,5 +2809,9 @@ public class CmdNamespaces extends CmdBase {
         jcommander.addCommand("get-resource-group", new GetResourceGroup());
         jcommander.addCommand("set-resource-group", new SetResourceGroup());
         jcommander.addCommand("remove-resource-group", new 
RemoveResourceGroup());
+
+        jcommander.addCommand("get-entry-filters", new 
GetEntryFiltersPerTopic());
+        jcommander.addCommand("set-entry-filters", new 
SetEntryFiltersPerTopic());
+        jcommander.addCommand("remove-entry-filters", new 
RemoveEntryFiltersPerTopic());
     }
 }
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
index b3bf27fe54a..99bf69b10f5 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
@@ -36,6 +36,7 @@ import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
 import org.apache.pulsar.common.policies.data.DispatchRate;
+import org.apache.pulsar.common.policies.data.EntryFilters;
 import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
 import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
 import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
@@ -146,6 +147,66 @@ public class CmdTopicPolicies extends CmdBase {
         jcommander.addCommand("remove-schema-compatibility-strategy", new 
RemoveSchemaCompatibilityStrategy());
         jcommander.addCommand("set-schema-compatibility-strategy", new 
SetSchemaCompatibilityStrategy());
         jcommander.addCommand("get-schema-compatibility-strategy", new 
GetSchemaCompatibilityStrategy());
+
+        jcommander.addCommand("get-entry-filters-per-topic", new 
GetEntryFiltersPerTopic());
+        jcommander.addCommand("set-entry-filters-per-topic", new 
SetEntryFiltersPerTopic());
+        jcommander.addCommand("remove-entry-filters-per-topic", new 
RemoveEntryFiltersPerTopic());
+    }
+
+    @Parameters(commandDescription = "Get entry filters for a topic")
+    private class GetEntryFiltersPerTopic extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", 
required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = { "--global", "-g" }, description = "Whether to get 
this policy globally. "
+                + "If set to true, broker returned global topic policies")
+        private boolean isGlobal = false;
+
+        @Parameter(names = { "-ap", "--applied" }, description = "Get the 
applied policy of the topic")
+        private boolean applied = false;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String persistentTopic = validatePersistentTopic(params);
+            
print(getTopicPolicies(isGlobal).getEntryFiltersPerTopic(persistentTopic, 
applied));
+        }
+    }
+
+    @Parameters(commandDescription = "Set entry filters for a topic")
+    private class SetEntryFiltersPerTopic extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", 
required = true)
+        private java.util.List<String> params;
+
+
+        @Parameter(names = { "--entry-filters-name", "-efn" },
+                description = "The class name for the entry filter.", required 
= true)
+        private String  entryFiltersName = "";
+
+        @Parameter(names = { "--global", "-g" }, description = "Whether to set 
this policy globally. "
+                + "If set to true, broker returned global topic policies")
+        private boolean isGlobal = false;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String persistentTopic = validatePersistentTopic(params);
+            
getTopicPolicies(isGlobal).setEntryFiltersPerTopic(persistentTopic, new 
EntryFilters(entryFiltersName));
+        }
+    }
+
+    @Parameters(commandDescription = "Remove entry filters for a topic")
+    private class RemoveEntryFiltersPerTopic extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", 
required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = { "--global", "-g" }, description = "Whether to 
remove this policy globally. "
+                + "If set to true, broker returned global topic policies")
+        private boolean isGlobal = false;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String persistentTopic = validatePersistentTopic(params);
+            
getTopicPolicies(isGlobal).removeEntryFiltersPerTopic(persistentTopic);
+        }
     }
 
     @Parameters(commandDescription = "Get max consumers per subscription for a 
topic")
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java
index 66c21a11716..a6891d27401 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java
@@ -59,6 +59,7 @@ public class HierarchyTopicPolicies {
     final PolicyHierarchyValue<DispatchRateImpl> dispatchRate;
 
     final PolicyHierarchyValue<Boolean> schemaValidationEnforced;
+    final PolicyHierarchyValue<EntryFilters> entryFilters;
 
     public HierarchyTopicPolicies() {
         replicationClusters = new PolicyHierarchyValue<>();
@@ -89,5 +90,6 @@ public class HierarchyTopicPolicies {
         schemaCompatibilityStrategy = new PolicyHierarchyValue<>();
         dispatchRate = new PolicyHierarchyValue<>();
         schemaValidationEnforced = new PolicyHierarchyValue<>();
+        entryFilters = new PolicyHierarchyValue<>();
     }
 }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java
index 2b115023f9d..6de91ddd38c 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java
@@ -50,5 +50,6 @@ public enum PolicyName {
     ENCRYPTION,
     TTL,
     MAX_TOPICS,
-    RESOURCEGROUP
+    RESOURCEGROUP,
+    ENTRY_FILTERS
 }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
index 07e4dff56bd..c68009816e9 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
@@ -71,6 +71,7 @@ public class TopicPolicies {
     private Integer maxSubscriptionsPerTopic;
     private DispatchRateImpl replicatorDispatchRate;
     private SchemaCompatibilityStrategy schemaCompatibilityStrategy;
+    private EntryFilters entryFilters;
 
     /**
      * Subscription level policies for specific subscription.
diff --git a/site2/docs/admin-api-namespaces.md 
b/site2/docs/admin-api-namespaces.md
index 7b4f2b60fda..c4ba54a1b4d 100644
--- a/site2/docs/admin-api-namespaces.md
+++ b/site2/docs/admin-api-namespaces.md
@@ -1270,3 +1270,112 @@ admin.namespaces().unload(namespace)
 
 </Tabs>
 ````
+### Configure entry filters policy
+
+#### Set entry filters policy
+
+Entry filters help filter messages on the server side.
+
+````mdx-code-block
+<Tabs groupId="api-choice"
+  defaultValue="pulsar-admin"
+  values={[{"label":"pulsar-admin","value":"pulsar-admin"},{"label":"REST 
API","value":"REST API"},{"label":"Java","value":"Java"}]}>
+<TabItem value="pulsar-admin">
+
+```
+
+$ pulsar-admin namespaces set-entry-filters \
+--entry-filters-name "The class name for the entry filter." test-tenant/ns1
+```
+
+</TabItem>
+<TabItem value="REST API">
+
+{@inject: 
endpoint|POST|/admin/v2/namespaces/:tenant/:namespace/entryFilters|operation/setEntryFilters?
+version=@pulsar:version_number@}
+
+</TabItem>
+<TabItem value="Java">
+
+```java
+
+admin.namespaces().setNamespaceEntryFilters(namespace, new EntryFilters("entry filter class name"))
+
+```
+
+</TabItem>
+
+</Tabs>
+````
+
+#### Get entry filters policies
+
+You can get a configured entry filter for a given namespace.
+
+````mdx-code-block
+<Tabs groupId="api-choice"
+  defaultValue="pulsar-admin"
+  values={[{"label":"pulsar-admin","value":"pulsar-admin"},{"label":"REST 
API","value":"REST API"},{"label":"Java","value":"Java"}]}>
+<TabItem value="pulsar-admin">
+
+```
+
+$ pulsar-admin namespaces get-entry-filters test-tenant/ns1
+
+```
+
+</TabItem>
+<TabItem value="REST API">
+
+{@inject: 
endpoint|GET|/admin/v2/namespaces/:tenant/:namespace/entryFilters|operation/getEntryFilters?
+version=@pulsar:version_number@}
+
+</TabItem>
+<TabItem value="Java">
+
+```java
+
+admin.namespaces().getNamespaceEntryFilters(namespace);
+
+```
+
+</TabItem>
+
+</Tabs>
+````
+
+#### Remove entry filters policies
+
+You can remove entry filters policies for a given namespace.
+
+````mdx-code-block
+<Tabs groupId="api-choice"
+  defaultValue="pulsar-admin"
+  values={[{"label":"pulsar-admin","value":"pulsar-admin"},{"label":"REST 
API","value":"REST API"},{"label":"Java","value":"Java"}]}>
+<TabItem value="pulsar-admin">
+
+```
+
+$ pulsar-admin namespaces remove-entry-filters test-tenant/ns1
+
+```
+
+</TabItem>
+<TabItem value="REST API">
+
+{@inject: 
endpoint|DELETE|/admin/v2/namespaces/:tenant/:namespace/entryFilters|operation/removeEntryFilters?
+version=@pulsar:version_number@}
+
+</TabItem>
+<TabItem value="Java">
+
+```java
+
+admin.namespaces().removeNamespaceEntryFilters(namespace)
+
+```
+
+</TabItem>
+
+</Tabs>
+````

Reply via email to