This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new b2a373d  Offload threshold policy for namespaces (#1973)
b2a373d is described below

commit b2a373d903c183c95dd5e29b317ef03af9bf81ff
Author: Ivan Kelly <iv...@apache.org>
AuthorDate: Mon Jun 18 09:35:13 2018 +0200

    Offload threshold policy for namespaces (#1973)
    
    Allow administrators to specify an offload threshold policy on a
    namespace, which stipulates that once a topic in the namespace has a
    certain amount of data on the pulsar cluster, start offloading some of
    this data to longterm storage.
    
    This patch also cleans up TestS3Offload, and adds the offload status of
    each ledger to the topic internal stats.
    
    Master Issue: #1511
---
 .../pulsar/broker/admin/impl/NamespacesBase.java   |  37 ++++++
 .../apache/pulsar/broker/admin/v1/Namespaces.java  |  31 +++++
 .../apache/pulsar/broker/admin/v2/Namespaces.java  |  29 +++++
 .../pulsar/broker/service/BrokerService.java       |   2 +
 .../broker/service/persistent/PersistentTopic.java |   1 +
 .../org/apache/pulsar/client/admin/Namespaces.java |  48 ++++++++
 .../client/admin/internal/NamespacesImpl.java      |  22 ++++
 .../org/apache/pulsar/admin/cli/CmdNamespaces.java |  36 ++++++
 .../data/PersistentTopicInternalStats.java         |   1 +
 .../pulsar/common/policies/data/Policies.java      |   7 +-
 .../pulsar/tests/integration/TestS3Offload.java    | 134 ++++++++++++++++-----
 11 files changed, 313 insertions(+), 35 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index bba3dc7..b8a4c53 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -1570,5 +1570,42 @@ public abstract class NamespacesBase extends 
AdminResource {
         }
     }
 
+    protected long internalGetOffloadThreshold() {
+        validateAdminAccessForTenant(namespaceName.getTenant());
+        return getNamespacePolicies(namespaceName).offload_threshold;
+    }
+
+    protected void internalSetOffloadThreshold(long newThreshold) { // super-user only; read-modify-write of the namespace policies node
+        validateSuperUserAccess();
+        validatePoliciesReadOnlyAccess();
+
+        try {
+            Stat nodeStat = new Stat();
+            final String path = path(POLICIES, namespaceName.toString());
+            byte[] content = globalZk().getData(path, null, nodeStat); // nodeStat receives the version used for the write below
+            Policies policies = jsonMapper().readValue(content, Policies.class);
+            policies.offload_threshold = newThreshold;
+            globalZk().setData(path, jsonMapper().writeValueAsBytes(policies), nodeStat.getVersion()); // conditional on the version read above
+            policiesCache().invalidate(path(POLICIES, namespaceName.toString())); // drop the cached entry for the node just rewritten
+            log.info("[{}] Successfully updated offloadThreshold configuration: namespace={}, value={}",
+                     clientAppId(), namespaceName, policies.offload_threshold); // log the offload value that was written, not the compaction one
+
+        } catch (KeeperException.NoNodeException e) {
+            log.warn("[{}] Failed to update offloadThreshold configuration for namespace {}: does not exist",
+                     clientAppId(), namespaceName);
+            throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
+        } catch (KeeperException.BadVersionException e) {
+            log.warn("[{}] Failed to update offloadThreshold configuration for namespace {}: concurrent modification",
+                     clientAppId(), namespaceName);
+            throw new RestException(Status.CONFLICT, "Concurrent modification");
+        } catch (RestException pfe) {
+            throw pfe; // already an HTTP-mapped error: rethrow as-is instead of wrapping it below
+        } catch (Exception e) {
+            log.error("[{}] Failed to update offloadThreshold configuration for namespace {}",
+                      clientAppId(), namespaceName, e);
+            throw new RestException(e);
+        }
+    }
+
     private static final Logger log = 
LoggerFactory.getLogger(NamespacesBase.class);
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
index f8edcf1..bbc4438 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
@@ -746,5 +746,36 @@ public class Namespaces extends NamespacesBase {
         internalSetCompactionThreshold(newThreshold);
     }
 
+    @GET
+    @Path("/{property}/{cluster}/{namespace}/offloadThreshold")
+    @ApiOperation(value = "Maximum number of bytes stored on the pulsar 
cluster for a topic,"
+                          + " before the broker will start offloading to 
longterm storage",
+                  notes = "A negative value disables automatic offloading")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
+                            @ApiResponse(code = 404, message = "Namespace 
doesn't exist") })
+    public long getOffloadThreshold(@PathParam("property") String property,
+                                    @PathParam("cluster") String cluster,
+                                    @PathParam("namespace") String namespace) {
+        validateNamespaceName(property, cluster, namespace);
+        return internalGetOffloadThreshold();
+    }
+
+    @PUT
+    @Path("/{property}/{cluster}/{namespace}/offloadThreshold")
+    @ApiOperation(value = "Set maximum number of bytes stored on the pulsar 
cluster for a topic,"
+                          + " before the broker will start offloading to 
longterm storage",
+                  notes = "A negative value disables automatic offloading")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
+                            @ApiResponse(code = 404, message = "Namespace 
doesn't exist"),
+                            @ApiResponse(code = 409, message = "Concurrent 
modification"),
+                            @ApiResponse(code = 412, message = 
"offloadThreshold value is not valid") })
+    public void setOffloadThreshold(@PathParam("property") String property,
+                                    @PathParam("cluster") String cluster,
+                                    @PathParam("namespace") String namespace,
+                                    long newThreshold) {
+        validateNamespaceName(property, cluster, namespace);
+        internalSetOffloadThreshold(newThreshold);
+    }
+
     private static final Logger log = 
LoggerFactory.getLogger(Namespaces.class);
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index bd467ed..0fcd4aa 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -686,5 +686,34 @@ public class Namespaces extends NamespacesBase {
         internalSetCompactionThreshold(newThreshold);
     }
 
+    @GET
+    @Path("/{property}/{namespace}/offloadThreshold")
+    @ApiOperation(value = "Maximum number of bytes stored on the pulsar 
cluster for a topic,"
+                          + " before the broker will start offloading to 
longterm storage",
+                  notes = "A negative value disables automatic offloading")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
+                            @ApiResponse(code = 404, message = "Namespace 
doesn't exist") })
+    public long getOffloadThreshold(@PathParam("property") String property,
+                                       @PathParam("namespace") String 
namespace) {
+        validateNamespaceName(property, namespace);
+        return internalGetOffloadThreshold();
+    }
+
+    @PUT
+    @Path("/{property}/{namespace}/offloadThreshold")
+    @ApiOperation(value = "Set maximum number of bytes stored on the pulsar 
cluster for a topic,"
+                          + " before the broker will start offloading to 
longterm storage",
+                  notes = "A negative value disables automatic offloading")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
+                            @ApiResponse(code = 404, message = "Namespace 
doesn't exist"),
+                            @ApiResponse(code = 409, message = "Concurrent 
modification"),
+                            @ApiResponse(code = 412, message = 
"offloadThreshold value is not valid") })
+    public void setOffloadThreshold(@PathParam("property") String property,
+                                    @PathParam("namespace") String namespace,
+                                    long newThreshold) {
+        validateNamespaceName(property, namespace);
+        internalSetOffloadThreshold(newThreshold);
+    }
+
     private static final Logger log = 
LoggerFactory.getLogger(Namespaces.class);
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 63d74dd..5bdf812 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -746,6 +746,8 @@ public class BrokerService implements Closeable, 
ZooKeeperCacheListener<Policies
             
managedLedgerConfig.setOffloadLedgerDeletionLag(serviceConfig.getManagedLedgerOffloadDeletionLagMs(),
                                                             
TimeUnit.MILLISECONDS);
 
+            policies.ifPresent(p -> 
managedLedgerConfig.setOffloadAutoTriggerSizeThresholdBytes(p.offload_threshold));
+
             future.complete(managedLedgerConfig);
         }, (exception) -> future.completeExceptionally(exception)));
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 4768fa8..7db3e71 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1405,6 +1405,7 @@ public class PersistentTopic implements Topic, 
AddEntryCallback {
             info.ledgerId = li.getLedgerId();
             info.entries = li.getEntries();
             info.size = li.getSize();
+            info.offloaded = li.hasOffloadContext() && 
li.getOffloadContext().getComplete();
             stats.ledgers.add(info);
         });
 
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
index 03cb52d..5ee2f22 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
@@ -1164,4 +1164,52 @@ public interface Namespaces {
      */
     void setCompactionThreshold(String namespace, long compactionThreshold) 
throws PulsarAdminException;
 
+    /**
+     * Get the offloadThreshold for a namespace. The maximum number of bytes 
stored on the pulsar cluster for topics
+     * in the namespace before data starts being offloaded to longterm storage.
+     *
+     * <p>
+     * Response example:
+     *
+     * <pre>
+     * <code>10000000</code>
+     * </pre>
+     *
+     * @param namespace
+     *            Namespace name
+     *
+     * @throws NotAuthorizedException
+     *             Don't have admin permission
+     * @throws NotFoundException
+     *             Namespace does not exist
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    long getOffloadThreshold(String namespace) throws PulsarAdminException;
+
+    /**
+     * Set the offloadThreshold for a namespace. The maximum number of bytes stored on the pulsar cluster for topics
+     * in the namespace before data starts being offloaded to longterm storage.
+     *
+     * Negative values disable automatic offloading. Setting a threshold of 0 will offload data as soon as possible.
+     * <p>
+     * Request example:
+     *
+     * <pre>
+     * <code>10000000</code>
+     * </pre>
+     *
+     * @param namespace
+     *            Namespace name
+     * @param offloadThreshold
+     *            maximum number of bytes stored before offloading is triggered
+     *
+     * @throws NotAuthorizedException
+     *             Don't have admin permission
+     * @throws NotFoundException
+     *             Namespace does not exist
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void setOffloadThreshold(String namespace, long offloadThreshold) throws PulsarAdminException;
 }
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
index 6c79a06..4ed0b8a 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
@@ -679,6 +679,28 @@ public class NamespacesImpl extends BaseResource 
implements Namespaces {
         }
     }
 
+    @Override
+    public long getOffloadThreshold(String namespace) throws 
PulsarAdminException {
+        try {
+            NamespaceName ns = NamespaceName.get(namespace);
+            WebTarget path = namespacePath(ns, "offloadThreshold");
+            return request(path).get(Long.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public void setOffloadThreshold(String namespace, long offloadThreshold) 
throws PulsarAdminException {
+        try {
+            NamespaceName ns = NamespaceName.get(namespace);
+            WebTarget path = namespacePath(ns, "offloadThreshold");
+            request(path).put(Entity.entity(offloadThreshold, 
MediaType.APPLICATION_JSON), ErrorData.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
     private WebTarget namespacePath(NamespaceName namespace, String... parts) {
         final WebTarget base = namespace.isV2() ? adminV2Namespaces : 
adminNamespaces;
         WebTarget namespacePath = base.path(namespace.toString());
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
index 1d881ab..0eaf81b 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
@@ -796,6 +796,38 @@ public class CmdNamespaces extends CmdBase {
         }
     }
 
+    @Parameters(commandDescription = "Get offloadThreshold for a namespace")
+    private class GetOffloadThreshold extends CliCommand {
+        @Parameter(description = "tenant/namespace\n", required = true)
+        private java.util.List<String> params;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String namespace = validateNamespace(params);
+            print(admin.namespaces().getOffloadThreshold(namespace));
+        }
+    }
+
+    @Parameters(commandDescription = "Set offloadThreshold for a namespace")
+    private class SetOffloadThreshold extends CliCommand {
+        @Parameter(description = "tenant/namespace", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = { "--size", "-s" },
+                   description = "Maximum number of bytes stored in the pulsar 
cluster for a topic before data will"
+                                 + " start being automatically offloaded to 
longterm storage (eg: 10M, 16G, 3T, 100)."
+                                 + " Negative values disable automatic 
offload."
+                                 + " 0 triggers offloading as soon as 
possible.",
+                   required = true)
+        private String threshold = "-1";
+
+        @Override
+        void run() throws PulsarAdminException {
+            String namespace = validateNamespace(params);
+            admin.namespaces().setOffloadThreshold(namespace, 
validateSizeString(threshold));
+        }
+    }
+
     public CmdNamespaces(PulsarAdmin admin) {
         super("namespaces", admin);
         jcommander.addCommand("list", new GetNamespacesPerProperty());
@@ -857,5 +889,9 @@ public class CmdNamespaces extends CmdBase {
 
         jcommander.addCommand("get-compaction-threshold", new 
GetCompactionThreshold());
         jcommander.addCommand("set-compaction-threshold", new 
SetCompactionThreshold());
+
+        jcommander.addCommand("get-offload-threshold", new 
GetOffloadThreshold());
+        jcommander.addCommand("set-offload-threshold", new 
SetOffloadThreshold());
+
     }
 }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PersistentTopicInternalStats.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PersistentTopicInternalStats.java
index 2654515..c0416f5 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PersistentTopicInternalStats.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PersistentTopicInternalStats.java
@@ -45,6 +45,7 @@ public class PersistentTopicInternalStats {
         public long ledgerId;
         public long entries;
         public long size;
+        public boolean offloaded;
     }
 
     public static class CursorStats {
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
index a33119e..35a752b 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
@@ -58,6 +58,7 @@ public class Policies {
     public int max_consumers_per_subscription = 0;
 
     public long compaction_threshold = 0;
+    public long offload_threshold = -1;
 
     @Override
     public boolean equals(Object obj) {
@@ -78,7 +79,8 @@ public class Policies {
                     && max_producers_per_topic == other.max_producers_per_topic
                     && max_consumers_per_topic == other.max_consumers_per_topic
                     && max_consumers_per_subscription == 
other.max_consumers_per_subscription
-                    && compaction_threshold == other.compaction_threshold;
+                    && compaction_threshold == other.compaction_threshold
+                    && offload_threshold == other.offload_threshold;
         }
 
         return false;
@@ -109,6 +111,7 @@ public class Policies {
                 .add("max_producers_per_topic", max_producers_per_topic)
                 .add("max_consumers_per_topic", max_consumers_per_topic)
                 .add("max_consumers_per_subscription", max_consumers_per_topic)
-                .add("compaction_threshold", compaction_threshold).toString();
+                .add("compaction_threshold", compaction_threshold)
+                .add("offload_threshold", offload_threshold).toString();
     }
 }
diff --git 
a/tests/integration/s3-offload/src/test/java/org/apache/pulsar/tests/integration/TestS3Offload.java
 
b/tests/integration/s3-offload/src/test/java/org/apache/pulsar/tests/integration/TestS3Offload.java
index b993aa5..4248976 100644
--- 
a/tests/integration/s3-offload/src/test/java/org/apache/pulsar/tests/integration/TestS3Offload.java
+++ 
b/tests/integration/s3-offload/src/test/java/org/apache/pulsar/tests/integration/TestS3Offload.java
@@ -21,17 +21,11 @@ package org.apache.pulsar.tests.integration;
 import com.github.dockerjava.api.DockerClient;
 import com.google.common.collect.ImmutableMap;
 
-import java.net.URL;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 
-import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
-import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
-import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
-
-import org.apache.pulsar.client.admin.LongRunningProcessStatus;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
@@ -91,7 +85,6 @@ public class TestS3Offload extends Arquillian {
     public void teardownBrokers() throws Exception {
         PulsarClusterUtils.stopAllProxies(docker, CLUSTER_NAME);
         Assert.assertTrue(PulsarClusterUtils.stopAllBrokers(docker, 
CLUSTER_NAME));
-
     }
 
     private static byte[] buildEntry(String pattern) {
@@ -106,30 +99,30 @@ public class TestS3Offload extends Arquillian {
 
     @Test
     public void testPublishOffloadAndConsumeViaCLI() throws Exception {
+        final String TENANT = "s3-offload-test-cli";
+        final String NAMESPACE = "s3-offload-test-cli/ns1";
+        final String TOPIC = "persistent://s3-offload-test-cli/ns1/topic1";
+
         PulsarClusterUtils.runOnAnyBroker(docker, CLUSTER_NAME,
                 "/pulsar/bin/pulsar-admin", "tenants",
                 "create", "--allowed-clusters", CLUSTER_NAME,
-                "--admin-roles", "offload-admin", "s3-offload-test");
+                "--admin-roles", "offload-admin", TENANT);
         PulsarClusterUtils.runOnAnyBroker(docker, CLUSTER_NAME,
                 "/pulsar/bin/pulsar-admin", "namespaces",
-                "create", "--clusters", CLUSTER_NAME, "s3-offload-test/ns1");
+                "create", "--clusters", CLUSTER_NAME, NAMESPACE);
 
-        String broker = PulsarClusterUtils.brokerSet(docker, 
CLUSTER_NAME).stream().findAny().get();
-        String brokerIp = DockerUtils.getContainerIP(docker, broker);
-        String proxyIp  = PulsarClusterUtils.proxySet(docker, CLUSTER_NAME)
+        String broker = PulsarClusterUtils.brokerSet(docker, 
CLUSTER_NAME).stream().findFirst().get();
+        String proxyIp = PulsarClusterUtils.proxySet(docker, CLUSTER_NAME)
             .stream().map((c) -> DockerUtils.getContainerIP(docker, 
c)).findFirst().get();
         String serviceUrl = "pulsar://" + proxyIp + ":6650";
-        String adminUrl = "http://" + brokerIp + ":8080";
-        String topic = "persistent://s3-offload-test/ns1/topic1";
-
-        ClientConfiguration bkConf = new ClientConfiguration();
-        bkConf.setZkServers(PulsarClusterUtils.zookeeperConnectString(docker, 
CLUSTER_NAME));
+        String adminUrl = "http://" + proxyIp + ":8080";
 
         long firstLedger = -1;
         try(PulsarClient client = 
PulsarClient.builder().serviceUrl(serviceUrl).build();
-            Producer producer = client.newProducer().topic(topic)
-                .blockIfQueueFull(true).enableBatching(false).create()) {
-            client.subscribe(topic, "my-sub").close();
+            Producer producer = client.newProducer().topic(TOPIC)
+                .blockIfQueueFull(true).enableBatching(false).create();
+            PulsarAdmin admin = 
PulsarAdmin.builder().serviceHttpUrl(adminUrl).build()) {
+            
client.newConsumer().topic(TOPIC).subscriptionName("my-sub").subscribe().close();
 
             // write enough to topic to make it roll
             int i = 0;
@@ -139,40 +132,35 @@ public class TestS3Offload extends Arquillian {
             MessageId latestMessage = 
producer.send(buildEntry("offload-message"+i));
 
             // read managed ledger info, check ledgers exist
-            ManagedLedgerFactory mlf = new ManagedLedgerFactoryImpl(bkConf);
-            ManagedLedgerInfo info = 
mlf.getManagedLedgerInfo("s3-offload-test/ns1/persistent/topic1");
-            Assert.assertEquals(info.ledgers.size(), 2);
-
-            firstLedger = info.ledgers.get(0).ledgerId;
+            firstLedger = 
admin.topics().getInternalStats(TOPIC).ledgers.get(0).ledgerId;
 
             // first offload with a high threshold, nothing should offload
             String output = DockerUtils.runCommand(docker, broker,
                     "/pulsar/bin/pulsar-admin", "topics",
-                    "offload", "--size-threshold", "100G",
-                    topic);
+                    "offload", "--size-threshold", "100G", TOPIC);
             Assert.assertTrue(output.contains("Nothing to offload"));
 
             output = DockerUtils.runCommand(docker, broker,
-                    "/pulsar/bin/pulsar-admin", "topics", "offload-status", 
topic);
+                    "/pulsar/bin/pulsar-admin", "topics", "offload-status", 
TOPIC);
             Assert.assertTrue(output.contains("Offload has not been run"));
 
             // offload with a low threshold
             output = DockerUtils.runCommand(docker, broker,
                     "/pulsar/bin/pulsar-admin", "topics",
-                    "offload", "--size-threshold", "1M",
-                    topic);
+                    "offload", "--size-threshold", "1M", TOPIC);
             Assert.assertTrue(output.contains("Offload triggered"));
 
             output = DockerUtils.runCommand(docker, broker,
-                    "/pulsar/bin/pulsar-admin", "topics", "offload-status", 
"-w", topic);
+                    "/pulsar/bin/pulsar-admin", "topics", "offload-status", 
"-w", TOPIC);
             Assert.assertTrue(output.contains("Offload was a success"));
         }
 
-        log.info("Kill ledger");
         // stop brokers to clear all caches, open handles, etc
         Assert.assertTrue(PulsarClusterUtils.stopAllBrokers(docker, 
CLUSTER_NAME));
 
         // delete the first ledger, so that we cannot possibly read from it
+        ClientConfiguration bkConf = new ClientConfiguration();
+        bkConf.setZkServers(PulsarClusterUtils.zookeeperConnectString(docker, 
CLUSTER_NAME));
         try (BookKeeper bk = new BookKeeper(bkConf)) {
             bk.deleteLedger(firstLedger);
         }
@@ -182,7 +170,7 @@ public class TestS3Offload extends Arquillian {
 
         log.info("Read back the data (which would be in that first ledger)");
         try(PulsarClient client = 
PulsarClient.builder().serviceUrl(serviceUrl).build();
-            Consumer consumer = 
client.newConsumer().topic(topic).subscriptionName("my-sub").subscribe()) {
+            Consumer consumer = 
client.newConsumer().topic(TOPIC).subscriptionName("my-sub").subscribe()) {
             // read back from topic
             for (int i = 0; i < ENTRIES_PER_LEDGER*1.5; i++) {
                 Message m = consumer.receive(1, TimeUnit.MINUTES);
@@ -190,4 +178,84 @@ public class TestS3Offload extends Arquillian {
             }
         }
     }
+
+    @Test
+    public void testPublishOffloadAndConsumeViaThreshold() throws Exception {
+        final String TENANT = "s3-offload-test-threshold";
+        final String NAMESPACE = "s3-offload-test-threshold/ns1";
+        final String TOPIC = 
"persistent://s3-offload-test-threshold/ns1/topic1";
+
+        PulsarClusterUtils.runOnAnyBroker(docker, CLUSTER_NAME,
+                "/pulsar/bin/pulsar-admin", "tenants",
+                "create", "--allowed-clusters", CLUSTER_NAME,
+                "--admin-roles", "offload-admin", TENANT);
+        PulsarClusterUtils.runOnAnyBroker(docker, CLUSTER_NAME,
+                "/pulsar/bin/pulsar-admin", "namespaces",
+                "create", "--clusters", CLUSTER_NAME, NAMESPACE);
+        PulsarClusterUtils.runOnAnyBroker(docker, CLUSTER_NAME,
+                "/pulsar/bin/pulsar-admin", "namespaces",
+                "set-offload-threshold", "--size", "1M", NAMESPACE);
+
+        String proxyIp  = PulsarClusterUtils.proxySet(docker, CLUSTER_NAME)
+            .stream().map((c) -> DockerUtils.getContainerIP(docker, 
c)).findFirst().get();
+        String serviceUrl = "pulsar://" + proxyIp + ":6650";
+        String adminUrl = "http://" + proxyIp + ":8080";
+
+        long firstLedger = 0;
+        try(PulsarClient client = 
PulsarClient.builder().serviceUrl(serviceUrl).build();
+            Producer producer = client.newProducer().topic(TOPIC)
+                .blockIfQueueFull(true).enableBatching(false).create();
+            PulsarAdmin admin = 
PulsarAdmin.builder().serviceHttpUrl(adminUrl).build()) {
+
+            
client.newConsumer().topic(TOPIC).subscriptionName("my-sub").subscribe().close();
+
+            // write enough to topic to make it roll twice
+            for (int i = 0; i < ENTRIES_PER_LEDGER*2.5; i++) {
+                producer.sendAsync(buildEntry("offload-message"+i));
+            }
+            producer.send(buildEntry("final-offload-message"));
+
+            firstLedger = 
admin.topics().getInternalStats(TOPIC).ledgers.get(0).ledgerId;
+
+            // wait up to 30 seconds for offload to occur
+            for (int i = 0; i < 300 && 
!admin.topics().getInternalStats(TOPIC).ledgers.get(0).offloaded; i++) {
+                Thread.sleep(100);
+            }
+            
Assert.assertTrue(admin.topics().getInternalStats(TOPIC).ledgers.get(0).offloaded);
+        }
+
+        // stop brokers to clear all caches, open handles, etc
+        Assert.assertTrue(PulsarClusterUtils.stopAllBrokers(docker, 
CLUSTER_NAME));
+
+        // delete the first ledger, so that we cannot possibly read from it
+        ClientConfiguration bkConf = new ClientConfiguration();
+        bkConf.setZkServers(PulsarClusterUtils.zookeeperConnectString(docker, 
CLUSTER_NAME));
+        try (BookKeeper bk = new BookKeeper(bkConf)) {
+            bk.deleteLedger(firstLedger);
+        }
+
+        // start all brokers again
+        Assert.assertTrue(PulsarClusterUtils.startAllBrokers(docker, 
CLUSTER_NAME));
+
+        log.info("Read back the data (which would be in that first ledger)");
+        try (PulsarClient client = 
PulsarClient.builder().serviceUrl(serviceUrl).build();
+             Consumer consumer = 
client.newConsumer().topic(TOPIC).subscriptionName("my-sub").subscribe()) {
+            // read back from topic
+            for (int i = 0; i < ENTRIES_PER_LEDGER*2.5; i++) {
+                Message m = consumer.receive(1, TimeUnit.MINUTES);
+                Assert.assertEquals(buildEntry("offload-message"+i), 
m.getData());
+            }
+        }
+
+        // try disabling
+        PulsarClusterUtils.runOnAnyBroker(docker, CLUSTER_NAME,
+                "/pulsar/bin/pulsar-admin", "namespaces",
+                "set-offload-threshold", "--size", "-1", NAMESPACE);
+
+        // hard to validate that it has been disabled as we'd be waiting for
+        // something _not_ to happen (i.e. waiting for ages), so just check
+        try (PulsarAdmin admin = 
PulsarAdmin.builder().serviceHttpUrl(adminUrl).build()) {
+            
Assert.assertEquals(admin.namespaces().getOffloadThreshold(NAMESPACE), -1L);
+        }
+    }
 }

-- 
To stop receiving notification emails like this one, please contact
si...@apache.org.

Reply via email to