This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 3c9e281 Add non-persistent topic stats separately in brokers-stat (#1235) 3c9e281 is described below commit 3c9e28172d486bfb4c39db5248e1033d5079c48f Author: Rajan Dhabalia <rdhaba...@apache.org> AuthorDate: Wed Feb 14 12:03:38 2018 -0800 Add non-persistent topic stats separately in brokers-stat (#1235) --- .../apache/pulsar/broker/service/PulsarStats.java | 45 +++++++++++++++++----- 1 file changed, 36 insertions(+), 9 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarStats.java index cdc5ada..835cabc 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarStats.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarStats.java @@ -25,6 +25,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Consumer; import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.stats.BrokerOperabilityMetrics; import org.apache.pulsar.broker.stats.ClusterReplicationMetrics; @@ -55,6 +56,7 @@ public class PulsarStats implements Closeable { private Map<String, NamespaceBundleStats> bundleStats; private List<Metrics> tempMetricsCollection; private List<Metrics> metricsCollection; + private List<NonPersistentTopic> tempNonPersistentTopics; private final BrokerOperabilityMetrics brokerOperabilityMetrics; private final ReentrantReadWriteLock bufferLock = new ReentrantReadWriteLock(); @@ -71,6 +73,7 @@ public class PulsarStats implements Closeable { this.metricsCollection = Lists.newArrayList(); this.brokerOperabilityMetrics = new BrokerOperabilityMetrics(pulsar.getConfiguration().getClusterName(), pulsar.getAdvertisedAddress()); + this.tempNonPersistentTopics = Lists.newArrayList(); } @Override @@ -118,22 +121,46 @@ public class PulsarStats implements Closeable { currentBundleStats.topics = topics.size(); topicStatsStream.startObject(NamespaceBundle.getBundleRange(bundle)); + + tempNonPersistentTopics.clear(); + // start persistent topic topicStatsStream.startObject("persistent"); topics.forEach((name, topic) -> { - try { - topic.updateRates(nsStats, currentBundleStats, topicStatsStream, - clusterReplicationMetrics, namespaceName); - } catch (Exception e) { - log.error("Failed to generate topic stats for topic {}: {}", name, e.getMessage(), e); - } - // this task: helps to activate inactive-backlog-cursors which have caught up and - // connected, also deactivate active-backlog-cursors which has backlog if (topic instanceof PersistentTopic) { + try { + topic.updateRates(nsStats, currentBundleStats, topicStatsStream, + clusterReplicationMetrics, namespaceName); + } catch (Exception e) { + log.error("Failed to generate topic stats for topic {}: {}", name, e.getMessage(), e); + } + // this task: helps to activate inactive-backlog-cursors which have caught up and + // connected, also deactivate active-backlog-cursors which has backlog ((PersistentTopic) topic).getManagedLedger().checkBackloggedCursors(); + }else if (topic instanceof NonPersistentTopic) { + tempNonPersistentTopics.add((NonPersistentTopic) topic); + } else { + log.warn("Unsupported type of topic {}", topic.getClass().getName()); } }); - + // end persistent topics section topicStatsStream.endObject(); + + if(!tempNonPersistentTopics.isEmpty()) { + // start non-persistent topic + topicStatsStream.startObject("non-persistent"); + tempNonPersistentTopics.forEach(topic -> { + try { + topic.updateRates(nsStats, currentBundleStats, topicStatsStream, + clusterReplicationMetrics, namespaceName); + } catch (Exception e) { + log.error("Failed to generate topic stats for topic {}: {}", topic.getName(), e.getMessage(), e); + } + }); + // end non-persistent topics section + topicStatsStream.endObject(); + } + + // end namespace-bundle section topicStatsStream.endObject(); }); -- To stop receiving notification emails like this one, please contact rdhaba...@apache.org.