This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 6c5f2f6  Add backlog quota retention policy to server config. (#2460)
6c5f2f6 is described below

commit 6c5f2f63defe077a59003f37e06430d243b769dc
Author: penghui <codelipeng...@gmail.com>
AuthorDate: Wed Aug 29 01:39:54 2018 +0800

    Add backlog quota retention policy to server config. (#2460)
    
    * Add backlog quota retention policy to server config.
    
    * Rename backlogQuotaRetentionPolicy to backlogQuotaDefaultRetentionPolicy
    
    * Change backlogQuotaDefaultRetentionPolicy String to enum.
---
 conf/broker.conf                                           |  6 ++++++
 .../org/apache/pulsar/broker/ServiceConfiguration.java     | 14 ++++++++++++++
 .../apache/pulsar/broker/service/BacklogQuotaManager.java  |  2 +-
 3 files changed, 21 insertions(+), 1 deletion(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index 5465c9a..a8dd44b 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -67,6 +67,12 @@ backlogQuotaCheckIntervalInSeconds=60
 # Default per-topic backlog quota limit
 backlogQuotaDefaultLimitGB=10
 
+# Default backlog quota retention policy. Default is producer_request_hold
+# 'producer_request_hold' Policy which holds producer's send request until the resource becomes available (or holding times out)
+# 'producer_exception' Policy which throws javax.jms.ResourceAllocationException to the producer
+# 'consumer_backlog_eviction' Policy which evicts the oldest message from the slowest consumer's backlog
+backlogQuotaDefaultRetentionPolicy=producer_request_hold
+
 # Enable the deletion of inactive topics
 brokerDeleteInactiveTopicsEnabled=true
 
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 8ad0fc8..8057540 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -31,6 +31,7 @@ import org.apache.pulsar.common.configuration.FieldContext;
 import org.apache.pulsar.common.configuration.PulsarConfiguration;
 
 import com.google.common.collect.Sets;
+import org.apache.pulsar.common.policies.data.BacklogQuota;
 
 /**
  * Pulsar service configuration object.
@@ -89,6 +90,11 @@ public class ServiceConfiguration implements PulsarConfiguration {
     private int backlogQuotaCheckIntervalInSeconds = 60;
     // Default per-topic backlog quota limit
     private long backlogQuotaDefaultLimitGB = 50;
+    //Default backlog quota retention policy. Default is producer_request_hold
+    //'producer_request_hold' Policy which holds producer's send request until the resource becomes available (or holding times out)
+    //'producer_exception' Policy which throws javax.jms.ResourceAllocationException to the producer
+    //'consumer_backlog_eviction' Policy which evicts the oldest message from the slowest consumer's backlog
+    private BacklogQuota.RetentionPolicy backlogQuotaDefaultRetentionPolicy = BacklogQuota.RetentionPolicy.producer_request_hold;
     // Enable the deletion of inactive topics
     private boolean brokerDeleteInactiveTopicsEnabled = true;
     // How often to check for inactive topics
@@ -1729,4 +1735,12 @@ public class ServiceConfiguration implements PulsarConfiguration {
     public int getBrokerServiceCompactionMonitorIntervalInSeconds() {
         return this.brokerServiceCompactionMonitorIntervalInSeconds;
     }
+
+    public BacklogQuota.RetentionPolicy getBacklogQuotaDefaultRetentionPolicy() {
+        return backlogQuotaDefaultRetentionPolicy;
+    }
+
+    public void setBacklogQuotaDefaultRetentionPolicy(BacklogQuota.RetentionPolicy backlogQuotaDefaultRetentionPolicy) {
+        this.backlogQuotaDefaultRetentionPolicy = backlogQuotaDefaultRetentionPolicy;
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
index e7e38dd..7d607cd 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
@@ -50,7 +50,7 @@ public class BacklogQuotaManager {
     public BacklogQuotaManager(PulsarService pulsar) {
         this.defaultQuota = new BacklogQuota(
                 pulsar.getConfiguration().getBacklogQuotaDefaultLimitGB() * 1024 * 1024 * 1024,
-                RetentionPolicy.producer_request_hold);
+                pulsar.getConfiguration().getBacklogQuotaDefaultRetentionPolicy());
         this.zkCache = pulsar.getConfigurationCache().policiesCache();
     }
 

Reply via email to