This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 2245c37  Rest API, AdminClient and CLI for compaction threshold (#1924)
2245c37 is described below

commit 2245c3785e161d3d421bdd2120e0272878b01c6f
Author: Ivan Kelly <iv...@apache.org>
AuthorDate: Sat Jun 9 21:41:39 2018 +0200

    Rest API, AdminClient and CLI for compaction threshold (#1924)
    
    Allow the user to specify a namespace policy for the maximum backlog
    size for topics in that namespace. When a topic backlog reaches this
    size, compaction is triggered.
---
 .../pulsar/broker/admin/impl/NamespacesBase.java   | 41 ++++++++++++++++
 .../apache/pulsar/broker/admin/v1/Namespaces.java  | 30 ++++++++++++
 .../apache/pulsar/broker/admin/v2/Namespaces.java  | 29 +++++++++++
 .../org/apache/pulsar/client/admin/Namespaces.java | 47 ++++++++++++++++++
 .../client/admin/internal/NamespacesImpl.java      | 22 +++++++++
 .../org/apache/pulsar/admin/cli/CmdNamespaces.java | 33 +++++++++++++
 .../single-cluster-3-bookie-2-broker.yaml          |  2 +
 .../pulsar/tests/integration/TestCompaction.java   | 57 ++++++++++++++++++++++
 8 files changed, 261 insertions(+)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index e11864d..c845fd5 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -1529,5 +1529,46 @@ public abstract class NamespacesBase extends 
AdminResource {
         }
     }
 
+    protected long internalGetCompactionThreshold() {
+        validateAdminAccessForTenant(namespaceName.getTenant());
+        return getNamespacePolicies(namespaceName).compaction_threshold;
+    }
+
+    protected void internalSetCompactionThreshold(long newThreshold) {
+        validateSuperUserAccess();
+        validatePoliciesReadOnlyAccess();
+
+        try {
+            Stat nodeStat = new Stat();
+            final String path = path(POLICIES, namespaceName.toString());
+            byte[] content = globalZk().getData(path, null, nodeStat);
+            Policies policies = jsonMapper().readValue(content, 
Policies.class);
+            if (newThreshold < 0) {
+                throw new RestException(Status.PRECONDITION_FAILED,
+                        "compactionThreshold must be 0 or more");
+            }
+            policies.compaction_threshold = newThreshold;
+            globalZk().setData(path, jsonMapper().writeValueAsBytes(policies), 
nodeStat.getVersion());
+            policiesCache().invalidate(path(POLICIES, 
namespaceName.toString()));
+            log.info("[{}] Successfully updated compactionThreshold 
configuration: namespace={}, value={}",
+                     clientAppId(), namespaceName, 
policies.compaction_threshold);
+
+        } catch (KeeperException.NoNodeException e) {
+            log.warn("[{}] Failed to update compactionThreshold configuration 
for namespace {}: does not exist",
+                     clientAppId(), namespaceName);
+            throw new RestException(Status.NOT_FOUND, "Namespace does not 
exist");
+        } catch (KeeperException.BadVersionException e) {
+            log.warn("[{}] Failed to update maxConsumersPerSubscription 
configuration for namespace {}: concurrent modification",
+                     clientAppId(), namespaceName);
+            throw new RestException(Status.CONFLICT, "Concurrent 
modification");
+        } catch (RestException pfe) {
+            throw pfe;
+        } catch (Exception e) {
+            log.error("[{}] Failed to update compactionThreshold configuration 
for namespace {}",
+                      clientAppId(), namespaceName, e);
+            throw new RestException(e);
+        }
+    }
+
     private static final Logger log = 
LoggerFactory.getLogger(NamespacesBase.class);
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
index cd34b0d..f8edcf1 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
@@ -715,6 +715,36 @@ public class Namespaces extends NamespacesBase {
         internalSetMaxConsumersPerSubscription(maxConsumersPerSubscription);
     }
 
+    @GET
+    @Path("/{property}/{cluster}/{namespace}/compactionThreshold")
+    @ApiOperation(value = "Maximum number of uncompacted bytes in topics 
before compaction is triggered.",
+                  notes = "The backlog size is compared to the threshold 
periodically. "
+                          + "A threshold of 0 disabled automatic compaction")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
+                            @ApiResponse(code = 404, message = "Namespace 
doesn't exist") })
+    public long getCompactionThreshold(@PathParam("property") String property,
+                                       @PathParam("cluster") String cluster,
+                                       @PathParam("namespace") String 
namespace) {
+        validateNamespaceName(property, cluster, namespace);
+        return internalGetCompactionThreshold();
+    }
+
+    @PUT
+    @Path("/{property}/{cluster}/{namespace}/compactionThreshold")
+    @ApiOperation(value = "Set maximum number of uncompacted bytes in a topic 
before compaction is triggered.",
+                  notes = "The backlog size is compared to the threshold 
periodically. "
+                          + "A threshold of 0 disabled automatic compaction")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
+                            @ApiResponse(code = 404, message = "Namespace 
doesn't exist"),
+                            @ApiResponse(code = 409, message = "Concurrent 
modification"),
+                            @ApiResponse(code = 412, message = 
"compactionThreshold value is not valid") })
+    public void setCompactionThreshold(@PathParam("property") String property,
+                                       @PathParam("cluster") String cluster,
+                                       @PathParam("namespace") String 
namespace,
+                                       long newThreshold) {
+        validateNamespaceName(property, cluster, namespace);
+        internalSetCompactionThreshold(newThreshold);
+    }
 
     private static final Logger log = 
LoggerFactory.getLogger(Namespaces.class);
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index 15ac532..bd467ed 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -657,5 +657,34 @@ public class Namespaces extends NamespacesBase {
         return policies;
     }
 
+    @GET
+    @Path("/{property}/{namespace}/compactionThreshold")
+    @ApiOperation(value = "Maximum number of uncompacted bytes in topics 
before compaction is triggered.",
+                  notes = "The backlog size is compared to the threshold 
periodically. "
+                          + "A threshold of 0 disabled automatic compaction")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
+                            @ApiResponse(code = 404, message = "Namespace 
doesn't exist") })
+    public long getCompactionThreshold(@PathParam("property") String property,
+                                       @PathParam("namespace") String 
namespace) {
+        validateNamespaceName(property, namespace);
+        return internalGetCompactionThreshold();
+    }
+
+    @PUT
+    @Path("/{property}/{namespace}/compactionThreshold")
+    @ApiOperation(value = "Set maximum number of uncompacted bytes in a topic 
before compaction is triggered.",
+                  notes = "The backlog size is compared to the threshold 
periodically. "
+                          + "A threshold of 0 disabled automatic compaction")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
+                            @ApiResponse(code = 404, message = "Namespace 
doesn't exist"),
+                            @ApiResponse(code = 409, message = "Concurrent 
modification"),
+                            @ApiResponse(code = 412, message = 
"compactionThreshold value is not valid") })
+    public void setCompactionThreshold(@PathParam("property") String property,
+                                       @PathParam("namespace") String 
namespace,
+                                       long newThreshold) {
+        validateNamespaceName(property, namespace);
+        internalSetCompactionThreshold(newThreshold);
+    }
+
     private static final Logger log = 
LoggerFactory.getLogger(Namespaces.class);
 }
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
index 373c535..03cb52d 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
@@ -1117,4 +1117,51 @@ public interface Namespaces {
      *             Unexpected error
      */
     void setMaxConsumersPerSubscription(String namespace, int 
maxConsumersPerSubscription) throws PulsarAdminException;
+
+    /**
+     * Get the compactionThreshold for a namespace. The maximum number of 
bytes topics in the namespace
+     * can have before compaction is triggered. 0 disables.
+     * <p>
+     * Response example:
+     *
+     * <pre>
+     * <code>10000000</code>
+     * </pre>
+     *
+     * @param namespace
+     *            Namespace name
+     *
+     * @throws NotAuthorizedException
+     *             Don't have admin permission
+     * @throws NotFoundException
+     *             Namespace does not exist
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    long getCompactionThreshold(String namespace) throws PulsarAdminException;
+
+    /**
+     * Set the compactionThreshold for a namespace. The maximum number of 
bytes topics in the namespace
+     * can have before compaction is triggered. 0 disables.
+     * <p>
+     * Request example:
+     *
+     * <pre>
+     * <code>10000000</code>
+     * </pre>
+     *
+     * @param namespace
+     *            Namespace name
+     * @param compactionThreshold
+     *            maximum number of backlog bytes before compaction is 
triggered
+     *
+     * @throws NotAuthorizedException
+     *             Don't have admin permission
+     * @throws NotFoundException
+     *             Namespace does not exist
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void setCompactionThreshold(String namespace, long compactionThreshold) 
throws PulsarAdminException;
+
 }
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
index 7c4fd04..6c79a06 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
@@ -657,6 +657,28 @@ public class NamespacesImpl extends BaseResource 
implements Namespaces {
         }
     }
 
+    @Override
+    public long getCompactionThreshold(String namespace) throws 
PulsarAdminException {
+        try {
+            NamespaceName ns = NamespaceName.get(namespace);
+            WebTarget path = namespacePath(ns, "compactionThreshold");
+            return request(path).get(Long.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public void setCompactionThreshold(String namespace, long 
compactionThreshold) throws PulsarAdminException {
+        try {
+            NamespaceName ns = NamespaceName.get(namespace);
+            WebTarget path = namespacePath(ns, "compactionThreshold");
+            request(path).put(Entity.entity(compactionThreshold, 
MediaType.APPLICATION_JSON), ErrorData.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
     private WebTarget namespacePath(NamespaceName namespace, String... parts) {
         final WebTarget base = namespace.isV2() ? adminV2Namespaces : 
adminNamespaces;
         WebTarget namespacePath = base.path(namespace.toString());
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
index 5259879..9603247 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
@@ -766,6 +766,36 @@ public class CmdNamespaces extends CmdBase {
         }
     }
 
+    @Parameters(commandDescription = "Get compactionThreshold for a namespace")
+    private class GetCompactionThreshold extends CliCommand {
+        @Parameter(description = "tenant/namespace\n", required = true)
+        private java.util.List<String> params;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String namespace = validateNamespace(params);
+            print(admin.namespaces().getCompactionThreshold(namespace));
+        }
+    }
+
+    @Parameters(commandDescription = "Set compactionThreshold for a namespace")
+    private class SetCompactionThreshold extends CliCommand {
+        @Parameter(description = "tenant/namespace", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = { "--threshold", "-t" },
+                   description = "Maximum number of bytes in a topic backlog 
before compaction is triggered "
+                                 + "(eg: 10M, 16G, 3T). 0 disables automatic 
compaction",
+                   required = true)
+        private String threshold = "0";
+
+        @Override
+        void run() throws PulsarAdminException {
+            String namespace = validateNamespace(params);
+            admin.namespaces().setCompactionThreshold(namespace, 
validateSizeString(threshold));
+        }
+    }
+
     private static long validateSizeString(String s) {
         char last = s.charAt(s.length() - 1);
         String subStr = s.substring(0, s.length() - 1);
@@ -874,5 +904,8 @@ public class CmdNamespaces extends CmdBase {
         jcommander.addCommand("set-max-consumers-per-topic", new 
SetMaxConsumersPerTopic());
         jcommander.addCommand("get-max-consumers-per-subscription", new 
GetMaxConsumersPerSubscription());
         jcommander.addCommand("set-max-consumers-per-subscription", new 
SetMaxConsumersPerSubscription());
+
+        jcommander.addCommand("get-compaction-threshold", new 
GetCompactionThreshold());
+        jcommander.addCommand("set-compaction-threshold", new 
SetCompactionThreshold());
     }
 }
diff --git 
a/tests/integration-tests-topologies/src/main/resources/cube-definitions/single-cluster-3-bookie-2-broker.yaml
 
b/tests/integration-tests-topologies/src/main/resources/cube-definitions/single-cluster-3-bookie-2-broker.yaml
index 1a20574..268e6d7 100644
--- 
a/tests/integration-tests-topologies/src/main/resources/cube-definitions/single-cluster-3-bookie-2-broker.yaml
+++ 
b/tests/integration-tests-topologies/src/main/resources/cube-definitions/single-cluster-3-bookie-2-broker.yaml
@@ -131,6 +131,7 @@ pulsar-broker1*:
     - zookeeperServers=zookeeper
     - configurationStoreServers=configuration-store:2184
     - clusterName=test
+    - brokerServiceCompactionMonitorIntervalInSeconds=1
   labels:
     cluster: test
     service: pulsar-broker
@@ -150,6 +151,7 @@ pulsar-broker2*:
     - zookeeperServers=zookeeper
     - configurationStoreServers=configuration-store:2184
     - clusterName=test
+    - brokerServiceCompactionMonitorIntervalInSeconds=1
   labels:
     cluster: test
     service: pulsar-broker
diff --git 
a/tests/integration/compaction/src/test/java/org/apache/pulsar/tests/integration/TestCompaction.java
 
b/tests/integration/compaction/src/test/java/org/apache/pulsar/tests/integration/TestCompaction.java
index 5570f05..a716f9d 100644
--- 
a/tests/integration/compaction/src/test/java/org/apache/pulsar/tests/integration/TestCompaction.java
+++ 
b/tests/integration/compaction/src/test/java/org/apache/pulsar/tests/integration/TestCompaction.java
@@ -150,4 +150,61 @@ public class TestCompaction extends Arquillian {
         }
     }
 
+    private static void waitAndVerifyCompacted(PulsarClient client, String 
topic,
+                                               String sub, String expectedKey, 
String expectedValue) throws Exception {
+        for (int i = 0; i < 60; i++) {
+            try (Consumer<byte[]> consumer = client.newConsumer().topic(topic)
+                 .readCompacted(true).subscriptionName(sub).subscribe()) {
+                Message<byte[]> m = consumer.receive();
+                Assert.assertEquals(m.getKey(), expectedKey);
+                if (new String(m.getData()).equals(expectedValue)) {
+                    break;
+                }
+            }
+            Thread.sleep(1000);
+        }
+        try (Consumer<byte[]> consumer = client.newConsumer().topic(topic)
+                .readCompacted(true).subscriptionName(sub).subscribe()) {
+            Message<byte[]> m = consumer.receive();
+            Assert.assertEquals(m.getKey(), expectedKey);
+            Assert.assertEquals(new String(m.getData()), expectedValue);
+        }
+    }
+
+    @Test
+    public void testPublishWithAutoCompaction() throws Exception {
+        PulsarClusterUtils.runOnAnyBroker(docker, clusterName,
+                                          PulsarClusterUtils.PULSAR_ADMIN, 
"tenants",
+                                          "create", "compaction-test-auto",
+                                          "--allowed-clusters", clusterName,
+                                          "--admin-roles", "admin");
+        PulsarClusterUtils.runOnAnyBroker(docker, clusterName,
+                PulsarClusterUtils.PULSAR_ADMIN, "namespaces",
+                "create", "--clusters", "test", "compaction-test-auto/ns1");
+        PulsarClusterUtils.runOnAnyBroker(docker, clusterName,
+                PulsarClusterUtils.PULSAR_ADMIN, "namespaces",
+                "set-compaction-threshold", "--threshold", "1", 
"compaction-test-auto/ns1");
+
+        String brokerIp = DockerUtils.getContainerIP(
+                docker, PulsarClusterUtils.proxySet(docker, 
clusterName).stream().findAny().get());
+        String serviceUrl = "pulsar://" + brokerIp + ":6650";
+        String topic = "persistent://compaction-test-auto/ns1/topic1";
+
+        try (PulsarClient client = PulsarClient.create(serviceUrl)) {
+            
client.newConsumer().topic(topic).subscriptionName("sub1").subscribe().close();
+
+            try(Producer<byte[]> producer = 
client.newProducer().topic(topic).create()) {
+                
producer.send(MessageBuilder.create().setKey("key0").setContent("content0".getBytes()).build());
+                
producer.send(MessageBuilder.create().setKey("key0").setContent("content1".getBytes()).build());
+            }
+
+            waitAndVerifyCompacted(client, topic, "sub1", "key0", "content1");
+
+            try(Producer<byte[]> producer = 
client.newProducer().topic(topic).create()) {
+                
producer.send(MessageBuilder.create().setKey("key0").setContent("content2".getBytes()).build());
+            }
+            waitAndVerifyCompacted(client, topic, "sub1", "key0", "content2");
+        }
+    }
+
 }

-- 
To stop receiving notification emails like this one, please contact
si...@apache.org.

Reply via email to