This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 2f4af65c1de [feature][broker] Allow to configure the entry filters per namespace and per topic (#17153) 2f4af65c1de is described below commit 2f4af65c1de662211645552036ff06d5689fc716 Author: gaozhangmin <zhangmin...@apache.org> AuthorDate: Tue Aug 30 20:41:42 2022 +0800 [feature][broker] Allow to configure the entry filters per namespace and per topic (#17153) --- conf/broker.conf | 3 + .../apache/pulsar/broker/ServiceConfiguration.java | 7 ++ .../pulsar/broker/admin/impl/NamespacesBase.java | 10 ++ .../broker/admin/impl/PersistentTopicsBase.java | 47 +++++++++ .../apache/pulsar/broker/admin/v2/Namespaces.java | 62 ++++++++++++ .../broker/admin/v2/NonPersistentTopics.java | 92 +++++++++++++++++ .../pulsar/broker/admin/v2/PersistentTopics.java | 91 +++++++++++++++++ .../pulsar/broker/service/AbstractTopic.java | 16 +++ .../pulsar/broker/service/BrokerService.java | 31 ++++++ .../pulsar/broker/service/EntryFilterSupport.java | 19 +++- .../org/apache/pulsar/broker/service/Topic.java | 7 ++ .../service/nonpersistent/NonPersistentTopic.java | 10 ++ .../broker/service/persistent/PersistentTopic.java | 11 +++ .../broker/service/plugin/EntryFilterProvider.java | 23 +++++ .../apache/pulsar/broker/admin/AdminApi2Test.java | 34 +++++++ .../broker/service/AbstractBaseDispatcherTest.java | 5 +- .../broker/service/plugin/FilterEntryTest.java | 85 +++++++++++++++- .../pulsar/broker/stats/ConsumerStatsTest.java | 10 +- .../org/apache/pulsar/client/admin/Namespaces.java | 48 ++++++++- .../apache/pulsar/client/admin/TopicPolicies.java | 51 ++++++++++ .../pulsar/common/policies/data/EntryFilters.java | 47 +++------ .../pulsar/common/policies/data/Policies.java | 8 +- .../client/admin/internal/NamespacesImpl.java | 38 +++++++ .../client/admin/internal/TopicPoliciesImpl.java | 39 ++++++++ .../org/apache/pulsar/admin/cli/CmdNamespaces.java | 45 +++++++++ .../apache/pulsar/admin/cli/CmdTopicPolicies.java | 61 ++++++++++++ .../policies/data/HierarchyTopicPolicies.java | 2 + .../pulsar/common/policies/data/PolicyName.java | 3 +- .../pulsar/common/policies/data/TopicPolicies.java | 1 + site2/docs/admin-api-namespaces.md | 109 +++++++++++++++++++++ 30 files changed, 965 insertions(+), 50 deletions(-) diff --git a/conf/broker.conf b/conf/broker.conf index 957b594338d..6f51c96a008 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -453,6 +453,9 @@ entryFilterNames= # The directory for all the entry filter implementations entryFiltersDirectory= +# Whether allow topic level entry filters policies overrides broker configuration. +allowOverrideEntryFilters=false + # Max number of concurrent lookup request broker allows to throttle heavy incoming lookup traffic maxConcurrentLookupRequest=50000 diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index df9ac8c7762..fffa57ff4ab 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -1103,6 +1103,13 @@ public class ServiceConfiguration implements PulsarConfiguration { ) private String entryFiltersDirectory = ""; + @FieldContext( + category = CATEGORY_SERVER, + dynamic = true, + doc = "Whether allow topic level entry filters policies overrides broker configuration." + ) + private boolean allowOverrideEntryFilters = false; + @FieldContext( category = CATEGORY_SERVER, doc = "Whether to use streaming read dispatcher. Currently is in preview and can be changed " diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index 08c968a2d5d..0b25599c126 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -83,6 +83,7 @@ import org.apache.pulsar.common.policies.data.BundlesData; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies; import org.apache.pulsar.common.policies.data.DispatchRate; +import org.apache.pulsar.common.policies.data.EntryFilters; import org.apache.pulsar.common.policies.data.InactiveTopicPolicies; import org.apache.pulsar.common.policies.data.LocalPolicies; import org.apache.pulsar.common.policies.data.NamespaceOperation; @@ -2722,5 +2723,14 @@ public abstract class NamespacesBase extends AdminResource { } + protected CompletableFuture<Void> internalSetEntryFiltersPerTopicAsync(EntryFilters entryFilters) { + return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.ENTRY_FILTERS, PolicyOperation.WRITE) + .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()) + .thenCompose(__ -> updatePoliciesAsync(namespaceName, policies -> { + policies.entryFilters = entryFilters; + return policies; + })); + } + private static final Logger log = LoggerFactory.getLogger(NamespacesBase.class); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 3db8f3a302f..6ef6f662029 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -110,6 +110,7 @@ import org.apache.pulsar.common.policies.data.AuthPolicies; import org.apache.pulsar.common.policies.data.BacklogQuota; import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies; import org.apache.pulsar.common.policies.data.DispatchRate; +import org.apache.pulsar.common.policies.data.EntryFilters; import org.apache.pulsar.common.policies.data.InactiveTopicPolicies; import org.apache.pulsar.common.policies.data.NamespaceOperation; import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl; @@ -5335,4 +5336,50 @@ public class PersistentTopicsBase extends AdminResource { return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies); }); } + + protected CompletableFuture<EntryFilters> internalGetEntryFilters(boolean applied, boolean isGlobal) { + return validateTopicPolicyOperationAsync(topicName, PolicyName.ENTRY_FILTERS, PolicyOperation.READ) + .thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName, isGlobal) + .thenApply(op -> op.map(TopicPolicies::getEntryFilters) + .orElseGet(() -> { + if (applied) { + EntryFilters entryFilters = getNamespacePolicies(namespaceName).entryFilters; + if (entryFilters == null) { + return new EntryFilters(String.join(",", + pulsar().getConfiguration().getEntryFilterNames())); + } + return entryFilters; + } + return null; + }))); + + } + + protected CompletableFuture<Void> internalSetEntryFilters(EntryFilters entryFilters, + boolean isGlobal) { + + return validateTopicPolicyOperationAsync(topicName, PolicyName.ENTRY_FILTERS, PolicyOperation.WRITE) + .thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName, isGlobal) + .thenCompose(op -> { + TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new); + topicPolicies.setEntryFilters(entryFilters); + topicPolicies.setIsGlobal(isGlobal); + return pulsar().getTopicPoliciesService() + .updateTopicPoliciesAsync(topicName, topicPolicies); + })); + } + + protected CompletableFuture<Void> internalRemoveEntryFilters(boolean isGlobal) { + return validateTopicPolicyOperationAsync(topicName, PolicyName.ENTRY_FILTERS, PolicyOperation.WRITE) + .thenCompose(__ -> + getTopicPoliciesAsyncWithRetry(topicName, isGlobal) + .thenCompose(op -> { + if (!op.isPresent()) { + return CompletableFuture.completedFuture(null); + } + op.get().setEntryFilters(null); + op.get().setIsGlobal(isGlobal); + return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, op.get()); + })); + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java index e43e01ced93..7fae0a5709e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java @@ -60,6 +60,7 @@ import org.apache.pulsar.common.policies.data.BacklogQuota; import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType; import org.apache.pulsar.common.policies.data.BookieAffinityGroupData; import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies; +import org.apache.pulsar.common.policies.data.EntryFilters; import org.apache.pulsar.common.policies.data.InactiveTopicPolicies; import org.apache.pulsar.common.policies.data.NamespaceOperation; import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl; @@ -2658,5 +2659,66 @@ public class Namespaces extends NamespacesBase { } } + @GET + @Path("/{tenant}/{namespace}/entryFilters") + @ApiOperation(value = "Get maxConsumersPerSubscription config on a namespace.") + @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Namespace does not exist") }) + public void getEntryFiltersPerTopic( + @Suspended final AsyncResponse asyncResponse, + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace) { + validateNamespaceName(tenant, namespace); + validateNamespacePolicyOperationAsync(namespaceName, PolicyName.ENTRY_FILTERS, PolicyOperation.READ) + .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName)) + .thenAccept(polices -> asyncResponse.resume(polices.entryFilters)) + .exceptionally(ex -> { + log.error("[{}] Failed to get entry filters config on namespace {}: {} ", + clientAppId(), namespaceName, ex.getCause().getMessage(), ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); + } + + @POST + @Path("/{tenant}/{namespace}/entryFilters") + @ApiOperation(value = "Set entry filters for namespace") + @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist")}) + public void setEntryFiltersPerTopic(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @ApiParam(value = "entry filters", required = true) + EntryFilters entryFilters) { + validateNamespaceName(tenant, namespace); + internalSetEntryFiltersPerTopicAsync(entryFilters) + .thenAccept(__ -> asyncResponse.resume(Response.noContent().build())) + .exceptionally(ex -> { + log.error("Failed to set entry filters for namespace {}", namespaceName, ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); + } + + @DELETE + @Path("/{tenant}/{namespace}/entryFilters") + @ApiOperation(value = "Remove entry filters for namespace") + @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist"), + @ApiResponse(code = 412, message = "Invalid TTL")}) + public void removeNamespaceEntryFilters(@Suspended AsyncResponse asyncResponse, + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace) { + validateNamespaceName(tenant, namespace); + internalSetEntryFiltersPerTopicAsync(null) + .thenAccept(__ -> asyncResponse.resume(Response.noContent().build())) + .exceptionally(ex -> { + log.error("Failed to remove entry filters for namespace {}", namespaceName, ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); + } + + + private static final Logger log = LoggerFactory.getLogger(Namespaces.class); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java index fa8bcde26ba..82da5f91263 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java @@ -36,6 +36,7 @@ import javax.ws.rs.DELETE; import javax.ws.rs.DefaultValue; import javax.ws.rs.Encoded; import javax.ws.rs.GET; +import javax.ws.rs.POST; import javax.ws.rs.PUT; import javax.ws.rs.Path; import javax.ws.rs.PathParam; @@ -53,6 +54,7 @@ import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.web.RestException; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.EntryFilters; import org.apache.pulsar.common.policies.data.NamespaceOperation; import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.data.TopicOperation; @@ -509,6 +511,96 @@ public class NonPersistentTopics extends PersistentTopics { validateTopicOwnership(topicName, authoritative); } + @GET + @Path("/{tenant}/{namespace}/{topic}/entryFilters") + @ApiOperation(value = "Get entry filters for a topic.") + @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Tenants or Namespace doesn't exist") }) + public void getEntryFilters(@Suspended AsyncResponse asyncResponse, + @ApiParam(value = "Specify the tenant", required = true) + @PathParam("tenant") String tenant, + @ApiParam(value = "Specify the namespace", required = true) + @PathParam("namespace") String namespace, + @ApiParam(value = "Specify topic name", required = true) + @PathParam("topic") @Encoded String encodedTopic, + @QueryParam("applied") @DefaultValue("false") boolean applied, + @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal, + @ApiParam(value = "Whether leader broker redirected this call to this " + + "broker. For internal use.") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { + validateTopicName(tenant, namespace, encodedTopic); + preValidation(authoritative) + .thenCompose(__ -> internalGetEntryFilters(applied, isGlobal)) + .thenAccept(asyncResponse::resume) + .exceptionally(ex -> { + handleTopicPolicyException("getEntryFilters", ex, asyncResponse); + return null; + }); + } + + @POST + @Path("/{tenant}/{namespace}/{topic}/entryFilters") + @ApiOperation(value = "Set entry filters for specified topic") + @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Topic does not exist"), + @ApiResponse(code = 405, + message = "Topic level policy is disabled, please enable the topic level policy and retry"), + @ApiResponse(code = 409, message = "Concurrent modification")}) + public void setEntryFilters(@Suspended final AsyncResponse asyncResponse, + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @PathParam("topic") @Encoded String encodedTopic, + @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal, + @ApiParam(value = "Whether leader broker redirected this " + + "call to this broker. For internal use.") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, + @ApiParam(value = "Enable sub types for the specified topic") + EntryFilters entryFilters) { + validateTopicName(tenant, namespace, encodedTopic); + preValidation(authoritative) + .thenCompose(__ -> internalSetEntryFilters(entryFilters, isGlobal)) + .thenAccept(__ -> asyncResponse.resume(Response.noContent().build())) + .exceptionally(ex -> { + handleTopicPolicyException("setEntryFilters", ex, asyncResponse); + return null; + }); + } + + @DELETE + @Path("/{tenant}/{namespace}/{topic}/entryFilters") + @ApiOperation(value = "Remove entry filters for specified topic.") + @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Topic does not exist"), + @ApiResponse(code = 405, + message = "Topic level policy is disabled, please enable the topic level policy and retry"), + @ApiResponse(code = 409, message = "Concurrent modification")}) + public void removeEntryFilters(@Suspended final AsyncResponse asyncResponse, + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @PathParam("topic") @Encoded String encodedTopic, + @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal, + @ApiParam(value = "Whether leader broker redirected this" + + "call to this broker. For internal use.") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { + validateTopicName(tenant, namespace, encodedTopic); + preValidation(authoritative) + .thenCompose(__ -> internalRemoveEntryFilters(isGlobal)) + .thenRun(() -> { + log.info( + "[{}] Successfully remove entry filters: tenant={}, namespace={}, topic={}, isGlobal={}", + clientAppId(), + tenant, + namespace, + topicName.getLocalName(), + isGlobal); + asyncResponse.resume(Response.noContent().build()); + }) + .exceptionally(ex -> { + handleTopicPolicyException("removeEntryFilters", ex, asyncResponse); + return null; + }); + } + private Topic getTopicReference(TopicName topicName) { try { return pulsar().getBrokerService().getTopicIfExists(topicName.toString()) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java index 56131a1d5fc..976368f10fa 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java @@ -58,6 +58,7 @@ import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.policies.data.AuthAction; import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType; import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies; +import org.apache.pulsar.common.policies.data.EntryFilters; import org.apache.pulsar.common.policies.data.InactiveTopicPolicies; import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl; import org.apache.pulsar.common.policies.data.PersistencePolicies; @@ -4272,5 +4273,95 @@ public class PersistentTopics extends PersistentTopicsBase { }); } + @GET + @Path("/{tenant}/{namespace}/{topic}/entryFilters") + @ApiOperation(value = "Get entry filters for a topic.") + @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Tenants or Namespace doesn't exist") }) + public void getEntryFilters(@Suspended AsyncResponse asyncResponse, + @ApiParam(value = "Specify the tenant", required = true) + @PathParam("tenant") String tenant, + @ApiParam(value = "Specify the namespace", required = true) + @PathParam("namespace") String namespace, + @ApiParam(value = "Specify topic name", required = true) + @PathParam("topic") @Encoded String encodedTopic, + @QueryParam("applied") @DefaultValue("false") boolean applied, + @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal, + @ApiParam(value = "Whether leader broker redirected this call to this " + + "broker. For internal use.") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { + validateTopicName(tenant, namespace, encodedTopic); + preValidation(authoritative) + .thenCompose(__ -> internalGetEntryFilters(applied, isGlobal)) + .thenAccept(asyncResponse::resume) + .exceptionally(ex -> { + handleTopicPolicyException("getEntryFilters", ex, asyncResponse); + return null; + }); + } + + @POST + @Path("/{tenant}/{namespace}/{topic}/entryFilters") + @ApiOperation(value = "Set entry filters for specified topic") + @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Topic does not exist"), + @ApiResponse(code = 405, + message = "Topic level policy is disabled, please enable the topic level policy and retry"), + @ApiResponse(code = 409, message = "Concurrent modification")}) + public void setEntryFilters(@Suspended final AsyncResponse asyncResponse, + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @PathParam("topic") @Encoded String encodedTopic, + @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal, + @ApiParam(value = "Whether leader broker redirected this" + + "call to this broker. For internal use.") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, + @ApiParam(value = "Entry filters for the specified topic") + EntryFilters entryFilters) { + validateTopicName(tenant, namespace, encodedTopic); + preValidation(authoritative) + .thenCompose(__ -> internalSetEntryFilters(entryFilters, isGlobal)) + .thenAccept(__ -> asyncResponse.resume(Response.noContent().build())) + .exceptionally(ex -> { + handleTopicPolicyException("setEntryFilters", ex, asyncResponse); + return null; + }); + } + + @DELETE + @Path("/{tenant}/{namespace}/{topic}/entryFilters") + @ApiOperation(value = "Remove entry filters for specified topic.") + @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Topic does not exist"), + @ApiResponse(code = 405, + message = "Topic level policy is disabled, please enable the topic level policy and retry"), + @ApiResponse(code = 409, message = "Concurrent modification")}) + public void removeEntryFilters(@Suspended final AsyncResponse asyncResponse, + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @PathParam("topic") @Encoded String encodedTopic, + @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal, + @ApiParam(value = "Whether leader broker redirected this" + + "call to this broker. For internal use.") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { + validateTopicName(tenant, namespace, encodedTopic); + preValidation(authoritative) + .thenCompose(__ -> internalRemoveEntryFilters(isGlobal)) + .thenRun(() -> { + log.info( + "[{}] Successfully remove entry filters: tenant={}, namespace={}, topic={}, isGlobal={}", + clientAppId(), + tenant, + namespace, + topicName.getLocalName(), + isGlobal); + asyncResponse.resume(Response.noContent().build()); + }) + .exceptionally(ex -> { + handleTopicPolicyException("removeEntryFilters", ex, asyncResponse); + return null; + }); + } + private static final Logger log = LoggerFactory.getLogger(PersistentTopics.class); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index 0010c9bbd8a..a3df1498698 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -21,6 +21,7 @@ package org.apache.pulsar.broker.service; import static com.google.common.base.Preconditions.checkArgument; import static org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl.ENTRY_LATENCY_BUCKETS_USEC; import com.google.common.base.MoreObjects; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import java.util.ArrayList; import java.util.Arrays; @@ -52,6 +53,7 @@ import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyExcep import org.apache.pulsar.broker.service.BrokerServiceException.ProducerBusyException; import org.apache.pulsar.broker.service.BrokerServiceException.ProducerFencedException; import org.apache.pulsar.broker.service.BrokerServiceException.TopicTerminatedException; +import org.apache.pulsar.broker.service.plugin.EntryFilterWithClassLoader; import org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage; import org.apache.pulsar.broker.service.schema.SchemaRegistryService; import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException; @@ -60,6 +62,7 @@ import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.BacklogQuota; import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies; +import org.apache.pulsar.common.policies.data.EntryFilters; import org.apache.pulsar.common.policies.data.HierarchyTopicPolicies; import org.apache.pulsar.common.policies.data.InactiveTopicPolicies; import org.apache.pulsar.common.policies.data.Policies; @@ -144,6 +147,7 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP protected final LongAdder msgOutFromRemovedSubscriptions = new LongAdder(); protected final LongAdder bytesOutFromRemovedSubscriptions = new LongAdder(); + protected ImmutableMap<String, EntryFilterWithClassLoader> entryFilters; public AbstractTopic(String topic, BrokerService brokerService) { this.topic = topic; @@ -179,6 +183,14 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP return this.topicPolicies.getDispatchRate().get(); } + public EntryFilters getEntryFiltersPolicy() { + return this.topicPolicies.getEntryFilters().get(); + } + + public ImmutableMap<String, EntryFilterWithClassLoader> getEntryFilters() { + return this.entryFilters; + } + public DispatchRateImpl getReplicatorDispatchRate() { return this.topicPolicies.getReplicatorDispatchRate().get(); } @@ -225,6 +237,7 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP topicPolicies.getCompactionThreshold().updateTopicValue(data.getCompactionThreshold()); topicPolicies.getDispatchRate().updateTopicValue(DispatchRateImpl.normalize(data.getDispatchRate())); topicPolicies.getSchemaValidationEnforced().updateTopicValue(data.getSchemaValidationEnforced()); + topicPolicies.getEntryFilters().updateTopicValue(data.getEntryFilters()); this.subscriptionPolicies = data.getSubscriptionPolicies(); } @@ -273,6 +286,7 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP updateSchemaCompatibilityStrategyNamespaceValue(namespacePolicies); updateNamespaceDispatchRate(namespacePolicies, brokerService.getPulsar().getConfig().getClusterName()); topicPolicies.getSchemaValidationEnforced().updateNamespaceValue(namespacePolicies.schema_validation_enforced); + topicPolicies.getEntryFilters().updateNamespaceValue(namespacePolicies.entryFilters); } private void updateNamespaceDispatchRate(Policies namespacePolicies, String cluster) { @@ -367,6 +381,8 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP .updateBrokerValue(formatSchemaCompatibilityStrategy(schemaCompatibilityStrategy)); topicPolicies.getDispatchRate().updateBrokerValue(dispatchRateInBroker(config)); topicPolicies.getSchemaValidationEnforced().updateBrokerValue(config.isSchemaValidationEnforced()); + topicPolicies.getEntryFilters().updateBrokerValue(new EntryFilters(String.join(",", + config.getEntryFilterNames()))); } private DispatchRateImpl dispatchRateInBroker(ServiceConfiguration config) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index f19be58f958..8985cb9ec07 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -146,6 +146,7 @@ import org.apache.pulsar.common.policies.data.AutoSubscriptionCreationOverride; import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride; import org.apache.pulsar.common.policies.data.BacklogQuota; import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.EntryFilters; import org.apache.pulsar.common.policies.data.LocalPolicies; import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl; import org.apache.pulsar.common.policies.data.PersistencePolicies; @@ -1139,6 +1140,20 @@ public class BrokerService implements Closeable { CompletableFuture<Void> isOwner = checkTopicNsOwnership(topic); isOwner.thenRun(() -> { nonPersistentTopic.initialize() + .thenAccept(__ -> { + EntryFilters entryFiltersPolicy = nonPersistentTopic.getEntryFiltersPolicy(); + if (!entryFiltersPolicy.getEntryFilterNames().isEmpty()) { + try { + nonPersistentTopic.entryFilters = + EntryFilterProvider.createEntryFilters(pulsar.getConfig(), + entryFiltersPolicy); + } catch (IOException e) { + log.warn("Failed to set entry filters on topic {}-{}", topic, e.getMessage()); + pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); + topicFuture.completeExceptionally(e); + } + } + }) .thenCompose(__ -> nonPersistentTopic.checkReplication()) .thenRun(() -> { log.info("Created topic {}", nonPersistentTopic); @@ -1431,6 +1446,22 @@ public class BrokerService implements Closeable { : newTopic(topic, ledger, BrokerService.this, PersistentTopic.class); persistentTopic .initialize() + .thenAccept(__ -> { + EntryFilters entryFiltersPolicy = persistentTopic.getEntryFiltersPolicy(); + if (!entryFiltersPolicy.getEntryFilterNames().isEmpty()) { + try { + persistentTopic.entryFilters = + EntryFilterProvider.createEntryFilters(pulsar.getConfig(), + entryFiltersPolicy); + } catch (IOException e) { + log.warn("Failed to set entry filters on topic {}-{}", topic, + e.getMessage()); + pulsar.getExecutor().execute(() -> + topics.remove(topic, topicFuture)); + topicFuture.completeExceptionally(e); + } + } + }) .thenCompose(__ -> persistentTopic.preCreateSubscriptionForCompactionIfNeeded()) .thenCompose(__ -> persistentTopic.checkReplication()) .thenCompose(v -> { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryFilterSupport.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryFilterSupport.java index b0c7385a28f..7a4700a90a6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryFilterSupport.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryFilterSupport.java @@ -19,6 +19,7 @@ package org.apache.pulsar.broker.service; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import org.apache.bookkeeper.mledger.Entry; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.MapUtils; @@ -39,9 +40,21 @@ public class EntryFilterSupport { public EntryFilterSupport(Subscription subscription) { this.subscription = subscription; - if (subscription != null && subscription.getTopic() != null && MapUtils.isNotEmpty(subscription.getTopic() - .getBrokerService().getEntryFilters())) { - this.entryFilters = subscription.getTopic().getBrokerService().getEntryFilters().values().asList(); + if (subscription != null && subscription.getTopic() != null) { + if (MapUtils.isNotEmpty(subscription.getTopic() + .getBrokerService().getEntryFilters()) + && !subscription.getTopic().getBrokerService().pulsar() + .getConfiguration().isAllowOverrideEntryFilters()) { + this.entryFilters = subscription.getTopic().getBrokerService().getEntryFilters().values().asList(); + } else { + ImmutableMap<String, EntryFilterWithClassLoader> entryFiltersMap = + subscription.getTopic().getEntryFilters(); + if (entryFiltersMap != null) { + this.entryFilters = subscription.getTopic().getEntryFilters().values().asList(); + } else { + this.entryFilters = ImmutableList.of(); + } + } this.filterContext = new FilterContext(); } else { this.entryFilters = ImmutableList.of(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java index 038f209fc1f..b4f27adcc4a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.service; +import com.google.common.collect.ImmutableMap; import io.netty.buffer.ByteBuf; import java.util.Map; import java.util.Optional; @@ -26,6 +27,7 @@ import java.util.concurrent.TimeUnit; import org.apache.bookkeeper.mledger.Position; import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter; import org.apache.pulsar.broker.service.persistent.SubscribeRateLimiter; +import org.apache.pulsar.broker.service.plugin.EntryFilterWithClassLoader; import org.apache.pulsar.broker.stats.ClusterReplicationMetrics; import org.apache.pulsar.broker.stats.NamespaceStats; import org.apache.pulsar.client.api.MessageId; @@ -35,6 +37,7 @@ import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; import org.apache.pulsar.common.api.proto.KeySharedMeta; import org.apache.pulsar.common.policies.data.BacklogQuota; import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType; +import org.apache.pulsar.common.policies.data.EntryFilters; import org.apache.pulsar.common.policies.data.HierarchyTopicPolicies; import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; import org.apache.pulsar.common.policies.data.Policies; @@ -231,6 +234,10 @@ public interface Topic { boolean isReplicated(); + EntryFilters getEntryFiltersPolicy(); + + ImmutableMap<String, EntryFilterWithClassLoader> getEntryFilters(); + BacklogQuota getBacklogQuota(BacklogQuotaType backlogQuotaType); void updateRates(NamespaceStats nsStats, NamespaceBundleStats currentBundleStats, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index db77d12e713..0b5a1ed9731 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -497,6 +497,16 @@ public class NonPersistentTopic extends AbstractTopic implements Topic, TopicPol this.resourceGroupPublishLimiter.unregisterRateLimitFunction(this.getName()); } + if (entryFilters != null) { + entryFilters.forEach((name, filter) -> { + try { + filter.close(); + } catch (Exception e) { + log.warn("Error shutting down entry filter {}", name, e); + } + }); + } + CompletableFuture<Void> clientCloseFuture = closeWithoutWaitingClientDisconnect ? CompletableFuture.completedFuture(null) : FutureUtil.waitForAll(futures); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 4777546bc84..c8cd487c5ee 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1280,6 +1280,17 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal this.resourceGroupPublishLimiter.unregisterRateLimitFunction(this.getName()); } + //close entry filters + if (entryFilters != null) { + entryFilters.forEach((name, filter) -> { + try { + filter.close(); + } catch (Exception e) { + log.warn("Error shutting down entry filter {}", name, e); + } + }); + } + CompletableFuture<Void> clientCloseFuture = closeWithoutWaitingClientDisconnect ? CompletableFuture.completedFuture(null) : FutureUtil.waitForAll(futures); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterProvider.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterProvider.java index 48c8d9a7683..048b053464e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterProvider.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/plugin/EntryFilterProvider.java @@ -33,6 +33,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.common.nar.NarClassLoader; import org.apache.pulsar.common.nar.NarClassLoaderBuilder; +import org.apache.pulsar.common.policies.data.EntryFilters; import org.apache.pulsar.common.util.ObjectMapperFactory; @Slf4j @@ -44,6 +45,28 @@ public class EntryFilterProvider { /** * create entry filter instance. */ + public static ImmutableMap<String, EntryFilterWithClassLoader> createEntryFilters(ServiceConfiguration conf, + EntryFilters entryFilters) + throws IOException { + EntryFilterDefinitions definitions = searchForEntryFilters(conf.getEntryFiltersDirectory(), + conf.getNarExtractionDirectory()); + ImmutableMap.Builder<String, EntryFilterWithClassLoader> builder = ImmutableMap.builder(); + for (String filterName : entryFilters.getEntryFilterNames().split(",")) { + EntryFilterMetaData metaData = definitions.getFilters().get(filterName); + if (null == metaData) { + throw new RuntimeException("No entry filter is found for name `" + filterName + + "`. Available entry filters are : " + definitions.getFilters()); + } + EntryFilterWithClassLoader filter; + filter = load(metaData, conf.getNarExtractionDirectory()); + if (filter != null) { + builder.put(filterName, filter); + } + log.info("Successfully loaded entry filter for name `{}` from topic policy", filterName); + } + return builder.build(); + } + public static ImmutableMap<String, EntryFilterWithClassLoader> createEntryFilters( ServiceConfiguration conf) throws IOException { EntryFilterDefinitions definitions = searchForEntryFilters(conf.getEntryFiltersDirectory(), diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java index 33527e18da9..99ad1b99abd 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java @@ -98,6 +98,7 @@ import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationDataImpl; import org.apache.pulsar.common.policies.data.BundlesData; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.ConsumerStats; +import org.apache.pulsar.common.policies.data.EntryFilters; import org.apache.pulsar.common.policies.data.FailureDomain; import org.apache.pulsar.common.policies.data.NamespaceIsolationData; import org.apache.pulsar.common.policies.data.NonPersistentTopicStats; @@ -2132,6 +2133,39 @@ public class AdminApi2Test extends MockedPulsarServiceBaseTest { } } + @Test(timeOut = 30000) + public void testSetNamespaceEntryFilters() throws Exception { + EntryFilters entryFilters = new EntryFilters( + "org.apache.pulsar.broker.service.plugin.EntryFilterTest"); + + final String myNamespace = "prop-xyz/ns" + UUID.randomUUID(); + admin.namespaces().createNamespace(myNamespace, Sets.newHashSet("test")); + + assertNull(admin.namespaces().getNamespaceEntryFilters(myNamespace)); + + admin.namespaces().setNamespaceEntryFilters(myNamespace, entryFilters); + assertEquals(admin.namespaces().getNamespaceEntryFilters(myNamespace), entryFilters); + admin.namespaces().removeNamespaceEntryFilters(myNamespace); + assertNull(admin.namespaces().getNamespaceEntryFilters(myNamespace)); + } + + @Test(dataProvider = "topicType") + public void testSetTopicLevelEntryFilters(String topicType) throws Exception { + EntryFilters entryFilters = new EntryFilters("org.apache.pulsar.broker.service.plugin.EntryFilterTest"); + final String topic = topicType + "://prop-xyz/ns1/test-schema-validation-enforced"; + admin.topics().createPartitionedTopic(topic, 1); + @Cleanup + Producer<byte[]> producer1 = pulsarClient.newProducer() + .topic(topic + TopicName.PARTITIONED_TOPIC_SUFFIX + 0) + .create(); + assertNull(admin.topicPolicies().getEntryFiltersPerTopic(topic, false)); + admin.topicPolicies().setEntryFiltersPerTopic(topic, entryFilters); + Awaitility.await().untilAsserted(() -> assertEquals(admin.topicPolicies().getEntryFiltersPerTopic(topic, + false), entryFilters)); + admin.topicPolicies().removeEntryFiltersPerTopic(topic); + assertNull(admin.topicPolicies().getEntryFiltersPerTopic(topic, false)); + } + @Test(timeOut = 30000) public void testMaxSubPerTopic() throws Exception { pulsar.getConfiguration().setMaxSubscriptionsPerTopic(0); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractBaseDispatcherTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractBaseDispatcherTest.java index d89738869a8..cc1558fed27 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractBaseDispatcherTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractBaseDispatcherTest.java @@ -78,18 +78,17 @@ public class AbstractBaseDispatcherTest { @Test - public void testFilterEntriesForConsumerOfEntryFilter() { + public void testFilterEntriesForConsumerOfEntryFilter() throws Exception { Topic mockTopic = mock(Topic.class); when(this.subscriptionMock.getTopic()).thenReturn(mockTopic); BrokerService mockBrokerService = mock(BrokerService.class); when(mockTopic.getBrokerService()).thenReturn(mockBrokerService); - EntryFilterWithClassLoader mockFilter = mock(EntryFilterWithClassLoader.class); when(mockFilter.filterEntry(any(Entry.class), any(FilterContext.class))).thenReturn( EntryFilter.FilterResult.REJECT); ImmutableMap<String, EntryFilterWithClassLoader> entryFilters = ImmutableMap.of("key", mockFilter); - when(mockBrokerService.getEntryFilters()).thenReturn(entryFilters); + when(mockTopic.getEntryFilters()).thenReturn(entryFilters); this.helper = new AbstractBaseDispatcherTestHelper(this.subscriptionMock, this.svcConfig); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java index ee100f3fdee..a5f8b5ab38f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java @@ -19,9 +19,12 @@ package org.apache.pulsar.broker.service.plugin; import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs; +import static org.apache.pulsar.client.api.SubscriptionInitialPosition.Earliest; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import static org.testng.AssertJUnit.assertEquals; import static org.testng.AssertJUnit.assertNotNull; import com.google.common.collect.ImmutableList; @@ -35,13 +38,14 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.pulsar.broker.service.AbstractTopic; -import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.broker.service.BrokerTestBase; import org.apache.pulsar.broker.service.Dispatcher; import org.apache.pulsar.broker.service.EntryFilterSupport; import org.apache.pulsar.broker.service.persistent.PersistentSubscription; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.Producer; @@ -71,6 +75,77 @@ public class FilterEntryTest extends BrokerTestBase { internalCleanup(); } + @Test + public void testOverride() throws Exception { + conf.setAllowOverrideEntryFilters(true); + String topic = "persistent://prop/ns-abc/topic" + UUID.randomUUID(); + String subName = "sub"; + Producer<String> producer = pulsarClient.newProducer(Schema.STRING) + .enableBatching(false).topic(topic).create(); + for (int i = 0; i < 10; i++) { + producer.send("test"); + } + + PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService() + .getTopicReference(topic).get(); + + // set topic level entry filters + EntryFilterWithClassLoader mockFilter = mock(EntryFilterWithClassLoader.class); + when(mockFilter.filterEntry(any(Entry.class), any(FilterContext.class))).thenReturn( + EntryFilter.FilterResult.REJECT); + ImmutableMap<String, EntryFilterWithClassLoader> entryFilters = ImmutableMap.of("key", mockFilter); + + Field field = topicRef.getClass().getSuperclass().getDeclaredField("entryFilters"); + field.setAccessible(true); + field.set(topicRef, entryFilters); + + EntryFilterWithClassLoader mockFilter1 = mock(EntryFilterWithClassLoader.class); + when(mockFilter1.filterEntry(any(Entry.class), any(FilterContext.class))).thenReturn( + EntryFilter.FilterResult.ACCEPT); + ImmutableMap<String, EntryFilterWithClassLoader> entryFilters1 = ImmutableMap.of("key2", mockFilter1); + Field field2 = pulsar.getBrokerService().getClass().getDeclaredField("entryFilters"); + field2.setAccessible(true); + field2.set(pulsar.getBrokerService(), entryFilters1); + + Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(topic) + .subscriptionInitialPosition(Earliest) + .subscriptionName(subName).subscribe(); + + int counter = 0; + while (true) { + Message<String> message = consumer.receive(1, TimeUnit.SECONDS); + if (message != null) { + counter++; + consumer.acknowledge(message); + } else { + break; + } + } + // All normal messages can be received + assertEquals(0, counter); + + + conf.setAllowOverrideEntryFilters(false); + consumer.close(); + consumer = pulsarClient.newConsumer(Schema.STRING).topic(topic) + .subscriptionInitialPosition(Earliest) + .subscriptionName(subName + "1").subscribe(); + int counter1 = 0; + while (true) { + Message<String> message = consumer.receive(1, TimeUnit.SECONDS); + if (message != null) { + counter1++; + consumer.acknowledge(message); + } else { + break; + } + } + // All normal messages can be received + assertEquals(10, counter1); + conf.setAllowOverrideEntryFilters(false); + consumer.close(); + } + @Test public void testFilter() throws Exception { Map<String, String> map = new HashMap<>(); @@ -184,10 +259,12 @@ public class FilterEntryTest extends BrokerTestBase { producer.close(); consumer.close(); - BrokerService brokerService = pulsar.getBrokerService(); - Field field1 = BrokerService.class.getDeclaredField("entryFilters"); + PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService() + .getTopicReference(topic).get(); + Field field1 = topicRef.getClass().getSuperclass().getDeclaredField("entryFilters"); field1.setAccessible(true); - field1.set(brokerService, ImmutableMap.of("1", loader1, "2", loader2)); + field1.set(topicRef, ImmutableMap.of("1", loader1, "2", loader2)); + cleanup(); verify(loader1, times(1)).close(); verify(loader2, times(1)).close(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java index e845e6e71fd..801a55ad419 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java @@ -19,7 +19,6 @@ package org.apache.pulsar.broker.stats; import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs; -import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.testng.Assert.assertNotEquals; @@ -30,6 +29,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Multimap; import com.google.common.collect.Sets; import java.io.ByteArrayOutputStream; +import java.lang.reflect.Field; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collection; @@ -410,8 +410,12 @@ public class ConsumerStatsTest extends ProducerConsumerBase { loader = spyWithClassAndConstructorArgs(EntryFilterWithClassLoader.class, filter, narClassLoader); ImmutableMap<String, EntryFilterWithClassLoader> entryFilters = ImmutableMap.of("filter", loader); - BrokerService brokerService = pulsar.getBrokerService(); - doReturn(entryFilters).when(brokerService).getEntryFilters(); + + PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService() + .getTopicReference(topic).get(); + Field field1 = topicRef.getClass().getSuperclass().getDeclaredField("entryFilters"); + field1.setAccessible(true); + field1.set(topicRef, entryFilters); Map<String, String> metadataConsumer = new HashMap<>(); metadataConsumer.put("matchValueAccept", "producer1"); diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java index 39cf49cf961..4fc50bf5b2e 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java @@ -36,6 +36,7 @@ import org.apache.pulsar.common.policies.data.BookieAffinityGroupData; import org.apache.pulsar.common.policies.data.BundlesData; import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies; import org.apache.pulsar.common.policies.data.DispatchRate; +import org.apache.pulsar.common.policies.data.EntryFilters; import org.apache.pulsar.common.policies.data.InactiveTopicPolicies; import org.apache.pulsar.common.policies.data.OffloadPolicies; import org.apache.pulsar.common.policies.data.PersistencePolicies; @@ -4167,7 +4168,6 @@ public interface Namespaces { CompletableFuture<Void> removeMaxTopicsPerNamespaceAsync(String namespace); /** -<<<<<<< HEAD * Set key value pair property for a namespace. * If the property absents, a new property will added. Otherwise, the new value will overwrite. * @@ -4467,4 +4467,50 @@ public interface Namespaces { * @return */ CompletableFuture<Void> removeNamespaceResourceGroupAsync(String namespace); + + /** + * Get entry filters for a namespace. + * @param namespace + * @return entry filters classes info. + * @throws PulsarAdminException + */ + EntryFilters getNamespaceEntryFilters(String namespace) throws PulsarAdminException; + + /** + * Get entry filters for a namespace asynchronously. + * + * @param namespace + * @return entry filters classes info. + */ + CompletableFuture<EntryFilters> getNamespaceEntryFiltersAsync(String namespace); + + /** + * Set entry filters on a namespace. + * + * @param namespace Namespace name + * @param entryFilters The entry filters + */ + void setNamespaceEntryFilters(String namespace, EntryFilters entryFilters) throws PulsarAdminException; + + /** + * Set entry filters on a namespace asynchronously. + * + * @param namespace Namespace name + * @param entryFilters The entry filters + */ + CompletableFuture<Void> setNamespaceEntryFiltersAsync(String namespace, EntryFilters entryFilters); + + /** + * remove entry filters of a namespace. + * @param namespace Namespace name + * @throws PulsarAdminException + */ + void removeNamespaceEntryFilters(String namespace) throws PulsarAdminException; + + /** + * remove entry filters of a namespace asynchronously. + * @param namespace Namespace name + * @return + */ + CompletableFuture<Void> removeNamespaceEntryFiltersAsync(String namespace); } diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/TopicPolicies.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/TopicPolicies.java index dabe69301cf..827bb99132b 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/TopicPolicies.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/TopicPolicies.java @@ -25,6 +25,7 @@ import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.common.policies.data.BacklogQuota; import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies; import org.apache.pulsar.common.policies.data.DispatchRate; +import org.apache.pulsar.common.policies.data.EntryFilters; import org.apache.pulsar.common.policies.data.InactiveTopicPolicies; import org.apache.pulsar.common.policies.data.OffloadPolicies; import org.apache.pulsar.common.policies.data.PersistencePolicies; @@ -1763,4 +1764,54 @@ public interface TopicPolicies { * @param topic The topic in whose policy should be removed */ CompletableFuture<Void> removeSchemaCompatibilityStrategyAsync(String topic); + + /** + * Get applied entry filters for a topic. + * @param topic + * @param applied + * @return entry filters classes info. + * @throws PulsarAdminException + */ + EntryFilters getEntryFiltersPerTopic(String topic, boolean applied) throws PulsarAdminException; + + /** + * Get applied entry filters for a topic asynchronously. + * + * @param topic + * @param applied + * @return + */ + CompletableFuture<EntryFilters> getEntryFiltersPerTopicAsync(String topic, boolean applied); + + /** + * Set entry filters on a topic. + * + * @param topic The topic in whose policy should be set + * @param entryFilters The entry filters + */ + void setEntryFiltersPerTopic(String topic, EntryFilters entryFilters) throws PulsarAdminException; + + /** + * Set entry filters on a topic asynchronously. + * + * @param topic The topic in whose policy should be set + * @param entryFilters The entry filters + */ + CompletableFuture<Void> setEntryFiltersPerTopicAsync(String topic, EntryFilters entryFilters); + + /** + * remove entry filters of a topic. + * @param topic + * @throws PulsarAdminException + */ + void removeEntryFiltersPerTopic(String topic) throws PulsarAdminException; + + /** + * remove entry filters of a topic asynchronously. + * @param topic + * @return + */ + CompletableFuture<Void> removeEntryFiltersPerTopicAsync(String topic); + + } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/EntryFilters.java similarity index 58% copy from pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java copy to pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/EntryFilters.java index 2b115023f9d..785fbce39e1 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/EntryFilters.java @@ -16,39 +16,20 @@ * specific language governing permissions and limitations * under the License. */ - package org.apache.pulsar.common.policies.data; -/** - * PolicyName authorization operations. - */ -public enum PolicyName { - ALL, - ANTI_AFFINITY, - AUTO_SUBSCRIPTION_CREATION, - AUTO_TOPIC_CREATION, - BACKLOG, - COMPACTION, - DELAYED_DELIVERY, - INACTIVE_TOPIC, - DEDUPLICATION, - MAX_CONSUMERS, - MAX_PRODUCERS, - DEDUPLICATION_SNAPSHOT, - MAX_UNACKED, - MAX_SUBSCRIPTIONS, - OFFLOAD, - PARTITION, - PERSISTENCE, - RATE, - RETENTION, - REPLICATION, - REPLICATION_RATE, - SCHEMA_COMPATIBILITY_STRATEGY, - SUBSCRIPTION_AUTH_MODE, - SUBSCRIPTION_EXPIRATION_TIME, - ENCRYPTION, - TTL, - MAX_TOPICS, - RESOURCEGROUP +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@AllArgsConstructor +@NoArgsConstructor +public class EntryFilters { + + /** + * The class name for the entry filter. + */ + private String entryFilterNames; + } diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java index ebcd7fc09c1..e2b186a5a16 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java @@ -128,6 +128,9 @@ public class Policies { LARGEST, HOT; } + @SuppressWarnings("checkstyle:MemberName") + public EntryFilters entryFilters = null; + @Override public int hashCode() { return Objects.hash(auth_policies, replication_clusters, @@ -151,7 +154,7 @@ public class Policies { offload_policies, subscription_types_enabled, properties, - resource_group_name); + resource_group_name, entryFilters); } @Override @@ -196,7 +199,8 @@ public class Policies { && Objects.equals(offload_policies, other.offload_policies) && Objects.equals(subscription_types_enabled, other.subscription_types_enabled) && Objects.equals(properties, other.properties) - && Objects.equals(resource_group_name, other.resource_group_name); + && Objects.equals(resource_group_name, other.resource_group_name) + && Objects.equals(entryFilters, other.entryFilters); } return false; diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java index bfb060596b1..d62312ee7d7 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java @@ -44,6 +44,7 @@ import org.apache.pulsar.common.policies.data.BookieAffinityGroupData; import org.apache.pulsar.common.policies.data.BundlesData; import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies; import org.apache.pulsar.common.policies.data.DispatchRate; +import org.apache.pulsar.common.policies.data.EntryFilters; import org.apache.pulsar.common.policies.data.ErrorData; import org.apache.pulsar.common.policies.data.InactiveTopicPolicies; import org.apache.pulsar.common.policies.data.OffloadPolicies; @@ -1875,4 +1876,41 @@ public class NamespacesImpl extends BaseResource implements Namespaces { WebTarget path = namespacePath(ns, parts); return asyncGetRequest(path, callback); } + + @Override + public EntryFilters getNamespaceEntryFilters(String namespace) throws PulsarAdminException { + return sync(() -> getNamespaceEntryFiltersAsync(namespace)); + } + + @Override + public CompletableFuture<EntryFilters> getNamespaceEntryFiltersAsync(String namespace) { + NamespaceName ns = NamespaceName.get(namespace); + WebTarget path = namespacePath(ns, "entryFilters"); + return asyncGetRequest(path, new FutureCallback<EntryFilters>(){}); + } + + @Override + public void setNamespaceEntryFilters(String namespace, EntryFilters entryFilters) + throws PulsarAdminException { + sync(() -> setNamespaceEntryFiltersAsync(namespace, entryFilters)); + } + + @Override + public CompletableFuture<Void> setNamespaceEntryFiltersAsync(String namespace, EntryFilters entryFilters) { + NamespaceName ns = NamespaceName.get(namespace); + WebTarget path = namespacePath(ns, "entryFilters"); + return asyncPostRequest(path, Entity.entity(entryFilters, MediaType.APPLICATION_JSON)); + } + + @Override + public void removeNamespaceEntryFilters(String namespace) throws PulsarAdminException { + sync(() -> removeNamespaceEntryFiltersAsync(namespace)); + } + + @Override + public CompletableFuture<Void> removeNamespaceEntryFiltersAsync(String namespace) { + NamespaceName ns = NamespaceName.get(namespace); + WebTarget path = namespacePath(ns, "entryFilters"); + return asyncDeleteRequest(path); + } } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicPoliciesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicPoliciesImpl.java index 29226567216..821a4e13e18 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicPoliciesImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicPoliciesImpl.java @@ -34,6 +34,7 @@ import org.apache.pulsar.common.policies.data.BacklogQuota; import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType; import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies; import org.apache.pulsar.common.policies.data.DispatchRate; +import org.apache.pulsar.common.policies.data.EntryFilters; import org.apache.pulsar.common.policies.data.ErrorData; import org.apache.pulsar.common.policies.data.InactiveTopicPolicies; import org.apache.pulsar.common.policies.data.OffloadPolicies; @@ -1178,6 +1179,44 @@ public class TopicPoliciesImpl extends BaseResource implements TopicPolicies { return asyncDeleteRequest(path); } + @Override + public EntryFilters getEntryFiltersPerTopic(String topic, boolean applied) throws PulsarAdminException { + return sync(() -> getEntryFiltersPerTopicAsync(topic, applied)); + } + + @Override + public CompletableFuture<EntryFilters> getEntryFiltersPerTopicAsync(String topic, boolean applied) { + TopicName topicName = validateTopic(topic); + WebTarget path = topicPath(topicName, "entryFilters"); + path = path.queryParam("applied", applied); + return asyncGetRequest(path, new FutureCallback<EntryFilters>(){}); + } + + @Override + public void setEntryFiltersPerTopic(String topic, EntryFilters entryFilters) + throws PulsarAdminException { + sync(() -> setEntryFiltersPerTopicAsync(topic, entryFilters)); + } + + @Override + public CompletableFuture<Void> setEntryFiltersPerTopicAsync(String topic, EntryFilters entryFilters) { + TopicName topicName = validateTopic(topic); + WebTarget path = topicPath(topicName, "entryFilters"); + return asyncPostRequest(path, Entity.entity(entryFilters, MediaType.APPLICATION_JSON)); + } + + @Override + public void removeEntryFiltersPerTopic(String topic) throws PulsarAdminException { + sync(() -> removeEntryFiltersPerTopicAsync(topic)); + } + + @Override + public CompletableFuture<Void> removeEntryFiltersPerTopicAsync(String topic) { + TopicName tn = validateTopic(topic); + WebTarget path = topicPath(tn, "entryFilters"); + return asyncDeleteRequest(path); + } + /* * returns topic name with encoded Local Name */ diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java index 8b4679e43c5..c89bb152fc1 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java @@ -50,6 +50,7 @@ import org.apache.pulsar.common.policies.data.BookieAffinityGroupData; import org.apache.pulsar.common.policies.data.BundlesData; import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies; import org.apache.pulsar.common.policies.data.DispatchRate; +import org.apache.pulsar.common.policies.data.EntryFilters; import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode; import org.apache.pulsar.common.policies.data.InactiveTopicPolicies; import org.apache.pulsar.common.policies.data.OffloadPolicies; @@ -2588,6 +2589,46 @@ public class CmdNamespaces extends CmdBase { } } + @Parameters(commandDescription = "Get entry filters for a namespace") + private class GetEntryFiltersPerTopic extends CliCommand { + @Parameter(description = "tenant/namespace", required = true) + private java.util.List<String> params; + + @Override + void run() throws PulsarAdminException { + String namespace = validateNamespace(params); + print(getAdmin().namespaces().getNamespaceEntryFilters(namespace)); + } + } + + @Parameters(commandDescription = "Set entry filters for a namespace") + private class SetEntryFiltersPerTopic extends CliCommand { + @Parameter(description = "tenant/namespace", required = true) + private java.util.List<String> params; + + @Parameter(names = { "--entry-filters-name", "-efn" }, + description = "The class name for the entry filter.", required = true) + private String entryFiltersName = ""; + + @Override + void run() throws PulsarAdminException { + String namespace = validateNamespace(params); + getAdmin().namespaces().setNamespaceEntryFilters(namespace, new EntryFilters(entryFiltersName)); + } + } + + @Parameters(commandDescription = "Remove entry filters for a namespace") + private class RemoveEntryFiltersPerTopic extends CliCommand { + @Parameter(description = "tenant/namespace", required = true) + private java.util.List<String> params; + + @Override + void run() throws PulsarAdminException { + String namespace = validateNamespace(params); + getAdmin().namespaces().removeNamespaceEntryFilters(namespace); + } + } + public CmdNamespaces(Supplier<PulsarAdmin> admin) { super("namespaces", admin); jcommander.addCommand("list", new GetNamespacesPerProperty()); @@ -2768,5 +2809,9 @@ public class CmdNamespaces extends CmdBase { jcommander.addCommand("get-resource-group", new GetResourceGroup()); jcommander.addCommand("set-resource-group", new SetResourceGroup()); jcommander.addCommand("remove-resource-group", new RemoveResourceGroup()); + + jcommander.addCommand("get-entry-filters", new GetEntryFiltersPerTopic()); + jcommander.addCommand("set-entry-filters", new SetEntryFiltersPerTopic()); + jcommander.addCommand("remove-entry-filters", new RemoveEntryFiltersPerTopic()); } } diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java index b3bf27fe54a..99bf69b10f5 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java @@ -36,6 +36,7 @@ import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.common.policies.data.BacklogQuota; import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies; import org.apache.pulsar.common.policies.data.DispatchRate; +import org.apache.pulsar.common.policies.data.EntryFilters; import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode; import org.apache.pulsar.common.policies.data.InactiveTopicPolicies; import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl; @@ -146,6 +147,66 @@ public class CmdTopicPolicies extends CmdBase { jcommander.addCommand("remove-schema-compatibility-strategy", new RemoveSchemaCompatibilityStrategy()); jcommander.addCommand("set-schema-compatibility-strategy", new SetSchemaCompatibilityStrategy()); jcommander.addCommand("get-schema-compatibility-strategy", new GetSchemaCompatibilityStrategy()); + + jcommander.addCommand("get-entry-filters-per-topic", new GetEntryFiltersPerTopic()); + jcommander.addCommand("set-entry-filters-per-topic", new SetEntryFiltersPerTopic()); + jcommander.addCommand("remove-entry-filters-per-topic", new RemoveEntryFiltersPerTopic()); + } + + @Parameters(commandDescription = "Get entry filters for a topic") + private class GetEntryFiltersPerTopic extends CliCommand { + @Parameter(description = "persistent://tenant/namespace/topic", required = true) + private java.util.List<String> params; + + @Parameter(names = { "--global", "-g" }, description = "Whether to get this policy globally. " + + "If set to true, broker returned global topic policies") + private boolean isGlobal = false; + + @Parameter(names = { "-ap", "--applied" }, description = "Get the applied policy of the topic") + private boolean applied = false; + + @Override + void run() throws PulsarAdminException { + String persistentTopic = validatePersistentTopic(params); + print(getTopicPolicies(isGlobal).getEntryFiltersPerTopic(persistentTopic, applied)); + } + } + + @Parameters(commandDescription = "Set entry filters for a topic") + private class SetEntryFiltersPerTopic extends CliCommand { + @Parameter(description = "persistent://tenant/namespace/topic", required = true) + private java.util.List<String> params; + + + @Parameter(names = { "--entry-filters-name", "-efn" }, + description = "The class name for the entry filter.", required = true) + private String entryFiltersName = ""; + + @Parameter(names = { "--global", "-g" }, description = "Whether to set this policy globally. " + + "If set to true, broker returned global topic policies") + private boolean isGlobal = false; + + @Override + void run() throws PulsarAdminException { + String persistentTopic = validatePersistentTopic(params); + getTopicPolicies(isGlobal).setEntryFiltersPerTopic(persistentTopic, new EntryFilters(entryFiltersName)); + } + } + + @Parameters(commandDescription = "Remove entry filters for a topic") + private class RemoveEntryFiltersPerTopic extends CliCommand { + @Parameter(description = "persistent://tenant/namespace/topic", required = true) + private java.util.List<String> params; + + @Parameter(names = { "--global", "-g" }, description = "Whether to remove this policy globally. " + + "If set to true, broker returned global topic policies") + private boolean isGlobal = false; + + @Override + void run() throws PulsarAdminException { + String persistentTopic = validatePersistentTopic(params); + getTopicPolicies(isGlobal).removeEntryFiltersPerTopic(persistentTopic); + } } @Parameters(commandDescription = "Get max consumers per subscription for a topic") diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java index 66c21a11716..a6891d27401 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java @@ -59,6 +59,7 @@ public class HierarchyTopicPolicies { final PolicyHierarchyValue<DispatchRateImpl> dispatchRate; final PolicyHierarchyValue<Boolean> schemaValidationEnforced; + final PolicyHierarchyValue<EntryFilters> entryFilters; public HierarchyTopicPolicies() { replicationClusters = new PolicyHierarchyValue<>(); @@ -89,5 +90,6 @@ public class HierarchyTopicPolicies { schemaCompatibilityStrategy = new PolicyHierarchyValue<>(); dispatchRate = new PolicyHierarchyValue<>(); schemaValidationEnforced = new PolicyHierarchyValue<>(); + entryFilters = new PolicyHierarchyValue<>(); } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java index 2b115023f9d..6de91ddd38c 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java @@ -50,5 +50,6 @@ public enum PolicyName { ENCRYPTION, TTL, MAX_TOPICS, - RESOURCEGROUP + RESOURCEGROUP, + ENTRY_FILTERS } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java index 07e4dff56bd..c68009816e9 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java @@ -71,6 +71,7 @@ public class TopicPolicies { private Integer maxSubscriptionsPerTopic; private DispatchRateImpl replicatorDispatchRate; private SchemaCompatibilityStrategy schemaCompatibilityStrategy; + private EntryFilters entryFilters; /** * Subscription level policies for specific subscription. diff --git a/site2/docs/admin-api-namespaces.md b/site2/docs/admin-api-namespaces.md index 7b4f2b60fda..c4ba54a1b4d 100644 --- a/site2/docs/admin-api-namespaces.md +++ b/site2/docs/admin-api-namespaces.md @@ -1270,3 +1270,112 @@ admin.namespaces().unload(namespace) </Tabs> ```` +### Configure entry filters policy + +#### Set entry filters policy + +Entry filter helps with filtering message on server side. + +````mdx-code-block +<Tabs groupId="api-choice" + defaultValue="pulsar-admin" + values={[{"label":"pulsar-admin","value":"pulsar-admin"},{"label":"REST API","value":"REST API"},{"label":"Java","value":"Java"}]}> +<TabItem value="pulsar-admin"> + +``` + +$ pulsar-admin namespaces set-entry-filters --desc "The description of the entry filter to be used for user help." +--entry-filters-name "The class name for the entry filter." --entry-filters-dir "The directory for all the entry filter implementations." test-tenant/ns1 +``` + +</TabItem> +<TabItem value="REST API"> + +{@inject: endpoint|POST|/admin/v2/namespaces/:tenant/:namespace/entryFilters|operation/setEntryFilters? +version=@pulsar:version_number@} + +</TabItem> +<TabItem value="Java"> + +```java + +admin.namespaces().setEntryFilters(namespace, new EntryFilters("desc", "classes name", "class files localtion")) + +``` + +</TabItem> + +</Tabs> +```` + +#### Get entry filters policies + +You can get a configured entry filter for a given namespace. + +````mdx-code-block +<Tabs groupId="api-choice" + defaultValue="pulsar-admin" + values={[{"label":"pulsar-admin","value":"pulsar-admin"},{"label":"REST API","value":"REST API"},{"label":"Java","value":"Java"}]}> +<TabItem value="pulsar-admin"> + +``` + +$ pulsar-admin namespaces get-entry-filters test-tenant/ns1 + +``` + +</TabItem> +<TabItem value="REST API"> + +{@inject: endpoint|GET|/admin/v2/namespaces/:tenant/:namespace/entryFilters|operation/getEntryFilters? +version=@pulsar:version_number@} + +</TabItem> +<TabItem value="Java"> + +```java + +admin.namespaces().getEntryFilters(namespace); + +``` + +</TabItem> + +</Tabs> +```` + +#### Remove entry filters policies + +You can remove entry filters policies for a given namespace. + +````mdx-code-block +<Tabs groupId="api-choice" + defaultValue="pulsar-admin" + values={[{"label":"pulsar-admin","value":"pulsar-admin"},{"label":"REST API","value":"REST API"},{"label":"Java","value":"Java"}]}> +<TabItem value="pulsar-admin"> + +``` + +$ pulsar-admin namespaces remove-entry-filters test-tenant/ns1 + +``` + +</TabItem> +<TabItem value="REST API"> + +{@inject: endpoint|DELETE|/admin/v2/namespaces/:tenant/:namespace/entryFilters|operation/removeEntryFilters? +version=@pulsar:version_number@} + +</TabItem> +<TabItem value="Java"> + +```java + +admin.namespaces().removeEntryFilters(namespace) + +``` + +</TabItem> + +</Tabs> +````