This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 2245c37 Rest API, AdminClient and CLI for compaction threshold (#1924) 2245c37 is described below commit 2245c3785e161d3d421bdd2120e0272878b01c6f Author: Ivan Kelly <iv...@apache.org> AuthorDate: Sat Jun 9 21:41:39 2018 +0200 Rest API, AdminClient and CLI for compaction threshold (#1924) Allow the user to specify a namespace policy for the maximum backlog size for topics in that namespace. When a topic backlog reaches this size, compaction is triggered. --- .../pulsar/broker/admin/impl/NamespacesBase.java | 41 ++++++++++++++++ .../apache/pulsar/broker/admin/v1/Namespaces.java | 30 ++++++++++++ .../apache/pulsar/broker/admin/v2/Namespaces.java | 29 +++++++++++ .../org/apache/pulsar/client/admin/Namespaces.java | 47 ++++++++++++++++++ .../client/admin/internal/NamespacesImpl.java | 22 +++++++++ .../org/apache/pulsar/admin/cli/CmdNamespaces.java | 33 +++++++++++++ .../single-cluster-3-bookie-2-broker.yaml | 2 + .../pulsar/tests/integration/TestCompaction.java | 57 ++++++++++++++++++++++ 8 files changed, 261 insertions(+) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index e11864d..c845fd5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -1529,5 +1529,46 @@ public abstract class NamespacesBase extends AdminResource { } } + protected long internalGetCompactionThreshold() { + validateAdminAccessForTenant(namespaceName.getTenant()); + return getNamespacePolicies(namespaceName).compaction_threshold; + } + + protected void internalSetCompactionThreshold(long newThreshold) { + validateSuperUserAccess(); + validatePoliciesReadOnlyAccess(); + + try { + Stat nodeStat = new Stat(); + final String path = path(POLICIES, namespaceName.toString()); + byte[] content = globalZk().getData(path, null, nodeStat); + Policies policies = jsonMapper().readValue(content, Policies.class); + if (newThreshold < 0) { + throw new RestException(Status.PRECONDITION_FAILED, + "compactionThreshold must be 0 or more"); + } + policies.compaction_threshold = newThreshold; + globalZk().setData(path, jsonMapper().writeValueAsBytes(policies), nodeStat.getVersion()); + policiesCache().invalidate(path(POLICIES, namespaceName.toString())); + log.info("[{}] Successfully updated compactionThreshold configuration: namespace={}, value={}", + clientAppId(), namespaceName, policies.compaction_threshold); + + } catch (KeeperException.NoNodeException e) { + log.warn("[{}] Failed to update compactionThreshold configuration for namespace {}: does not exist", + clientAppId(), namespaceName); + throw new RestException(Status.NOT_FOUND, "Namespace does not exist"); + } catch (KeeperException.BadVersionException e) { + log.warn("[{}] Failed to update maxConsumersPerSubscription configuration for namespace {}: concurrent modification", + clientAppId(), namespaceName); + throw new RestException(Status.CONFLICT, "Concurrent modification"); + } catch (RestException pfe) { + throw pfe; + } catch (Exception e) { + log.error("[{}] Failed to update compactionThreshold configuration for namespace {}", + clientAppId(), namespaceName, e); + throw new RestException(e); + } + } + private static final Logger log = LoggerFactory.getLogger(NamespacesBase.class); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java index cd34b0d..f8edcf1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java @@ -715,6 +715,36 @@ public class Namespaces extends NamespacesBase { internalSetMaxConsumersPerSubscription(maxConsumersPerSubscription); } + @GET + @Path("/{property}/{cluster}/{namespace}/compactionThreshold") + @ApiOperation(value = "Maximum number of uncompacted bytes in topics before compaction is triggered.", + notes = "The backlog size is compared to the threshold periodically. " + + "A threshold of 0 disabled automatic compaction") + @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Namespace doesn't exist") }) + public long getCompactionThreshold(@PathParam("property") String property, + @PathParam("cluster") String cluster, + @PathParam("namespace") String namespace) { + validateNamespaceName(property, cluster, namespace); + return internalGetCompactionThreshold(); + } + + @PUT + @Path("/{property}/{cluster}/{namespace}/compactionThreshold") + @ApiOperation(value = "Set maximum number of uncompacted bytes in a topic before compaction is triggered.", + notes = "The backlog size is compared to the threshold periodically. " + + "A threshold of 0 disabled automatic compaction") + @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Namespace doesn't exist"), + @ApiResponse(code = 409, message = "Concurrent modification"), + @ApiResponse(code = 412, message = "compactionThreshold value is not valid") }) + public void setCompactionThreshold(@PathParam("property") String property, + @PathParam("cluster") String cluster, + @PathParam("namespace") String namespace, + long newThreshold) { + validateNamespaceName(property, cluster, namespace); + internalSetCompactionThreshold(newThreshold); + } private static final Logger log = LoggerFactory.getLogger(Namespaces.class); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java index 15ac532..bd467ed 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java @@ -657,5 +657,34 @@ public class Namespaces extends NamespacesBase { return policies; } + @GET + @Path("/{property}/{namespace}/compactionThreshold") + @ApiOperation(value = "Maximum number of uncompacted bytes in topics before compaction is triggered.", + notes = "The backlog size is compared to the threshold periodically. " + + "A threshold of 0 disabled automatic compaction") + @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Namespace doesn't exist") }) + public long getCompactionThreshold(@PathParam("property") String property, + @PathParam("namespace") String namespace) { + validateNamespaceName(property, namespace); + return internalGetCompactionThreshold(); + } + + @PUT + @Path("/{property}/{namespace}/compactionThreshold") + @ApiOperation(value = "Set maximum number of uncompacted bytes in a topic before compaction is triggered.", + notes = "The backlog size is compared to the threshold periodically. " + + "A threshold of 0 disabled automatic compaction") + @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Namespace doesn't exist"), + @ApiResponse(code = 409, message = "Concurrent modification"), + @ApiResponse(code = 412, message = "compactionThreshold value is not valid") }) + public void setCompactionThreshold(@PathParam("property") String property, + @PathParam("namespace") String namespace, + long newThreshold) { + validateNamespaceName(property, namespace); + internalSetCompactionThreshold(newThreshold); + } + private static final Logger log = LoggerFactory.getLogger(Namespaces.class); } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java index 373c535..03cb52d 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java @@ -1117,4 +1117,51 @@ public interface Namespaces { * Unexpected error */ void setMaxConsumersPerSubscription(String namespace, int maxConsumersPerSubscription) throws PulsarAdminException; + + /** + * Get the compactionThreshold for a namespace. The maximum number of bytes topics in the namespace + * can have before compaction is triggered. 0 disables. + * <p> + * Response example: + * + * <pre> + * <code>10000000</code> + * </pre> + * + * @param namespace + * Namespace name + * + * @throws NotAuthorizedException + * Don't have admin permission + * @throws NotFoundException + * Namespace does not exist + * @throws PulsarAdminException + * Unexpected error + */ + long getCompactionThreshold(String namespace) throws PulsarAdminException; + + /** + * Set the compactionThreshold for a namespace. The maximum number of bytes topics in the namespace + * can have before compaction is triggered. 0 disables. + * <p> + * Request example: + * + * <pre> + * <code>10000000</code> + * </pre> + * + * @param namespace + * Namespace name + * @param compactionThreshold + * maximum number of backlog bytes before compaction is triggered + * + * @throws NotAuthorizedException + * Don't have admin permission + * @throws NotFoundException + * Namespace does not exist + * @throws PulsarAdminException + * Unexpected error + */ + void setCompactionThreshold(String namespace, long compactionThreshold) throws PulsarAdminException; + } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java index 7c4fd04..6c79a06 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java @@ -657,6 +657,28 @@ public class NamespacesImpl extends BaseResource implements Namespaces { } } + @Override + public long getCompactionThreshold(String namespace) throws PulsarAdminException { + try { + NamespaceName ns = NamespaceName.get(namespace); + WebTarget path = namespacePath(ns, "compactionThreshold"); + return request(path).get(Long.class); + } catch (Exception e) { + throw getApiException(e); + } + } + + @Override + public void setCompactionThreshold(String namespace, long compactionThreshold) throws PulsarAdminException { + try { + NamespaceName ns = NamespaceName.get(namespace); + WebTarget path = namespacePath(ns, "compactionThreshold"); + request(path).put(Entity.entity(compactionThreshold, MediaType.APPLICATION_JSON), ErrorData.class); + } catch (Exception e) { + throw getApiException(e); + } + } + private WebTarget namespacePath(NamespaceName namespace, String... parts) { final WebTarget base = namespace.isV2() ? adminV2Namespaces : adminNamespaces; WebTarget namespacePath = base.path(namespace.toString()); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java index 5259879..9603247 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java @@ -766,6 +766,36 @@ public class CmdNamespaces extends CmdBase { } } + @Parameters(commandDescription = "Get compactionThreshold for a namespace") + private class GetCompactionThreshold extends CliCommand { + @Parameter(description = "tenant/namespace\n", required = true) + private java.util.List<String> params; + + @Override + void run() throws PulsarAdminException { + String namespace = validateNamespace(params); + print(admin.namespaces().getCompactionThreshold(namespace)); + } + } + + @Parameters(commandDescription = "Set compactionThreshold for a namespace") + private class SetCompactionThreshold extends CliCommand { + @Parameter(description = "tenant/namespace", required = true) + private java.util.List<String> params; + + @Parameter(names = { "--threshold", "-t" }, + description = "Maximum number of bytes in a topic backlog before compaction is triggered " + + "(eg: 10M, 16G, 3T). 0 disables automatic compaction", + required = true) + private String threshold = "0"; + + @Override + void run() throws PulsarAdminException { + String namespace = validateNamespace(params); + admin.namespaces().setCompactionThreshold(namespace, validateSizeString(threshold)); + } + } + private static long validateSizeString(String s) { char last = s.charAt(s.length() - 1); String subStr = s.substring(0, s.length() - 1); @@ -874,5 +904,8 @@ public class CmdNamespaces extends CmdBase { jcommander.addCommand("set-max-consumers-per-topic", new SetMaxConsumersPerTopic()); jcommander.addCommand("get-max-consumers-per-subscription", new GetMaxConsumersPerSubscription()); jcommander.addCommand("set-max-consumers-per-subscription", new SetMaxConsumersPerSubscription()); + + jcommander.addCommand("get-compaction-threshold", new GetCompactionThreshold()); + jcommander.addCommand("set-compaction-threshold", new SetCompactionThreshold()); } } diff --git a/tests/integration-tests-topologies/src/main/resources/cube-definitions/single-cluster-3-bookie-2-broker.yaml b/tests/integration-tests-topologies/src/main/resources/cube-definitions/single-cluster-3-bookie-2-broker.yaml index 1a20574..268e6d7 100644 --- a/tests/integration-tests-topologies/src/main/resources/cube-definitions/single-cluster-3-bookie-2-broker.yaml +++ b/tests/integration-tests-topologies/src/main/resources/cube-definitions/single-cluster-3-bookie-2-broker.yaml @@ -131,6 +131,7 @@ pulsar-broker1*: - zookeeperServers=zookeeper - configurationStoreServers=configuration-store:2184 - clusterName=test + - brokerServiceCompactionMonitorIntervalInSeconds=1 labels: cluster: test service: pulsar-broker @@ -150,6 +151,7 @@ pulsar-broker2*: - zookeeperServers=zookeeper - configurationStoreServers=configuration-store:2184 - clusterName=test + - brokerServiceCompactionMonitorIntervalInSeconds=1 labels: cluster: test service: pulsar-broker diff --git a/tests/integration/compaction/src/test/java/org/apache/pulsar/tests/integration/TestCompaction.java b/tests/integration/compaction/src/test/java/org/apache/pulsar/tests/integration/TestCompaction.java index 5570f05..a716f9d 100644 --- a/tests/integration/compaction/src/test/java/org/apache/pulsar/tests/integration/TestCompaction.java +++ b/tests/integration/compaction/src/test/java/org/apache/pulsar/tests/integration/TestCompaction.java @@ -150,4 +150,61 @@ public class TestCompaction extends Arquillian { } } + private static void waitAndVerifyCompacted(PulsarClient client, String topic, + String sub, String expectedKey, String expectedValue) throws Exception { + for (int i = 0; i < 60; i++) { + try (Consumer<byte[]> consumer = client.newConsumer().topic(topic) + .readCompacted(true).subscriptionName(sub).subscribe()) { + Message<byte[]> m = consumer.receive(); + Assert.assertEquals(m.getKey(), expectedKey); + if (new String(m.getData()).equals(expectedValue)) { + break; + } + } + Thread.sleep(1000); + } + try (Consumer<byte[]> consumer = client.newConsumer().topic(topic) + .readCompacted(true).subscriptionName(sub).subscribe()) { + Message<byte[]> m = consumer.receive(); + Assert.assertEquals(m.getKey(), expectedKey); + Assert.assertEquals(new String(m.getData()), expectedValue); + } + } + + @Test + public void testPublishWithAutoCompaction() throws Exception { + PulsarClusterUtils.runOnAnyBroker(docker, clusterName, + PulsarClusterUtils.PULSAR_ADMIN, "tenants", + "create", "compaction-test-auto", + "--allowed-clusters", clusterName, + "--admin-roles", "admin"); + PulsarClusterUtils.runOnAnyBroker(docker, clusterName, + PulsarClusterUtils.PULSAR_ADMIN, "namespaces", + "create", "--clusters", "test", "compaction-test-auto/ns1"); + PulsarClusterUtils.runOnAnyBroker(docker, clusterName, + PulsarClusterUtils.PULSAR_ADMIN, "namespaces", + "set-compaction-threshold", "--threshold", "1", "compaction-test-auto/ns1"); + + String brokerIp = DockerUtils.getContainerIP( + docker, PulsarClusterUtils.proxySet(docker, clusterName).stream().findAny().get()); + String serviceUrl = "pulsar://" + brokerIp + ":6650"; + String topic = "persistent://compaction-test-auto/ns1/topic1"; + + try (PulsarClient client = PulsarClient.create(serviceUrl)) { + client.newConsumer().topic(topic).subscriptionName("sub1").subscribe().close(); + + try(Producer<byte[]> producer = client.newProducer().topic(topic).create()) { + producer.send(MessageBuilder.create().setKey("key0").setContent("content0".getBytes()).build()); + producer.send(MessageBuilder.create().setKey("key0").setContent("content1".getBytes()).build()); + } + + waitAndVerifyCompacted(client, topic, "sub1", "key0", "content1"); + + try(Producer<byte[]> producer = client.newProducer().topic(topic).create()) { + producer.send(MessageBuilder.create().setKey("key0").setContent("content2".getBytes()).build()); + } + waitAndVerifyCompacted(client, topic, "sub1", "key0", "content2"); + } + } + } -- To stop receiving notification emails like this one, please contact si...@apache.org.