This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 1bca601  Added infinite time retention configuration option (#1135)
1bca601 is described below

commit 1bca601537f7a399ea8324f4e07919b00d1fc771
Author: Matteo Merli <mme...@apache.org>
AuthorDate: Wed Jan 31 10:22:38 2018 -0800

    Added infinite time retention configuration option (#1135)
    
    * Added infinite time retention configuration option
    
    * Fixed test
    
    * Updated CLI docs
---
 .../bookkeeper/mledger/ManagedLedgerConfig.java    | 25 ++++++++++--
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 19 +++++++--
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 30 ++++++++++++--
 .../broker/service/persistent/PersistentTopic.java | 20 +++++----
 .../broker/service/PersistentTopicE2ETest.java     | 47 ++++++++++++++++++++++
 .../org/apache/pulsar/admin/cli/CmdNamespaces.java | 18 ++++++---
 site/_data/cli/pulsar-admin.yaml                   |  8 ++--
 7 files changed, 139 insertions(+), 28 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
index 6f9847b..fd81ba1 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
@@ -319,6 +319,16 @@ public class ManagedLedgerConfig {
     }
 
     /**
+     * Set the retention time for the ManagedLedger
+     * <p>
+     * Retention time will prevent data from being deleted for at least the 
specified amount of time, even if no cursors
+     * are created, or if all the cursors have marked the data for deletion.
+     * <p>
+     * A retention time of 0 (the default), will to have no time based 
retention.
+     * <p>
+     * Specifying a negative retention time will make the data to be retained 
indefinitely, based on the
+     * {@link #setRetentionSizeInMB(long)} value.
+     *
      * @param retentionTime
      *            duration for which messages should be retained
      * @param unit
@@ -338,6 +348,15 @@ public class ManagedLedgerConfig {
     }
 
     /**
+     * The retention size is used to set a maximum retention size quota on the 
ManagedLedger.
+     * <p>
+     * This setting works in conjuction with {@link 
#setRetentionSizeInMB(long)} and places a max size for retention,
+     * after which the data is deleted.
+     * <p>
+     * A retention size of 0, will make data to be deleted immediately.
+     * <p>
+     * A retention size of -1, means to have an unlimited retention size.
+     *
      * @param retentionSizeInMB
      *            quota for message retention
      */
@@ -357,7 +376,7 @@ public class ManagedLedgerConfig {
     /**
      * Skip reading non-recoverable/unreadable data-ledger under 
managed-ledger's list. It helps when data-ledgers gets
      * corrupted at bookkeeper and managed-cursor is stuck at that ledger.
-     * 
+     *
      * @param autoSkipNonRecoverableData
      */
     public boolean isAutoSkipNonRecoverableData() {
@@ -384,10 +403,10 @@ public class ManagedLedgerConfig {
         this.maxUnackedRangesToPersist = maxUnackedRangesToPersist;
         return this;
     }
-    
+
     /**
      * @return max unacked message ranges up to which it can store in Zookeeper
-     * 
+     *
      */
     public int getMaxUnackedRangesToPersistInZk() {
         return maxUnackedRangesToPersistInZk;
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 767d3dc..9bbea72 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -161,7 +161,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
     final static long WaitTimeAfterLedgerCreationFailureMs = 10000;
 
     volatile PositionImpl lastConfirmedEntry;
-    
+
     protected static final int DEFAULT_LEDGER_DELETE_RETRIES = 3;
     protected static final int DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC = 60;
 
@@ -1485,10 +1485,21 @@ public class ManagedLedgerImpl implements 
ManagedLedger, CreateCallback {
     }
 
     private boolean hasLedgerRetentionExpired(long ledgerTimestamp) {
+        if (config.getRetentionTimeMillis() < 0) {
+            // Negative retention time equates to infinite retention
+            return false;
+        }
+
         long elapsedMs = System.currentTimeMillis() - ledgerTimestamp;
         return elapsedMs > config.getRetentionTimeMillis();
     }
 
+    private boolean isLedgerRetentionOverSizeQuota() {
+        // Handle the -1 size limit as "infinite" size quota
+        return config.getRetentionSizeInMB() > 0
+                && TOTAL_SIZE_UPDATER.get(this) > ((long) 
config.getRetentionSizeInMB()) * 1024 * 1024;
+    }
+
     /**
      * Checks whether there are ledger that have been fully consumed and 
deletes them
      *
@@ -1537,7 +1548,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
             // skip ledger if retention constraint met
             for (LedgerInfo ls : ledgers.headMap(slowestReaderLedgerId, 
false).values()) {
                 boolean expired = hasLedgerRetentionExpired(ls.getTimestamp());
-                boolean overRetentionQuota = TOTAL_SIZE_UPDATER.get(this) > 
((long) config.getRetentionSizeInMB()) * 1024 * 1024;
+                boolean overRetentionQuota = isLedgerRetentionOverSizeQuota();
 
                 if (log.isDebugEnabled()) {
                     log.debug(
@@ -1714,7 +1725,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
             }
         }, null);
     }
-    
+
     private void deleteAllLedgers(DeleteLedgerCallback callback, Object ctx) {
         List<LedgerInfo> ledgers = 
Lists.newArrayList(ManagedLedgerImpl.this.ledgers.values());
         AtomicInteger ledgersToDelete = new AtomicInteger(ledgers.size());
@@ -2199,7 +2210,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
             return new 
ManagedLedgerException(BKException.getMessage(bkErrorCode));
         }
     }
-    
+
     private static final Logger log = 
LoggerFactory.getLogger(ManagedLedgerImpl.class);
 
 }
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 821061a..2dcf8cd 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -1668,11 +1668,35 @@ public class ManagedLedgerTest extends 
MockedBookKeeperTestCase {
         c1.skipEntries(1, IndividualDeletedEntries.Exclude);
         // let retention expire
         Thread.sleep(1000);
-        ml.close();
-        // sleep for trim
-        Thread.sleep(100);
+        ml.internalTrimConsumedLedgers();
+
         assertTrue(ml.getLedgersInfoAsList().size() <= 1);
         assertTrue(ml.getTotalSize() <= "shortmessage".getBytes().length);
+        ml.close();
+    }
+
+    @Test
+    public void testInfiniteRetention() throws Exception {
+        ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(bkc, 
bkc.getZkHandle());
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setRetentionSizeInMB(-1);
+        config.setRetentionTime(-1, TimeUnit.HOURS);
+        config.setMaxEntriesPerLedger(1);
+
+        ManagedLedgerImpl ml = (ManagedLedgerImpl) 
factory.open("retention_test_ledger", config);
+        ManagedCursor c1 = ml.openCursor("c1");
+        ml.addEntry("iamaverylongmessagethatshouldberetained".getBytes());
+        c1.skipEntries(1, IndividualDeletedEntries.Exclude);
+        ml.close();
+
+        // reopen ml
+        ml = (ManagedLedgerImpl) factory.open("retention_test_ledger", config);
+        c1 = ml.openCursor("c1");
+        ml.addEntry("shortmessage".getBytes());
+        c1.skipEntries(1, IndividualDeletedEntries.Exclude);
+        ml.close();
+        assertTrue(ml.getLedgersInfoAsList().size() > 1);
+        assertTrue(ml.getTotalSize() > "shortmessage".getBytes().length);
     }
 
     @Test
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 7255bef..d38fa3b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -22,10 +22,11 @@ import static 
com.google.common.base.Preconditions.checkArgument;
 import static org.apache.commons.lang3.StringUtils.isBlank;
 import static 
org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
 
-import java.time.Instant;
-import java.time.ZoneId;
-import java.time.format.DateTimeFormatter;
-import java.util.*;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -86,7 +87,6 @@ import 
org.apache.pulsar.common.policies.data.PersistentTopicStats;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.PublisherStats;
 import org.apache.pulsar.common.policies.data.ReplicatorStats;
-import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.SubscriptionStats;
 import org.apache.pulsar.common.util.Codec;
 import org.apache.pulsar.common.util.DateFormatter;
@@ -1323,9 +1323,13 @@ public class PersistentTopic implements Topic, 
AddEntryCallback {
             Optional<Policies> policies = 
brokerService.pulsar().getConfigurationCache().policiesCache()
                     .get(AdminResource.path(POLICIES, name.getNamespace()));
             // If no policies, the default is to have no retention and delete 
the inactive topic
-            return policies.map(p -> p.retention_policies)
-                    .map(rp -> System.nanoTime() - lastActive < 
TimeUnit.MINUTES.toNanos(rp.getRetentionTimeInMinutes()))
-                    .orElse(false).booleanValue();
+            return policies.map(p -> p.retention_policies).map(rp -> {
+                long retentionTime = 
TimeUnit.MINUTES.toNanos(rp.getRetentionTimeInMinutes());
+
+                // Negative retention time means the topic should be retained 
indefinitely,
+                // because its own data has to be retained
+                return retentionTime < 0 || (System.nanoTime() - lastActive) < 
retentionTime;
+            }).orElse(false).booleanValue();
         } catch (Exception e) {
             if (log.isDebugEnabled()) {
                 log.debug("[{}] Error getting policies", topic);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
index 9086203..93ce4b5 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
@@ -642,6 +642,53 @@ public class PersistentTopicE2ETest extends BrokerTestBase 
{
         assertNull(pulsar.getBrokerService().getTopicReference(topicName));
     }
 
+    /**
+     * A topic that has retention policy set to -1, should not be GCed
+     * until it has been inactive for at least the retention time and the data
+     * should never be deleted
+     */
+    @Test
+    public void testInfiniteRetentionPolicy() throws Exception {
+        // Retain data forever
+        admin.namespaces().setRetention("prop/use/ns-abc", new 
RetentionPolicies(-1, -1));
+
+        // 1. Simple successful GC
+        String topicName = "persistent://prop/use/ns-abc/topic-10";
+        Producer producer = pulsarClient.createProducer(topicName);
+        producer.close();
+
+        assertNotNull(pulsar.getBrokerService().getTopicReference(topicName));
+        runGC();
+        // Should not have been deleted, since we have retention
+        assertNotNull(pulsar.getBrokerService().getTopicReference(topicName));
+
+
+        // Remove retention
+        admin.namespaces().setRetention("prop/use/ns-abc", new 
RetentionPolicies(0, 10));
+        Thread.sleep(300);
+
+        // 2. Topic is not GCed with live connection
+        ConsumerConfiguration conf = new ConsumerConfiguration();
+        conf.setSubscriptionType(SubscriptionType.Exclusive);
+        String subName = "sub1";
+        Consumer consumer = pulsarClient.subscribe(topicName, subName, conf);
+
+        runGC();
+        assertNotNull(pulsar.getBrokerService().getTopicReference(topicName));
+
+        // 3. Topic with subscription is not GCed even with no connections
+        consumer.close();
+
+        runGC();
+        assertNotNull(pulsar.getBrokerService().getTopicReference(topicName));
+
+        // 4. Topic can be GCed after unsubscribe
+        admin.persistentTopics().deleteSubscription(topicName, subName);
+
+        runGC();
+        assertNull(pulsar.getBrokerService().getTopicReference(topicName));
+    }
+
     @Test
     public void testMessageExpiry() throws Exception {
         int messageTTLSecs = 1;
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
index e04eef7..6e84bf6 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
@@ -232,7 +232,7 @@ public class CmdNamespaces extends CmdBase {
     private class GetAntiAffinityGroup extends CliCommand {
         @Parameter(description = "property/cluster/namespace\n", required = 
true)
         private java.util.List<String> params;
-        
+
         @Override
         void run() throws PulsarAdminException {
             String namespace = validateNamespace(params);
@@ -263,14 +263,14 @@ public class CmdNamespaces extends CmdBase {
     private class DeleteAntiAffinityGroup extends CliCommand {
         @Parameter(description = "property/cluster/namespace\n", required = 
true)
         private java.util.List<String> params;
-        
+
         @Override
         void run() throws PulsarAdminException {
             String namespace = validateNamespace(params);
             admin.namespaces().deleteNamespaceAntiAffinityGroup(namespace);
         }
     }
-    
+
 
     @Parameters(commandDescription = "Enable or disable deduplication for a 
namespace")
     private class SetDeduplication extends CliCommand {
@@ -300,10 +300,12 @@ public class CmdNamespaces extends CmdBase {
         private java.util.List<String> params;
 
         @Parameter(names = { "--time",
-                "-t" }, description = "Retention time in minutes (or minutes, 
hours,days,weeks eg: 100m, 3h, 2d, 5w)", required = true)
+                "-t" }, description = "Retention time in minutes (or minutes, 
hours,days,weeks eg: 100m, 3h, 2d, 5w). "
+                        + "0 means no retention and -1 means infinite time 
retention", required = true)
         private String retentionTimeStr;
 
-        @Parameter(names = { "--size", "-s" }, description = "Retention size 
limit (eg: 10M, 16G)", required = true)
+        @Parameter(names = { "--size", "-s" }, description = "Retention size 
limit (eg: 10M, 16G, 3T). "
+                + "0 means no retention and -1 means infinite size retention", 
required = true)
         private String limitStr;
 
         @Override
@@ -625,6 +627,10 @@ public class CmdNamespaces extends CmdBase {
         case 'G':
             return Long.parseLong(subStr) * 1024 * 1024 * 1024;
 
+        case 't':
+        case 'T':
+            return Long.parseLong(subStr) * 1024 * 1024 * 1024 * 1024;
+
         default:
             return Long.parseLong(s);
         }
@@ -680,7 +686,7 @@ public class CmdNamespaces extends CmdBase {
 
         jcommander.addCommand("get-message-ttl", new GetMessageTTL());
         jcommander.addCommand("set-message-ttl", new SetMessageTTL());
-        
+
         jcommander.addCommand("get-anti-affinity-group", new 
GetAntiAffinityGroup());
         jcommander.addCommand("set-anti-affinity-group", new 
SetAntiAffinityGroup());
         jcommander.addCommand("get-anti-affinity-namespaces", new 
GetAntiAffinityNamespaces());
diff --git a/site/_data/cli/pulsar-admin.yaml b/site/_data/cli/pulsar-admin.yaml
index 7ac972f..eb7c805 100644
--- a/site/_data/cli/pulsar-admin.yaml
+++ b/site/_data/cli/pulsar-admin.yaml
@@ -213,9 +213,9 @@ commands:
     argument: property/cluster/namespace
     options:
     - flags: -s, --size
-      description: The retention size limits (for example `10M` or `16G`)
+      description: The retention size limits (for example `10M`, `16G` or 
`3T`). 0 means no retention and -1 means infinite size retention
     - flags: -t, --time
-      description: "The retention time in minutes, hours, days, or weeks. 
Examples: `100m`, `13h`, `2d`, `5w`."
+      description: "The retention time in minutes, hours, days, or weeks. 
Examples: `100m`, `13h`, `2d`, `5w`. 0 means no retention and -1 means infinite 
time retention"
   - name: unload
     description: Unload a namespace or namespace bundle from the current 
serving broker.
     argument: property/cluster/namespace
@@ -295,14 +295,14 @@ commands:
     description: Look up a topic from the current serving broker
     argument: persistent://property/cluster/namespace/topic
   - name: bundle-range
-    description: Get the namespace bundle which contains the given topic  
+    description: Get the namespace bundle which contains the given topic
     argument: persistent://property/cluster/namespace/topic
   - name: delete
     description: Delete a topic. The topic cannot be deleted if there are any 
active subscriptions or producers connected to the topic.
     argument: persistent://property/cluster/namespace/topic
   - name: unload
     description: Unload a topic
-    argument: persistent://property/cluster/namespace/topic    
+    argument: persistent://property/cluster/namespace/topic
   - name: subscriptions
     description: Get the list of subscriptions on the topic
     argument: persistent://property/cluster/namespace/topic

-- 
To stop receiving notification emails like this one, please contact
mme...@apache.org.

Reply via email to