This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 762036c Delete inactive subscriptions automatically (#1352) 762036c is described below commit 762036c36f071d0aca148bf1d333b3df0da66191 Author: yush1ga <y.shiga.91+yush...@gmail.com> AuthorDate: Wed May 2 01:36:30 2018 +0900 Delete inactive subscriptions automatically (#1352) * Delete inactive subscriptions automatically * Addressed PR comments * Add subscriptionExpiryCheckIntervalInMinutes * Add lastActive for ManagedCursorInfo --- conf/broker.conf | 7 + conf/standalone.conf | 7 + .../apache/bookkeeper/mledger/ManagedCursor.java | 13 ++ .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 26 +++- .../bookkeeper/mledger/proto/MLDataFormats.java | 65 +++++++- managed-ledger/src/main/proto/MLDataFormats.proto | 2 + .../mledger/impl/ManagedCursorContainerTest.java | 10 ++ .../apache/pulsar/broker/ServiceConfiguration.java | 21 +++ .../broker/admin/v1/NonPersistentTopics.java | 10 +- .../broker/admin/v2/NonPersistentTopics.java | 10 +- .../pulsar/broker/service/BrokerService.java | 166 +++++++++------------ .../org/apache/pulsar/broker/service/Topic.java | 2 + .../service/nonpersistent/NonPersistentTopic.java | 5 + .../service/persistent/PersistentSubscription.java | 2 + .../broker/service/persistent/PersistentTopic.java | 13 ++ .../broker/stats/BookieClientStatsGenerator.java | 15 +- 16 files changed, 252 insertions(+), 122 deletions(-) diff --git a/conf/broker.conf b/conf/broker.conf index dc7ca04..893a3e2 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -76,6 +76,13 @@ messageExpiryCheckIntervalInMinutes=5 # How long to delay rewinding cursor and dispatching messages when active consumer is changed activeConsumerFailoverDelayTimeMillis=1000 +# How long to delete inactive subscriptions from last consuming +# When it is 0, inactive subscriptions are not deleted automatically +subscriptionExpirationTimeMinutes=0 + +# How frequently to proactively check and purge expired subscription +subscriptionExpiryCheckIntervalInMinutes=5 + # Set the default behavior for message deduplication in the broker # This can be overridden per-namespace. If enabled, broker will reject # messages that were already stored in the topic diff --git a/conf/standalone.conf b/conf/standalone.conf index f5c9546..bc6dc10 100644 --- a/conf/standalone.conf +++ b/conf/standalone.conf @@ -69,6 +69,13 @@ messageExpiryCheckIntervalInMinutes=5 # How long to delay rewinding cursor and dispatching messages when active consumer is changed activeConsumerFailoverDelayTimeMillis=1000 +# How long to delete inactive subscriptions from last consuming +# When it is 0, inactive subscriptions are not deleted automatically +subscriptionExpirationTimeMinutes=0 + +# How frequently to proactively check and purge expired subscription +subscriptionExpiryCheckIntervalInMinutes=5 + # Set the default behavior for message deduplication in the broker # This can be overridden per-namespace. If enabled, broker will reject # messages that were already stored in the topic diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java index f6793b4..186a450 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java @@ -58,6 +58,19 @@ public interface ManagedCursor { String getName(); /** + * Get the last active time of the cursor. + * + * @return the last active time of the cursor + */ + long getLastActive(); + + /** + * Update the last active time of the cursor + * + */ + void updateLastActive(); + + /** * Return any properties that were associated with the last stored position. */ Map<String, Long> getProperties(); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 695f22d..194e8c0 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -158,6 +158,9 @@ public class ManagedCursorImpl implements ManagedCursor { private long lastLedgerSwitchTimestamp; private final Clock clock; + // The last active time (Unix time, milliseconds) of the cursor + private long lastActive; + enum State { Uninitialized, // Cursor is being initialized NoLedger, // There is no metadata ledger open for writing @@ -189,6 +192,7 @@ public class ManagedCursorImpl implements ManagedCursor { RESET_CURSOR_IN_PROGRESS_UPDATER.set(this, FALSE); WAITING_READ_OP_UPDATER.set(this, null); this.clock = config.getClock(); + this.lastActive = this.clock.millis(); this.lastLedgerSwitchTimestamp = this.clock.millis(); if (config.getThrottleMarkDelete() > 0.0) { @@ -216,6 +220,7 @@ public class ManagedCursorImpl implements ManagedCursor { public void operationComplete(ManagedCursorInfo info, Stat stat) { cursorLedgerStat = stat; + lastActive = info.getLastActive() != 0 ? info.getLastActive() : lastActive; if (info.getCursorsLedgerId() == -1L) { // There is no cursor ledger to read the last position from. It means the cursor has been properly @@ -1280,7 +1285,7 @@ public class ManagedCursorImpl implements ManagedCursor { // markDelete-position and clear out deletedMsgSet markDeletePosition = PositionImpl.get(newMarkDeletePosition); individualDeletedMessages.remove(Range.atMost(markDeletePosition)); - + if (readPosition.compareTo(newMarkDeletePosition) <= 0) { // If the position that is mark-deleted is past the read position, it // means that the client has skipped some entries. We need to move @@ -1307,7 +1312,7 @@ public class ManagedCursorImpl implements ManagedCursor { final MarkDeleteCallback callback, final Object ctx) { checkNotNull(position); checkArgument(position instanceof PositionImpl); - + if (STATE_UPDATER.get(this) == State.Closed) { callback.markDeleteFailed(new ManagedLedgerException("Cursor was already closed"), ctx); return; @@ -1328,7 +1333,7 @@ public class ManagedCursorImpl implements ManagedCursor { log.debug("[{}] Mark delete cursor {} up to position: {}", ledger.getName(), name, position); } PositionImpl newPosition = (PositionImpl) position; - + if (((PositionImpl) ledger.getLastConfirmedEntry()).compareTo(newPosition) < 0) { if (log.isDebugEnabled()) { log.debug( @@ -1541,7 +1546,7 @@ public class ManagedCursorImpl implements ManagedCursor { for (Position pos : positions) { PositionImpl position = (PositionImpl) checkNotNull(pos); - + if (((PositionImpl) ledger.getLastConfirmedEntry()).compareTo(position) < 0) { if (log.isDebugEnabled()) { log.debug( @@ -1693,6 +1698,16 @@ public class ManagedCursorImpl implements ManagedCursor { } @Override + public long getLastActive() { + return lastActive; + } + + @Override + public void updateLastActive() { + lastActive = System.currentTimeMillis(); + } + + @Override public boolean isDurable() { return true; } @@ -1837,7 +1852,8 @@ public class ManagedCursorImpl implements ManagedCursor { ManagedCursorInfo.Builder info = ManagedCursorInfo.newBuilder() // .setCursorsLedgerId(cursorsLedgerId) // .setMarkDeleteLedgerId(position.getLedgerId()) // - .setMarkDeleteEntryId(position.getEntryId()); // + .setMarkDeleteEntryId(position.getEntryId()) // + .setLastActive(lastActive); // info.addAllProperties(buildPropertiesMap(properties)); if (persistIndividualDeletedMessageRanges) { diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/proto/MLDataFormats.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/proto/MLDataFormats.java index 73ba1da..fc607b9 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/proto/MLDataFormats.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/proto/MLDataFormats.java @@ -4428,6 +4428,10 @@ public final class MLDataFormats { getPropertiesOrBuilderList(); org.apache.bookkeeper.mledger.proto.MLDataFormats.LongPropertyOrBuilder getPropertiesOrBuilder( int index); + + // optional int64 lastActive = 6; + boolean hasLastActive(); + long getLastActive(); } public static final class ManagedCursorInfo extends com.google.protobuf.GeneratedMessage @@ -4530,12 +4534,23 @@ public final class MLDataFormats { return properties_.get(index); } + // optional int64 lastActive = 6; + public static final int LASTACTIVE_FIELD_NUMBER = 6; + private long lastActive_; + public boolean hasLastActive() { + return ((bitField0_ & 0x00000008) == 0x00000008); + } + public long getLastActive() { + return lastActive_; + } + private void initFields() { cursorsLedgerId_ = 0L; markDeleteLedgerId_ = 0L; markDeleteEntryId_ = 0L; individualDeletedMessages_ = java.util.Collections.emptyList(); properties_ = java.util.Collections.emptyList(); + lastActive_ = 0L; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -4580,6 +4595,9 @@ public final class MLDataFormats { for (int i = 0; i < properties_.size(); i++) { output.writeMessage(5, properties_.get(i)); } + if (((bitField0_ & 0x00000008) == 0x00000008)) { + output.writeInt64(6, lastActive_); + } getUnknownFields().writeTo(output); } @@ -4609,6 +4627,10 @@ public final class MLDataFormats { size += com.google.protobuf.CodedOutputStream .computeMessageSize(5, properties_.get(i)); } + if (((bitField0_ & 0x00000008) == 0x00000008)) { + size += com.google.protobuf.CodedOutputStream + .computeInt64Size(6, lastActive_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -4753,6 +4775,8 @@ public final class MLDataFormats { } else { propertiesBuilder_.clear(); } + lastActive_ = 0L; + bitField0_ = (bitField0_ & ~0x00000020); return this; } @@ -4821,6 +4845,10 @@ public final class MLDataFormats { } else { result.properties_ = propertiesBuilder_.build(); } + if (((from_bitField0_ & 0x00000020) == 0x00000020)) { + to_bitField0_ |= 0x00000008; + } + result.lastActive_ = lastActive_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -4898,6 +4926,9 @@ public final class MLDataFormats { } } } + if (other.hasLastActive()) { + setLastActive(other.getLastActive()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -4972,6 +5003,11 @@ public final class MLDataFormats { addProperties(subBuilder.buildPartial()); break; } + case 48: { + bitField0_ |= 0x00000020; + lastActive_ = input.readInt64(); + break; + } } } } @@ -5413,6 +5449,27 @@ public final class MLDataFormats { return propertiesBuilder_; } + // optional int64 lastActive = 6; + private long lastActive_ ; + public boolean hasLastActive() { + return ((bitField0_ & 0x00000020) == 0x00000020); + } + public long getLastActive() { + return lastActive_; + } + public Builder setLastActive(long value) { + bitField0_ |= 0x00000020; + lastActive_ = value; + onChanged(); + return this; + } + public Builder clearLastActive() { + bitField0_ = (bitField0_ & ~0x00000020); + lastActive_ = 0L; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:ManagedCursorInfo) } @@ -5491,13 +5548,13 @@ public final class MLDataFormats { "Range\022*\n\rlowerEndpoint\030\001 \002(\0132\023.NestedPos" + "itionInfo\022*\n\rupperEndpoint\030\002 \002(\0132\023.Neste" + "dPositionInfo\"+\n\014LongProperty\022\014\n\004name\030\001 " + - "\002(\t\022\r\n\005value\030\002 \002(\003\"\270\001\n\021ManagedCursorInfo" + + "\002(\t\022\r\n\005value\030\002 \002(\003\"\314\001\n\021ManagedCursorInfo" + "\022\027\n\017cursorsLedgerId\030\001 \002(\003\022\032\n\022markDeleteL", "edgerId\030\002 \001(\003\022\031\n\021markDeleteEntryId\030\003 \001(\003" + "\0220\n\031individualDeletedMessages\030\004 \003(\0132\r.Me" + "ssageRange\022!\n\nproperties\030\005 \003(\0132\r.LongPro" + - "pertyB\'\n#org.apache.bookkeeper.mledger.p" + - "rotoH\001" + "perty\022\022\n\nlastActive\030\006 \001(\003B\'\n#org.apache." + + "bookkeeper.mledger.protoH\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -5565,7 +5622,7 @@ public final class MLDataFormats { internal_static_ManagedCursorInfo_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_ManagedCursorInfo_descriptor, - new java.lang.String[] { "CursorsLedgerId", "MarkDeleteLedgerId", "MarkDeleteEntryId", "IndividualDeletedMessages", "Properties", }, + new java.lang.String[] { "CursorsLedgerId", "MarkDeleteLedgerId", "MarkDeleteEntryId", "IndividualDeletedMessages", "Properties", "LastActive", }, org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo.class, org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo.Builder.class); return null; diff --git a/managed-ledger/src/main/proto/MLDataFormats.proto b/managed-ledger/src/main/proto/MLDataFormats.proto index 29f465c..0d5ad3a 100644 --- a/managed-ledger/src/main/proto/MLDataFormats.proto +++ b/managed-ledger/src/main/proto/MLDataFormats.proto @@ -86,4 +86,6 @@ message ManagedCursorInfo { // Additional custom properties associated with // the current cursor position repeated LongProperty properties = 5; + + optional int64 lastActive = 6; } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java index de98b60..c9021ae 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java @@ -122,6 +122,16 @@ public class ManagedCursorContainerTest { return name; } + @Override + public long getLastActive() { + return System.currentTimeMillis(); + } + + @Override + public void updateLastActive() { + // no-op + } + public String toString() { return String.format("%s=%s", name, position); } diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 5cf31c1..9a69fc3 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -93,6 +93,11 @@ public class ServiceConfiguration implements PulsarConfiguration { private int messageExpiryCheckIntervalInMinutes = 5; // How long to delay rewinding cursor and dispatching messages when active consumer is changed private int activeConsumerFailoverDelayTimeMillis = 1000; + // How long to delete inactive subscriptions from last consuming + // When it is 0, inactive subscriptions are not deleted automatically + private long subscriptionExpirationTimeMinutes = 0; + // How frequently to proactively check and purge expired subscription + private long subscriptionExpiryCheckIntervalInMinutes = 5; // Set the default behavior for message deduplication in the broker // This can be overridden per-namespace. If enabled, broker will reject @@ -681,6 +686,22 @@ public class ServiceConfiguration implements PulsarConfiguration { this.activeConsumerFailoverDelayTimeMillis = activeConsumerFailoverDelayTimeMillis; } + public long getSubscriptionExpirationTimeMinutes() { + return subscriptionExpirationTimeMinutes; + } + + public void setSubscriptionExpirationTimeMinutes(long subscriptionExpirationTimeMinutes) { + this.subscriptionExpirationTimeMinutes = subscriptionExpirationTimeMinutes; + } + + public long getSubscriptionExpiryCheckIntervalInMinutes() { + return subscriptionExpiryCheckIntervalInMinutes; + } + + public void setSubscriptionExpiryCheckIntervalInMinutes(long subscriptionExpiryCheckIntervalInMinutes) { + this.subscriptionExpiryCheckIntervalInMinutes = subscriptionExpiryCheckIntervalInMinutes; + } + public boolean isClientLibraryVersionCheckEnabled() { return clientLibraryVersionCheckEnabled; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java index e9fbe0a..f73a829 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java @@ -236,12 +236,10 @@ public class NonPersistentTopics extends PersistentTopics { NamespaceBundle nsBundle = validateNamespaceBundleOwnership(fqnn, policies.bundles, bundleRange, true, true); try { final List<String> topicList = Lists.newArrayList(); - pulsar().getBrokerService().getTopics().forEach((name, topicFuture) -> { - if (BrokerService.extractTopic(topicFuture).isPresent()) { - TopicName topicName = TopicName.get(name); - if (nsBundle.includes(topicName)) { - topicList.add(name); - } + pulsar().getBrokerService().forEachTopic(topic -> { + TopicName topicName = TopicName.get(topic.getName()); + if (nsBundle.includes(topicName)) { + topicList.add(topic.getName()); } }); return topicList; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java index 8f616d2..9d49ad1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java @@ -227,12 +227,10 @@ public class NonPersistentTopics extends PersistentTopics { NamespaceBundle nsBundle = validateNamespaceBundleOwnership(namespaceName, policies.bundles, bundleRange, true, true); try { final List<String> topicList = Lists.newArrayList(); - pulsar().getBrokerService().getTopics().forEach((name, topicFuture) -> { - if (BrokerService.extractTopic(topicFuture).isPresent()) { - TopicName topicName = TopicName.get(name); - if (nsBundle.includes(topicName)) { - topicList.add(name); - } + pulsar().getBrokerService().forEachTopic(topic -> { + TopicName topicName = TopicName.get(topic.getName()); + if (nsBundle.includes(topicName)) { + topicList.add(topic.getName()); } }); return topicList; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index ad709db..43f5000 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -109,9 +109,9 @@ import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.PersistencePolicies; import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats; -import org.apache.pulsar.common.policies.data.TopicStats; import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.data.RetentionPolicies; +import org.apache.pulsar.common.policies.data.TopicStats; import org.apache.pulsar.common.stats.Metrics; import org.apache.pulsar.common.util.FieldParser; import org.apache.pulsar.common.util.FutureUtil; @@ -324,10 +324,18 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies } // Deduplication info checker - long intervalInSeconds = TimeUnit.MINUTES + long duplicationCheckerIntervalInSeconds = TimeUnit.MINUTES .toSeconds(pulsar().getConfiguration().getBrokerDeduplicationProducerInactivityTimeoutMinutes()) / 3; - inactivityMonitor.scheduleAtFixedRate(safeRun(this::checkMessageDeduplicationInfo), intervalInSeconds, - intervalInSeconds, TimeUnit.SECONDS); + inactivityMonitor.scheduleAtFixedRate(safeRun(this::checkMessageDeduplicationInfo), duplicationCheckerIntervalInSeconds, + duplicationCheckerIntervalInSeconds, TimeUnit.SECONDS); + + // Inactive subscriber checker + if (pulsar().getConfiguration().getSubscriptionExpirationTimeMinutes() > 0) { + long subscriptionExpiryCheckIntervalInSeconds = + TimeUnit.MINUTES.toSeconds(pulsar().getConfiguration().getSubscriptionExpiryCheckIntervalInMinutes()); + inactivityMonitor.scheduleAtFixedRate(safeRun(this::checkInactiveSubscriptions), + subscriptionExpiryCheckIntervalInSeconds, subscriptionExpiryCheckIntervalInSeconds, TimeUnit.SECONDS); + } } void startMessageExpiryMonitor() { @@ -813,28 +821,29 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies } public void checkGC(int gcIntervalInSeconds) { - topics.forEach((n, t) -> { - Optional<Topic> topic = extractTopic(t); - if (topic.isPresent()) { - topic.get().checkGC(gcIntervalInSeconds); - } - }); + forEachTopic(topic -> topic.checkGC(gcIntervalInSeconds)); } public void checkMessageExpiry() { - topics.forEach((n, t) -> { - Optional<Topic> topic = extractTopic(t); - if (topic.isPresent()) { - topic.get().checkMessageExpiry(); - } - }); + forEachTopic(Topic::checkMessageExpiry); } public void checkMessageDeduplicationInfo() { + forEachTopic(Topic::checkMessageDeduplicationInfo); + } + + public void checkInactiveSubscriptions() { + forEachTopic(Topic::checkInactiveSubscriptions); + } + + /** + * Iterates over all loaded topics in the broker + */ + public void forEachTopic(Consumer<Topic> consumer) { topics.forEach((n, t) -> { Optional<Topic> topic = extractTopic(t); if (topic.isPresent()) { - topic.get().checkMessageDeduplicationInfo(); + consumer.accept(topic.get()); } }); } @@ -866,28 +875,18 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies } public void monitorBacklogQuota() { - topics.forEach((n, t) -> { - try { - Optional<Topic> optionalTopic = extractTopic(t); - if (optionalTopic.isPresent() && optionalTopic.get() instanceof PersistentTopic) { - PersistentTopic topic = (PersistentTopic) optionalTopic.get(); - if (isBacklogExceeded(topic)) { - getBacklogQuotaManager().handleExceededBacklogQuota(topic); - } else if (topic == null) { - if (log.isDebugEnabled()) { - log.debug("topic is null "); - } - } else { - if (log.isDebugEnabled()) { - log.debug("quota not exceeded for [{}]", topic.getName()); - } + forEachTopic(topic -> { + if (topic instanceof PersistentTopic) { + PersistentTopic persistentTopic = (PersistentTopic) topic; + if (isBacklogExceeded(persistentTopic)) { + getBacklogQuotaManager().handleExceededBacklogQuota(persistentTopic); + } else { + if (log.isDebugEnabled()) { + log.debug("quota not exceeded for [{}]", topic.getName()); } } - } catch (Exception xle) { - log.warn("Backlog quota monitoring encountered :" + xle.getLocalizedMessage()); } }); - } void checkTopicNsOwnership(final String topic) throws RuntimeException { @@ -1035,12 +1034,9 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies public Map<String, TopicStats> getTopicStats() { HashMap<String, TopicStats> stats = new HashMap<>(); - topics.forEach((name, topicFuture) -> { - Optional<Topic> topic = extractTopic(topicFuture); - if (topic.isPresent()) { - stats.put(name, topic.get().getStats()); - } - }); + + forEachTopic(topic -> stats.put(topic.getName(), topic.getStats())); + return stats; } @@ -1130,11 +1126,9 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies private void updateTopicMessageDispatchRate() { this.pulsar().getExecutor().execute(() -> { // update message-rate for each topic - topics.forEach((name, topicFuture) -> { - Optional<Topic> topic = extractTopic(topicFuture); - - if (topic.isPresent() && topic.get() instanceof PersistentTopic) { - PersistentTopic persistentTopic = (PersistentTopic) topic.get(); + forEachTopic(topic -> { + if (topic instanceof PersistentTopic) { + PersistentTopic persistentTopic = (PersistentTopic) topic; // it first checks namespace-policy rate and if not present then applies broker-config persistentTopic.getDispatchRateLimiter().updateDispatchRate(); } @@ -1145,21 +1139,17 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies private void updateSubscriptionMessageDispatchRate() { this.pulsar().getExecutor().submit(() -> { // update message-rate for each topic subscription - topics.forEach((name, topicFuture) -> { - Optional<Topic> topic = extractTopic(topicFuture); - - if (topic.isPresent()) { - topic.get().getSubscriptions().forEach((subName, persistentSubscription) -> { - if (persistentSubscription.getDispatcher() instanceof PersistentDispatcherMultipleConsumers) { - ((PersistentDispatcherMultipleConsumers) persistentSubscription.getDispatcher()) - .getDispatchRateLimiter().updateDispatchRate(); - } else if (persistentSubscription - .getDispatcher() instanceof PersistentDispatcherSingleActiveConsumer) { - ((PersistentDispatcherSingleActiveConsumer) persistentSubscription.getDispatcher()) - .getDispatchRateLimiter().updateDispatchRate(); - } - }); - } + forEachTopic(topic -> { + topic.getSubscriptions().forEach((subName, persistentSubscription) -> { + if (persistentSubscription.getDispatcher() instanceof PersistentDispatcherMultipleConsumers) { + ((PersistentDispatcherMultipleConsumers) persistentSubscription.getDispatcher()) + .getDispatchRateLimiter().updateDispatchRate(); + } else if (persistentSubscription + .getDispatcher() instanceof PersistentDispatcherSingleActiveConsumer) { + ((PersistentDispatcherSingleActiveConsumer) persistentSubscription.getDispatcher()) + .getDispatchRateLimiter().updateDispatchRate(); + } + }); }); }); } @@ -1167,22 +1157,17 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies private void updateManagedLedgerConfig() { this.pulsar().getExecutor().execute(() -> { // update managed-ledger config of each topic - topics.forEach((name, topicFuture) -> { - if (topicFuture.isDone()) { - String topicName = null; - try { - Optional<Topic> topic = extractTopic(topicFuture); - - if (topic.isPresent() && topic.get() instanceof PersistentTopic) { - PersistentTopic persistentTopic = (PersistentTopic) topic.get(); - topicName = persistentTopic.getName(); - // update skipNonRecoverableLedger configuration - persistentTopic.getManagedLedger().getConfig().setAutoSkipNonRecoverableData( - pulsar.getConfiguration().isAutoSkipNonRecoverableData()); - } - } catch (Exception e) { - log.warn("[{}] failed to update managed-ledger config", topicName, e); + + forEachTopic(topic -> { + try { + if (topic instanceof PersistentTopic) { + PersistentTopic persistentTopic = (PersistentTopic) topic; + // update skipNonRecoverableLedger configuration + persistentTopic.getManagedLedger().getConfig().setAutoSkipNonRecoverableData( + pulsar.getConfiguration().isAutoSkipNonRecoverableData()); } + } catch (Exception e) { + log.warn("[{}] failed to update managed-ledger config", topic.getName(), e); } }); }); @@ -1437,23 +1422,20 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies private void blockDispatchersWithLargeUnAckMessages() { lock.readLock().lock(); try { - topics.forEach((name, topicFuture) -> { - Optional<Topic> topic = extractTopic(topicFuture); - if (topic.isPresent()) { - topic.get().getSubscriptions().forEach((subName, persistentSubscription) -> { - if (persistentSubscription.getDispatcher() instanceof PersistentDispatcherMultipleConsumers) { - PersistentDispatcherMultipleConsumers dispatcher = (PersistentDispatcherMultipleConsumers) persistentSubscription - .getDispatcher(); - int dispatcherUnAckMsgs = dispatcher.getTotalUnackedMessages(); - if (dispatcherUnAckMsgs > maxUnackedMsgsPerDispatcher) { - log.info("[{}] Blocking dispatcher due to reached max broker limit {}", - dispatcher.getName(), dispatcher.getTotalUnackedMessages()); - dispatcher.blockDispatcherOnUnackedMsgs(); - blockedDispatchers.add(dispatcher); - } + forEachTopic(topic -> { + topic.getSubscriptions().forEach((subName, persistentSubscription) -> { + if (persistentSubscription.getDispatcher() instanceof PersistentDispatcherMultipleConsumers) { + PersistentDispatcherMultipleConsumers dispatcher = (PersistentDispatcherMultipleConsumers) persistentSubscription + .getDispatcher(); + int dispatcherUnAckMsgs = dispatcher.getTotalUnackedMessages(); + if (dispatcherUnAckMsgs > maxUnackedMsgsPerDispatcher) { + log.info("[{}] Blocking dispatcher due to reached max broker limit {}", + dispatcher.getName(), dispatcher.getTotalUnackedMessages()); + dispatcher.blockDispatcherOnUnackedMsgs(); + blockedDispatchers.add(dispatcher); } - }); - } + } + }); }); } finally { lock.readLock().unlock(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java index fbf3c65..fccc75b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java @@ -101,6 +101,8 @@ public interface Topic { void checkGC(int gcInterval); + void checkInactiveSubscriptions(); + void checkMessageExpiry(); void checkMessageDeduplicationInfo(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index e45016c..cddfea3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -913,6 +913,11 @@ public class NonPersistentTopic implements Topic { } @Override + public void checkInactiveSubscriptions() { + // no-op + } + + @Override public CompletableFuture<Void> onPoliciesUpdate(Policies data) { if (log.isDebugEnabled()) { log.debug("[{}] isEncryptionRequired changes: {} -> {}", topic, isEncryptionRequired, data.encryption_required); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index 65f5f97..bdace55 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -96,6 +96,7 @@ public class PersistentSubscription implements Subscription { @Override public synchronized void addConsumer(Consumer consumer) throws BrokerServiceException { + cursor.updateLastActive(); if (IS_FENCED_UPDATER.get(this) == TRUE) { log.warn("Attempting to add consumer {} on a fenced subscription", consumer); throw new SubscriptionFencedException("Subscription is fenced"); @@ -144,6 +145,7 @@ public class PersistentSubscription implements Subscription { @Override public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceException { + cursor.updateLastActive(); if (dispatcher != null) { dispatcher.removeConsumer(consumer); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 9c59bd9..d56bf55 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1462,6 +1462,19 @@ public class PersistentTopic implements Topic, AddEntryCallback { } } + @Override + public void checkInactiveSubscriptions() { + final long expirationTime = TimeUnit.MINUTES.toMillis(brokerService.pulsar().getConfiguration().getSubscriptionExpirationTimeMinutes()); + if (expirationTime <= 0) return; + subscriptions.forEach((subName, sub) -> { + if (sub.dispatcher != null && sub.dispatcher.isConsumerConnected()) return; + if (System.currentTimeMillis() - sub.cursor.getLastActive() > expirationTime) { + sub.delete().thenAccept( + v -> log.info("[{}][{}] The subscription was deleted due to expiration", topic, subName)); + } + }); + } + /** * Check whether the topic should be retained (based on time), even tough there are no producers/consumers and it's * marked as inactive. diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BookieClientStatsGenerator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BookieClientStatsGenerator.java index 914a25c..697f16a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BookieClientStatsGenerator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BookieClientStatsGenerator.java @@ -18,18 +18,15 @@ */ package org.apache.pulsar.broker.stats; +import com.google.common.collect.Maps; + import java.util.Map; -import java.util.Optional; import org.apache.bookkeeper.mledger.proto.PendingBookieOpsStats; import org.apache.pulsar.broker.PulsarService; -import org.apache.pulsar.broker.service.BrokerService; -import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.common.naming.TopicName; -import com.google.common.collect.Maps; - /** */ public class BookieClientStatsGenerator { @@ -49,10 +46,10 @@ public class BookieClientStatsGenerator { private Map<String, Map<String, PendingBookieOpsStats>> generate() throws Exception { if (pulsar.getBrokerService() != null && pulsar.getBrokerService().getTopics() != null) { - pulsar.getBrokerService().getTopics().forEach((name, topicFuture) -> { - Optional<Topic> topic = BrokerService.extractTopic(topicFuture); - if (topic.isPresent() && topic.get() instanceof PersistentTopic) { - PersistentTopic persistentTopic = (PersistentTopic) topic.get(); + + pulsar.getBrokerService().forEachTopic(topic -> { + if (topic instanceof PersistentTopic) { + PersistentTopic persistentTopic = (PersistentTopic) topic; TopicName topicName = TopicName.get(persistentTopic.getName()); put(topicName, persistentTopic.getManagedLedger().getStats().getPendingBookieOpsStats()); } -- To stop receiving notification emails like this one, please contact mme...@apache.org.