rdhabalia closed pull request #1235: Add non-persistent topic stats separately in brokers-stat URL: https://github.com/apache/incubator-pulsar/pull/1235
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarStats.java index cdc5ada99..835cabccd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarStats.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarStats.java @@ -25,6 +25,7 @@ import java.util.function.Consumer; import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.stats.BrokerOperabilityMetrics; import org.apache.pulsar.broker.stats.ClusterReplicationMetrics; @@ -55,6 +56,7 @@ private Map<String, NamespaceBundleStats> bundleStats; private List<Metrics> tempMetricsCollection; private List<Metrics> metricsCollection; + private List<NonPersistentTopic> tempNonPersistentTopics; private final BrokerOperabilityMetrics brokerOperabilityMetrics; private final ReentrantReadWriteLock bufferLock = new ReentrantReadWriteLock(); @@ -71,6 +73,7 @@ public PulsarStats(PulsarService pulsar) { this.metricsCollection = Lists.newArrayList(); this.brokerOperabilityMetrics = new BrokerOperabilityMetrics(pulsar.getConfiguration().getClusterName(), pulsar.getAdvertisedAddress()); + this.tempNonPersistentTopics = Lists.newArrayList(); } @Override @@ -118,22 +121,46 @@ public synchronized void updateStats( currentBundleStats.topics = topics.size(); topicStatsStream.startObject(NamespaceBundle.getBundleRange(bundle)); + + tempNonPersistentTopics.clear(); + // start persistent topic topicStatsStream.startObject("persistent"); topics.forEach((name, topic) -> { - try { - topic.updateRates(nsStats, currentBundleStats, topicStatsStream, - clusterReplicationMetrics, namespaceName); - } catch (Exception e) { - log.error("Failed to generate topic stats for topic {}: {}", name, e.getMessage(), e); - } - // this task: helps to activate inactive-backlog-cursors which have caught up and - // connected, also deactivate active-backlog-cursors which has backlog if (topic instanceof PersistentTopic) { + try { + topic.updateRates(nsStats, currentBundleStats, topicStatsStream, + clusterReplicationMetrics, namespaceName); + } catch (Exception e) { + log.error("Failed to generate topic stats for topic {}: {}", name, e.getMessage(), e); + } + // this task: helps to activate inactive-backlog-cursors which have caught up and + // connected, also deactivate active-backlog-cursors which has backlog ((PersistentTopic) topic).getManagedLedger().checkBackloggedCursors(); + }else if (topic instanceof NonPersistentTopic) { + tempNonPersistentTopics.add((NonPersistentTopic) topic); + } else { + log.warn("Unsupported type of topic {}", topic.getClass().getName()); } }); - + // end persistent topics section topicStatsStream.endObject(); + + if(!tempNonPersistentTopics.isEmpty()) { + // start non-persistent topic + topicStatsStream.startObject("non-persistent"); + tempNonPersistentTopics.forEach(topic -> { + try { + topic.updateRates(nsStats, currentBundleStats, topicStatsStream, + clusterReplicationMetrics, namespaceName); + } catch (Exception e) { + log.error("Failed to generate topic stats for topic {}: {}", topic.getName(), e.getMessage(), e); + } + }); + // end non-persistent topics section + topicStatsStream.endObject(); + } + + // end namespace-bundle section topicStatsStream.endObject(); }); ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services