This is an automated email from the ASF dual-hosted git repository.

wankai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git


The following commit(s) were added to refs/heads/master by this push:
     new 0d7c7873f0 OAP self observability: Add watermark circuit break/recover metrics. (#12981)
0d7c7873f0 is described below

commit 0d7c7873f062a58c9502f46d55122a0053abb79d
Author: Wan Kai <[email protected]>
AuthorDate: Thu Jan 16 20:32:02 2025 +0800

    OAP self observability: Add watermark circuit break/recover metrics. (#12981)
---
 docs/en/changes/changes.md                         |  2 +
 .../oap/server/core/CoreModuleProvider.java        |  3 +-
 .../server/core/watermark/WatermarkListener.java   | 17 +----
 .../server/core/watermark/WatermarkWatcher.java    | 52 ++++++++++++--
 .../src/main/resources/otel-rules/oap.yaml         |  4 ++
 .../so11y_oap/so11y-instance.json                  | 81 ++++++++++++++++++++--
 6 files changed, 134 insertions(+), 25 deletions(-)

diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md
index 08fca073d7..427459d048 100644
--- a/docs/en/changes/changes.md
+++ b/docs/en/changes/changes.md
@@ -63,6 +63,7 @@
 * BanyanDB: Add support for compatibility checks based on the BanyanDB server's API version.
 * MQE: Support `&&(and)`, `||(or)` bool operators.
 * OAP self observability: Add JVM heap and direct memory used metrics.
+* OAP self observability: Add watermark circuit break/recover metrics.
 
 #### UI
 
@@ -88,6 +89,7 @@
 * Update browser dashboard for the new metrics.
 * Visualize `Snapshot` on `Alerting` page.
 * OAP self observability dashboard: Add JVM heap and direct memory used metrics.
+* OAP self observability dashboard: Add watermark circuit break/recover metrics.
 
 #### Documentation
 * Update release document to adopt newly added revision-based process.
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
index 3b7d32a0f2..7b61f0e901 100755
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
@@ -385,7 +385,8 @@ public class CoreModuleProvider extends ModuleProvider {
         loggingConfigWatcher = new LoggingConfigWatcher(this);
 
         WatermarkGRPCInterceptor.create();
-        this.watermarkWatcher = new 
WatermarkWatcher(moduleConfig.getMaxHeapMemoryUsagePercent(),
+        this.watermarkWatcher = new WatermarkWatcher(getManager(),
+                                                     
moduleConfig.getMaxHeapMemoryUsagePercent(),
                                                      
moduleConfig.getMaxDirectMemoryUsage());
     }
 
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/watermark/WatermarkListener.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/watermark/WatermarkListener.java
index 9262ade5ec..e551ee73f9 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/watermark/WatermarkListener.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/watermark/WatermarkListener.java
@@ -18,7 +18,6 @@
 
 package org.apache.skywalking.oap.server.core.watermark;
 
-import java.util.List;
 import lombok.Getter;
 
 /**
@@ -34,7 +33,6 @@ import lombok.Getter;
 public abstract class WatermarkListener {
     @Getter
     private String name;
-    private List<WatermarkEvent.Type> acceptedTypes;
     private volatile boolean isWatermarkExceeded = false;
 
     /**
@@ -42,21 +40,12 @@ public abstract class WatermarkListener {
      * This should be the default way to create a listener.
      */
     public WatermarkListener(String name) {
-        this(name, WatermarkEvent.Type.values());
-    }
-
-    public WatermarkListener(String name, WatermarkEvent.Type... types) {
         this.name = name;
-        this.acceptedTypes = List.of(types);
     }
 
-    boolean notify(WatermarkEvent.Type event) {
-        if (acceptedTypes.contains(event)) {
-            isWatermarkExceeded = true;
-            beAwareOf(event);
-            return true;
-        }
-        return false;
+    void notify(WatermarkEvent.Type event) {
+        isWatermarkExceeded = true;
+        beAwareOf(event);
     }
 
     public boolean isWatermarkExceeded() {
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/watermark/WatermarkWatcher.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/watermark/WatermarkWatcher.java
index 878356aa5c..db5c75a718 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/watermark/WatermarkWatcher.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/watermark/WatermarkWatcher.java
@@ -19,14 +19,21 @@
 package org.apache.skywalking.oap.server.core.watermark;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantLock;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
 import org.apache.skywalking.oap.server.library.module.TerminalFriendlyTable;
+import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
+import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics;
 import org.apache.skywalking.oap.server.telemetry.api.MetricsCollector;
+import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
+import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
 
 /**
  * WatermarkWatcher is a component to watch the key metrics of the system, and 
trigger the watermark event when the
@@ -35,6 +42,7 @@ import org.apache.skywalking.oap.server.telemetry.api.MetricsCollector;
 @RequiredArgsConstructor
 @Slf4j
 public class WatermarkWatcher {
+    private final ModuleManager moduleManager;
     private final long maxHeapMemoryUsagePercentThreshold;
     private final long maxDirectHeapMemoryUsageThreshold;
 
@@ -54,12 +62,22 @@ public class WatermarkWatcher {
     private ReentrantLock lock;
     private List<WatermarkListener> listeners;
     private volatile boolean isLimiting = false;
+    private Map<WatermarkEvent.Type, Map<String, CounterMetrics>> 
breakCounters;
+    private Map<String, CounterMetrics> recoverCounters;
+    private MetricsCreator metricsCreator;
 
     public void start(MetricsCollector so11yCollector) {
         this.so11yCollector = so11yCollector;
         lock = new ReentrantLock();
         listeners = new ArrayList<>();
-
+        breakCounters = new HashMap<>();
+        recoverCounters = new HashMap<>();
+        for (WatermarkEvent.Type type : WatermarkEvent.Type.values()) {
+            breakCounters.put(type, new HashMap<>());
+        }
+        metricsCreator = moduleManager.find(TelemetryModule.NAME)
+                                                          .provider()
+                                                          
.getService(MetricsCreator.class);
         this.addListener(WatermarkGRPCInterceptor.INSTANCE);
 
         Executors.newSingleThreadScheduledExecutor()
@@ -100,6 +118,9 @@ public class WatermarkWatcher {
     }
 
     private void notify(WatermarkEvent.Type event) {
+        if (isLimiting) {
+            return;
+        }
         TerminalFriendlyTable table = new TerminalFriendlyTable("Watermark 
Controller Key Metrics");
         table.addRow(new TerminalFriendlyTable.Row("Heap Memory Max", 
String.format("%,d", heapMemoryMax)));
         table.addRow(new TerminalFriendlyTable.Row("Heap Memory Used", 
String.format("%,d", heapMemoryUsed)));
@@ -112,9 +133,9 @@ public class WatermarkWatcher {
         lock.lock();
         try {
             listeners.forEach(listener -> {
-                if (listener.notify(event)) {
-                    table.addRow(new TerminalFriendlyTable.Row("Notified 
Listener", listener.getName()));
-                }
+                listener.notify(event);
+                table.addRow(new TerminalFriendlyTable.Row("Notified 
Listener", listener.getName()));
+                breakCounters.get(event).get(listener.getName()).inc();
             });
         } finally {
             lock.unlock();
@@ -136,6 +157,7 @@ public class WatermarkWatcher {
             listeners.forEach(listener -> {
                 listener.beAwareOfRecovery();
                 table.addRow(new TerminalFriendlyTable.Row("Notified 
Listener", listener.getName()));
+                recoverCounters.get(listener.getName()).inc();
             });
         } finally {
             lock.unlock();
@@ -151,10 +173,32 @@ public class WatermarkWatcher {
         }
     }
 
+    private MetricsCreator getMetricsCreator() {
+        if (metricsCreator == null) {
+            metricsCreator = moduleManager.find(TelemetryModule.NAME)
+                                                          .provider()
+                                                          
.getService(MetricsCreator.class);
+        }
+        return metricsCreator;
+    }
+
     public void addListener(WatermarkListener listener) {
         lock.lock();
         try {
             listeners.add(listener);
+            MetricsCreator metricsCreator = getMetricsCreator();
+            for (WatermarkEvent.Type type : WatermarkEvent.Type.values()) {
+                breakCounters.get(type).put(listener.getName(), 
metricsCreator.createCounter(
+                    "watermark_circuit_breaker_break_count", "The number of 
times the watermark circuit breaker breaks",
+                    new MetricsTag.Keys("listener", "event"),
+                    new MetricsTag.Values(listener.getName(), type.name())
+                ));
+            }
+            recoverCounters.put(listener.getName(), 
metricsCreator.createCounter(
+                "watermark_circuit_breaker_recover_count", "The number of 
times the watermark circuit breaker recovers",
+                new MetricsTag.Keys("listener"),
+                new MetricsTag.Values(listener.getName())
+            ));
         } finally {
             lock.unlock();
         }
diff --git a/oap-server/server-starter/src/main/resources/otel-rules/oap.yaml b/oap-server/server-starter/src/main/resources/otel-rules/oap.yaml
index e238575627..9ee370541e 100644
--- a/oap-server/server-starter/src/main/resources/otel-rules/oap.yaml
+++ b/oap-server/server-starter/src/main/resources/otel-rules/oap.yaml
@@ -126,3 +126,7 @@ metricsRules:
     exp: graphql_query_latency_count.sum(['service', 
'host_name']).increase('PT1M')
   - name: instance_graphql_query_error_count
     exp: graphql_query_error_count.sum(['service', 
'host_name']).increase('PT1M')
+  - name: instance_watermark_circuit_breaker_break_count
+    exp: watermark_circuit_breaker_break_count.sum(['service', 'host_name', 
'listener', 'event'])
+  - name: instance_watermark_circuit_breaker_recover_count
+    exp: watermark_circuit_breaker_recover_count.sum(['service', 'host_name', 
'listener'])
diff --git a/oap-server/server-starter/src/main/resources/ui-initialized-templates/so11y_oap/so11y-instance.json b/oap-server/server-starter/src/main/resources/ui-initialized-templates/so11y_oap/so11y-instance.json
index edb00a54d8..60f8752ab1 100644
--- a/oap-server/server-starter/src/main/resources/ui-initialized-templates/so11y_oap/so11y-instance.json
+++ b/oap-server/server-starter/src/main/resources/ui-initialized-templates/so11y_oap/so11y-instance.json
@@ -193,8 +193,8 @@
                   ]
                 },
                 {
-                  "x": 0,
-                  "y": 39,
+                  "x": 6,
+                  "y": 26,
                   "w": 6,
                   "h": 13,
                   "i": "15",
@@ -215,7 +215,7 @@
                   ]
                 },
                 {
-                  "x": 6,
+                  "x": 12,
                   "y": 26,
                   "w": 6,
                   "h": 13,
@@ -250,7 +250,7 @@
                   ]
                 },
                 {
-                  "x": 12,
+                  "x": 18,
                   "y": 26,
                   "w": 6,
                   "h": 13,
@@ -285,8 +285,8 @@
                   ]
                 },
                 {
-                  "x": 18,
-                  "y": 26,
+                  "x": 0,
+                  "y": 39,
                   "w": 6,
                   "h": 13,
                   "i": "5",
@@ -392,6 +392,75 @@
                   "widget": {
                     "title": "Buffer Pool Used(MB)"
                   }
+                },
+                {
+                  "x": 0,
+                  "y": 52,
+                  "w": 6,
+                  "h": 13,
+                  "i": "19",
+                  "type": "Widget",
+                  "expressions": [
+                    "meter_oap_instance_watermark_circuit_breaker_break_count"
+                  ],
+                  "graph": {
+                    "type": "Line",
+                    "step": false,
+                    "smooth": false,
+                    "showSymbol": true,
+                    "showXAxis": true,
+                    "showYAxis": true
+                  },
+                  "widget": {
+                    "tips": "The number of times the watermark circuit breaker 
breaks",
+                    "title": "Watermark Circuit Breaker Break Count"
+                  }
+                },
+                {
+                  "x": 6,
+                  "y": 52,
+                  "w": 6,
+                  "h": 13,
+                  "i": "20",
+                  "type": "Widget",
+                  "expressions": [
+                    
"meter_oap_instance_watermark_circuit_breaker_recover_count"
+                  ],
+                  "graph": {
+                    "type": "Line",
+                    "step": false,
+                    "smooth": false,
+                    "showSymbol": true,
+                    "showXAxis": true,
+                    "showYAxis": true
+                  },
+                  "widget": {
+                    "title": "Watermark Circuit Breaker Recover Count",
+                    "tips": "The number of times the watermark circuit breaker 
recovers"
+                  }
+                },
+                {
+                  "x": 18,
+                  "y": 39,
+                  "w": 6,
+                  "h": 13,
+                  "i": "21",
+                  "type": "Widget",
+                  "expressions": [
+                    
"aggregate_labels(meter_oap_instance_watermark_circuit_breaker_break_count,sum(listener))-meter_oap_instance_watermark_circuit_breaker_recover_count"
+                  ],
+                  "graph": {
+                    "type": "Line",
+                    "step": false,
+                    "smooth": false,
+                    "showSymbol": true,
+                    "showXAxis": true,
+                    "showYAxis": true
+                  },
+                  "widget": {
+                    "title": "Watermark Circuit Breaker Status",
+                    "tips": "The status of circuit breaker listeners, 0 means 
all recovered from the breaks"
+                  }
                 }
               ]
             },

Reply via email to