This is an automated email from the ASF dual-hosted git repository.

wu-sheng pushed a commit to branch fix/runtime-rule-schema-cache-self-heal
in repository https://gitbox.apache.org/repos/asf/skywalking.git

commit 4598124f9902b081a005f72f9c332c5bdf231c52
Author: Wu Sheng <[email protected]>
AuthorDate: Sun Jun 14 17:19:55 2026 +0800

    Fix BanyanDB runtime-rule self-heal + v2 MAL CounterWindow collision & Elvis falsy semantics

    * BanyanDB schema-cache self-heal: persist DAOs re-derive a missing local schema (RPC-free) once before failing; the no-init defer loop retries a transient backend probe error (isRetryableNoInitProbeFailure, default false / BanyanDB opt-in) instead of crash-looping the pod.
    * v2 MAL CounterWindow key collision: rate()/increase()/irate() keyed each counter's sliding window on the rule's output metric name (shared by every input metric of a rule) instead of the counter's own name, so counters that reduce to the same labels after .sum() shared one window slot and rated against each other's values -- fabricating non-zero rates from frozen counters (BanyanDB liaison gRPC error rate). Now keyed by the counter's own metric name.
    * v2 MAL Elvis ?: honored only null (Optional.ofNullable().orElse()); now Groovy-falsy via MalRuntimeHelper.elvis/isTruthy, single-evaluated -- fixes BanyanDB liaison node_type="" stored instead of "n/a".
    * banyandb otel-rules: PT15S -> PT1M rate window.
    * Tests: BanyanDBErrorRateReproTest, MALElvisFalsyTest, MetadataRegistryTest, ModelInstallerNoInitTest.
---
 docs/en/changes/changes.md                         |   3 +
 .../analyzer/v2/compiler/MALClosureCodegen.java    |   8 +-
 .../analyzer/v2/compiler/rt/MalRuntimeHelper.java  |  28 ++++-
 .../oap/meter/analyzer/v2/dsl/SampleFamily.java    |   6 +-
 .../v2/dsl/BanyanDBErrorRateReproTest.java         | 125 +++++++++++++++++++++
 .../meter/analyzer/v2/dsl/MALElvisFalsyTest.java   |  98 ++++++++++++++++
 .../server/core/storage/model/ModelInstaller.java  |  43 +++++--
 .../storage/model/ModelInstallerNoInitTest.java    |  83 +++++++++++++-
 .../otel-rules/banyandb/banyandb-instance.yaml     |  42 +++----
 .../otel-rules/banyandb/banyandb-service.yaml      |   6 +-
 .../plugin/banyandb/BanyanDBIndexInstaller.java    |  48 +++++++-
 .../plugin/banyandb/BanyanDBNoneStreamDAO.java     |   6 +
 .../storage/plugin/banyandb/MetadataRegistry.java  |  47 +++++++-
 .../banyandb/measure/BanyanDBMetricsDAO.java       |  33 ++++--
 .../plugin/banyandb/stream/BanyanDBRecordDAO.java  |   9 ++
 .../plugin/banyandb/MetadataRegistryTest.java      |  81 +++++++++++++
 16 files changed, 611 insertions(+), 55 deletions(-)
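
The no-init defer-loop behavior described in the commit message above can be sketched in isolation. This is a minimal illustration under stated assumptions, not the SkyWalking code: `NoInitDeferLoopSketch`, `ProbeException`, `Probe`, `awaitResources`, and `probesUntilReady` are hypothetical names standing in for `ModelInstaller`, `StorageException`, `isExists(...)`, and `isRetryableNoInitProbeFailure(...)`, and the sleep interval is a parameter so the sketch runs quickly.

```java
import java.util.function.Predicate;

class NoInitDeferLoopSketch {
    /** Hypothetical stand-in for StorageException; unchecked only to keep the sketch short. */
    static class ProbeException extends RuntimeException {
        ProbeException(final String message, final Throwable cause) {
            super(message, cause);
        }
    }

    /** Hypothetical existence probe: true once every required resource exists. */
    interface Probe {
        boolean allExist();
    }

    /** Polls until the probe succeeds and returns how many probes were made. */
    static int awaitResources(final Probe probe,
                              final Predicate<ProbeException> retryable,
                              final long sleepMillis) {
        int probes = 0;
        while (true) {
            boolean allExist;
            try {
                probes++;
                allExist = probe.allExist();
            } catch (final ProbeException e) {
                if (!retryable.test(e)) {
                    throw e; // permanent failure: escape instead of waiting forever
                }
                allExist = false; // transient failure: treat as "not present yet"
            }
            if (allExist) {
                return probes;
            }
            try {
                Thread.sleep(sleepMillis);
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the flag for callers
                throw new ProbeException("interrupted while waiting for resources", e);
            }
        }
    }

    /** Demo driver: the probe fails the first transientFailures times, then succeeds. */
    static int probesUntilReady(final int transientFailures, final boolean retryable) {
        final int[] calls = {0};
        final Probe probe = () -> {
            if (calls[0]++ < transientFailures) {
                throw new ProbeException("client connection is closing", null);
            }
            return true;
        };
        return awaitResources(probe, e -> retryable, 10L);
    }

    public static void main(final String[] args) {
        System.out.println(probesUntilReady(1, true)); // prints 2
    }
}
```

With `retryable` false, the same probe failure escapes on the first call, which mirrors the conservative default the commit message attributes to the base installer.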

diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md
index 51aa0dfa79..59aa378a6b 100644
--- a/docs/en/changes/changes.md
+++ b/docs/en/changes/changes.md
@@ -242,6 +242,9 @@
   admin-host only" entry above for the public REST retirement.
 
 #### OAP Server
+* Fix BanyanDB peer nodes permanently flooding `<metric> is not registered` when a node holds a live persist worker but its local `MetadataRegistry` schema cache was never populated for that model — e.g. a `withoutSchemaChange` peer apply or a runtime-rule bundled fall-over rebuilt the dispatch worker but skipped the populate, and nothing (the registry never evicts, the 30s reconcile only covers runtime-rule rows) ever re-derived it. The persist DAOs now self-heal a missing entry once wi [...]
+* Fix a v2 MAL `CounterWindow` key collision: `rate()` / `increase()` / `irate()` keyed each counter's sliding window on the rule's output metric name (the same for every input metric of a rule) instead of the counter's own name, so two or more counters that reduce to the same label set after `.sum(...)` shared one window and computed rates against each other's values — fabricating non-zero rates from unchanged counters (e.g. the BanyanDB liaison gRPC error rate read a steady non-zero of [...]
+* Fix the v2 MAL Elvis operator `?:` to honor Groovy-falsy semantics. It compiled to `Optional.ofNullable(primary).orElse(fallback)`, applying the fallback only when the primary is `null`, so an empty-string primary kept `""` instead — e.g. a BanyanDB liaison `ServiceInstance` stored `node_type=""` rather than `n/a`, because `.sum([...,'node_type'])` fills an absent group-by label with `""`. The fallback now applies for falsy primaries such as null, false, numeric zero, and empty strings [...]
 * SWIP-15: rebuild BanyanDB self-observability around the cluster / container / group model (requires BanyanDB 0.11+). A BanyanDB cluster is modeled as one `Service`, each container as a `ServiceInstance` (role/tier as attributes), and each storage group as an `Endpoint`. The `otel-rules/banyandb/` rules are category-separated by role (`node_*` / `liaison_*` / `data_*` / `lifecycle_*`) and by data type (`measure_*` / `stream_*` / `trace_*` / `property_*`), mirroring the upstream FODC-pro [...]
 * Runtime MAL/LAL hot-update rules can declare `layerDefinitions:` to introduce new
   layers. Ordinals are operator-pinned in the `100_000+` tier; the layer is
diff --git a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/v2/compiler/MALClosureCodegen.java b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/v2/compiler/MALClosureCodegen.java
index 8b46f68cf9..bc2806e5e3 100644
--- a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/v2/compiler/MALClosureCodegen.java
+++ b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/v2/compiler/MALClosureCodegen.java
@@ -300,9 +300,13 @@ final class MALClosureCodegen {
         } else if (expr instanceof MALExpressionModel.ClosureElvisExpr) {
             final MALExpressionModel.ClosureElvisExpr elvis =
                 (MALExpressionModel.ClosureElvisExpr) expr;
-            sb.append("java.util.Optional.ofNullable(");
+            // Groovy `?:` applies the fallback when the primary is falsy (null,
+            // empty string/container, numeric zero, false), not only when null.
+            // Keep the primary single-evaluated so expressions such as tags.remove(...)
+            // do not observe different values between the truth check and result.
+            sb.append(MALCodegenHelper.RUNTIME_HELPER_FQCN).append(".elvis(");
             generateClosureExpr(sb, elvis.getPrimary(), paramName, beanMode);
-            sb.append(").orElse(");
+            sb.append(", ");
             generateClosureExpr(sb, elvis.getFallback(), paramName, beanMode);
             sb.append(")");
         } else if (expr instanceof MALExpressionModel.ClosureRegexMatchExpr) {
diff --git a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/v2/compiler/rt/MalRuntimeHelper.java b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/v2/compiler/rt/MalRuntimeHelper.java
index 669be6ef7c..7275e11db4 100644
--- a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/v2/compiler/rt/MalRuntimeHelper.java
+++ b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/v2/compiler/rt/MalRuntimeHelper.java
@@ -17,6 +17,9 @@
 
 package org.apache.skywalking.oap.meter.analyzer.v2.compiler.rt;
 
+import java.lang.reflect.Array;
+import java.util.Collection;
+import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import org.apache.skywalking.oap.meter.analyzer.v2.dsl.Sample;
@@ -55,12 +58,9 @@ public final class MalRuntimeHelper {
         return new String[][] {row};
     }
 
-    /**
-     * Reverse division: computes {@code numerator / v} for each sample value {@code v}.
-     * Used by generated code for {@code Number / SampleFamily} expressions.
-     */
     /**
      * Groovy truth check: {@code null → false}, empty string → {@code false},
+     * numeric zero → {@code false}, empty collection/map/array → {@code false},
      * {@code Boolean.FALSE → false}, everything else → {@code true}.
      * Used by generated filter code for standalone expressions in boolean context
      * (e.g., {@code tags.ApiId || tags.ApiName}).
@@ -75,9 +75,29 @@ public final class MalRuntimeHelper {
         if (value instanceof CharSequence) {
             return ((CharSequence) value).length() > 0;
         }
+        if (value instanceof Number) {
+            return ((Number) value).doubleValue() != 0.0D;
+        }
+        if (value instanceof Collection) {
+            return !((Collection<?>) value).isEmpty();
+        }
+        if (value instanceof Map) {
+            return !((Map<?, ?>) value).isEmpty();
+        }
+        if (value.getClass().isArray()) {
+            return Array.getLength(value) > 0;
+        }
         return true;
     }
 
+    public static <T> T elvis(final T primary, final T fallback) {
+        return isTruthy(primary) ? primary : fallback;
+    }
+
+    /**
+     * Reverse division: computes {@code numerator / v} for each sample value {@code v}.
+     * Used by generated code for {@code Number / SampleFamily} expressions.
+     */
     public static SampleFamily divReverse(final double numerator,
                                           final SampleFamily sf) {
         if (sf == SampleFamily.EMPTY) {
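
The difference between the old null-only fallback and the Groovy-falsy `elvis` helper in the MalRuntimeHelper hunk above can be shown with a small standalone sketch. It assumes a simplified copy of the truth check: the `CharSequence`, `Number`, `Collection`, `Map`, and array branches follow the hunk, while the `null` and `Boolean` branches are not visible there and are inferred from the Javadoc. `ElvisFalsySketch` and `nullOnly` are hypothetical names used only for this illustration.

```java
import java.lang.reflect.Array;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

class ElvisFalsySketch {
    /** Simplified Groovy-truth check; null and Boolean handling are assumed from the Javadoc. */
    static boolean isTruthy(final Object value) {
        if (value == null) {
            return false;
        }
        if (value instanceof Boolean) {
            return (Boolean) value;
        }
        if (value instanceof CharSequence) {
            return ((CharSequence) value).length() > 0;
        }
        if (value instanceof Number) {
            return ((Number) value).doubleValue() != 0.0D;
        }
        if (value instanceof Collection) {
            return !((Collection<?>) value).isEmpty();
        }
        if (value instanceof Map) {
            return !((Map<?, ?>) value).isEmpty();
        }
        if (value.getClass().isArray()) {
            return Array.getLength(value) > 0;
        }
        return true;
    }

    /** Falsy-aware Elvis: the primary is an argument, so it is evaluated exactly once. */
    static <T> T elvis(final T primary, final T fallback) {
        return isTruthy(primary) ? primary : fallback;
    }

    /** The previous shape: the fallback applies only when the primary is null. */
    static String nullOnly(final String primary, final String fallback) {
        return Optional.ofNullable(primary).orElse(fallback);
    }

    public static void main(final String[] args) {
        System.out.println("[" + nullOnly("", "n/a") + "]"); // prints "[]"
        System.out.println("[" + elvis("", "n/a") + "]");    // prints "[n/a]"
        // A side-effecting primary: remove(...) returns the value only on the first call.
        final Map<String, String> tags = new HashMap<>(Map.of("node_type", "hot"));
        System.out.println(elvis(tags.remove("node_type"), "n/a")); // prints "hot"
    }
}
```

Passing the primary as a method argument is what keeps it single-evaluated; a ternary that repeated the primary expression would call `tags.remove(...)` twice and return `null` the second time.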
diff --git a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/v2/dsl/SampleFamily.java b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/v2/dsl/SampleFamily.java
index e3d1aec7b3..f07ccc8bf3 100644
--- a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/v2/dsl/SampleFamily.java
+++ b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/v2/dsl/SampleFamily.java
@@ -431,7 +431,7 @@ public class SampleFamily {
             Arrays.stream(samples)
                   .map(sample -> sample.increase(
                       range,
-                      context.metricName,
+                      sample.getName(),
                       (lowerBoundValue, unused) -> sample.value - lowerBoundValue
                   ))
                   .toArray(Sample[]::new)
@@ -448,7 +448,7 @@ public class SampleFamily {
             Arrays.stream(samples)
                   .map(sample -> sample.increase(
                       range,
-                      context.metricName,
+                      sample.getName(),
                       (lowerBoundValue, lowerBoundTime) -> {
                           final long timeDiff = (sample.timestamp - lowerBoundTime) / 1000;
                           return timeDiff < 1L ? 0.0 : (sample.value - lowerBoundValue) / timeDiff;
@@ -466,7 +466,7 @@ public class SampleFamily {
             this.context,
             Arrays.stream(samples)
                   .map(sample -> sample.increase(
-                      context.metricName,
+                      sample.getName(),
                       (lowerBoundValue, lowerBoundTime) -> {
                           final long timeDiff = (sample.timestamp - lowerBoundTime) / 1000;
                           return timeDiff < 1L ? 0.0 : (sample.value - lowerBoundValue) / timeDiff;
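
The effect of the SampleFamily change above (keying on `sample.getName()` rather than `context.metricName`) can be illustrated with a much-reduced model. This is a sketch under loud assumptions: `CounterWindowKeySketch` keeps only the last value per slot instead of a real sliding window, and the slot-key format, the rule name, and the shortened counter names are hypothetical. The frozen values 5 and 166 are the totals quoted in the repro test.

```java
import java.util.HashMap;
import java.util.Map;

class CounterWindowKeySketch {
    /** Last value seen per slot; a hypothetical, reduced stand-in for a sliding window. */
    private final Map<String, Double> lastValue = new HashMap<>();

    /** Returns the increase since the previous observation in the same slot (0 on first sight). */
    double increase(final String slotName, final String labels, final double value) {
        final Double previous = lastValue.put(slotName + "|" + labels, value);
        return previous == null ? 0.0 : value - previous;
    }

    /** Feeds two frozen counters with identical labels for three scrapes; returns the largest |increase|. */
    static double maxAbsIncrease(final boolean keyByCounterName) {
        final CounterWindowKeySketch window = new CounterWindowKeySketch();
        final String ruleName = "liaison_grpc_error_rate"; // shared by every input counter of the rule
        final String labels = "pod_name=liaison-1";        // identical after the group-by
        final String[] names = {"total_err", "total_registry_err"};
        final double[] values = {5.0, 166.0};              // never change between scrapes
        double maxAbs = 0.0;
        for (int scrape = 0; scrape < 3; scrape++) {
            for (int i = 0; i < names.length; i++) {
                final String slot = keyByCounterName ? names[i] : ruleName;
                maxAbs = Math.max(maxAbs, Math.abs(window.increase(slot, labels, values[i])));
            }
        }
        return maxAbs;
    }

    public static void main(final String[] args) {
        System.out.println(maxAbsIncrease(false)); // prints 161.0: counters diff against each other
        System.out.println(maxAbsIncrease(true));  // prints 0.0: each counter has its own slot
    }
}
```

In the shared-slot run every observation is compared with the other counter's last value, so unchanged counters still yield a difference of 161; with per-counter slots each value is compared only with its own history.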
diff --git a/oap-server/analyzer/meter-analyzer/src/test/java/org/apache/skywalking/oap/meter/analyzer/v2/dsl/BanyanDBErrorRateReproTest.java b/oap-server/analyzer/meter-analyzer/src/test/java/org/apache/skywalking/oap/meter/analyzer/v2/dsl/BanyanDBErrorRateReproTest.java
new file mode 100644
index 0000000000..361a35b9ff
--- /dev/null
+++ b/oap-server/analyzer/meter-analyzer/src/test/java/org/apache/skywalking/oap/meter/analyzer/v2/dsl/BanyanDBErrorRateReproTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.meter.analyzer.v2.dsl;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.skywalking.oap.meter.analyzer.v2.dsl.counter.CounterWindow;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Reproduces the BanyanDB liaison_grpc_error_rate fabrication using the EXACT rule expression and the
+ * real (frozen) counter values scraped from the live demo FODC proxy. Counters never change across the
+ * simulated scrapes, so every rate term — and the summed result — MUST be 0. Any non-zero output proves
+ * the CounterWindow key collision: the three distinct error counters reduce to identical labels after
+ * .sum([...]) and, because the rate keys on the (shared, rule-level) context.metricName instead of each
+ * counter's own name, they share one CounterWindow slot and rate against each other's values.
+ */
+public class BanyanDBErrorRateReproTest {
+
+    private static final String GROUP_BY = "['cluster','pod_name','container_name','node_role','node_type']";
+
+    // Verbatim from otel-rules/banyandb/banyandb-instance.yaml : liaison_grpc_error_rate (value part).
+    private static final String EXPR =
+        "(banyandb_liaison_grpc_total_err.sum(" + GROUP_BY + ").rate('PT1M')"
+        + " + banyandb_liaison_grpc_total_registry_err.sum(" + GROUP_BY + ").rate('PT1M')"
+        + " + banyandb_liaison_grpc_total_stream_msg_received_err.sum(" + GROUP_BY + ").rate('PT1M')) * 60";
+
+    @BeforeEach
+    void resetWindow() {
+        CounterWindow.INSTANCE.reset();
+    }
+
+    private static Sample s(final String name, final double value, final long ts, final String... kv) {
+        final ImmutableMap.Builder<String, String> b = ImmutableMap.builder();
+        for (int i = 0; i < kv.length; i += 2) {
+            b.put(kv[i], kv[i + 1]);
+        }
+        return Sample.builder().name(name).labels(b.build()).value(value).timestamp(ts).build();
+    }
+
+    // The three liaison-1 families, with the real frozen values (total_err=5, registry_err=166, stream=5).
+    // node_type is intentionally ABSENT on liaison samples, exactly as the FODC proxy exposes them.
+    private Map<String, SampleFamily> scrape(final long ts) {
+        final String[] common = {
+            "cluster", "showcase-banyandb",
+            "pod_name", "demo-banyandb-liaison-1",
+            "container_name", "liaison",
+            "node_role", "ROLE_LIAISON",
+        };
+        final List<Sample> totalErr = new ArrayList<>();
+        totalErr.add(s("banyandb_liaison_grpc_total_err", 1, ts, with(common, "service", "measure", "method", "query", "group", "sw_metadata")));
+        totalErr.add(s("banyandb_liaison_grpc_total_err", 2, ts, with(common, "service", "measure", "method", "query", "group", "sw_metricsMinute")));
+        totalErr.add(s("banyandb_liaison_grpc_total_err", 1, ts, with(common, "service", "measure", "method", "query", "group", "sw_metricsHour")));
+        totalErr.add(s("banyandb_liaison_grpc_total_err", 1, ts, with(common, "service", "measure", "method", "query", "group", "sw_metricsDay")));
+
+        final List<Sample> registryErr = new ArrayList<>();
+        registryErr.add(s("banyandb_liaison_grpc_total_registry_err", 47, ts, with(common, "service", "measure", "method", "get", "group", "sw_metricsHour")));
+        registryErr.add(s("banyandb_liaison_grpc_total_registry_err", 47, ts, with(common, "service", "measure", "method", "get", "group", "sw_metricsMinute")));
+        registryErr.add(s("banyandb_liaison_grpc_total_registry_err", 47, ts, with(common, "service", "measure", "method", "get", "group", "sw_metricsDay")));
+        registryErr.add(s("banyandb_liaison_grpc_total_registry_err", 7, ts, with(common, "service", "indexRule", "method", "create", "group", "sw_metricsDay")));
+        registryErr.add(s("banyandb_liaison_grpc_total_registry_err", 7, ts, with(common, "service", "indexRule", "method", "create", "group", "sw_metricsHour")));
+        registryErr.add(s("banyandb_liaison_grpc_total_registry_err", 7, ts, with(common, "service", "indexRule", "method", "create", "group", "sw_metricsMinute")));
+        registryErr.add(s("banyandb_liaison_grpc_total_registry_err", 2, ts, with(common, "service", "trace", "method", "get", "group", "sw_trace")));
+        registryErr.add(s("banyandb_liaison_grpc_total_registry_err", 2, ts, with(common, "service", "trace", "method", "get", "group", "sw_zipkinTrace")));
+
+        final List<Sample> streamErr = new ArrayList<>();
+        streamErr.add(s("banyandb_liaison_grpc_total_stream_msg_received_err", 1, ts, with(common, "service", "measure", "method", "write", "group", "sw_metadata")));
+        streamErr.add(s("banyandb_liaison_grpc_total_stream_msg_received_err", 2, ts, with(common, "service", "trace", "method", "write", "group", "sw_trace")));
+        streamErr.add(s("banyandb_liaison_grpc_total_stream_msg_received_err", 2, ts, with(common, "service", "stream", "method", "write", "group", "sw_recordsLog")));
+
+        final Map<String, SampleFamily> map = new HashMap<>();
+        map.put("banyandb_liaison_grpc_total_err", SampleFamilyBuilder.newBuilder(totalErr.toArray(new Sample[0])).build());
+        map.put("banyandb_liaison_grpc_total_registry_err", SampleFamilyBuilder.newBuilder(registryErr.toArray(new Sample[0])).build());
+        map.put("banyandb_liaison_grpc_total_stream_msg_received_err", SampleFamilyBuilder.newBuilder(streamErr.toArray(new Sample[0])).build());
+        return map;
+    }
+
+    private static String[] with(final String[] common, final String... extra) {
+        final String[] out = new String[common.length + extra.length];
+        System.arraycopy(common, 0, out, 0, common.length);
+        System.arraycopy(extra, 0, out, common.length, extra.length);
+        return out;
+    }
+
+    @Test
+    void unchangedCounters_errorRate_mustBeZero() {
+        final Expression expr = DSL.parse("meter_banyandb_instance_liaison_grpc_error_rate", EXPR);
+        long ts = 1_700_000_000_000L;
+        final long step = 10_000L; // 10s scrape, matching the showcase collector
+        for (int scrape = 0; scrape < 6; scrape++, ts += step) {
+            final Result result = expr.run(scrape(ts));
+            double maxAbs = 0.0;
+            if (result.isSuccess() && result.getData() != SampleFamily.EMPTY) {
+                for (final Sample out : result.getData().samples) {
+                    maxAbs = Math.max(maxAbs, Math.abs(out.getValue()));
+                }
+            }
+            // Counters never changed -> error rate MUST be 0 on every scrape.
+            assertEquals(0.0, maxAbs, 1e-9,
+                "Unchanged counters must yield 0 error rate, but scrape " + scrape + " produced " + maxAbs);
+        }
+    }
+}
diff --git a/oap-server/analyzer/meter-analyzer/src/test/java/org/apache/skywalking/oap/meter/analyzer/v2/dsl/MALElvisFalsyTest.java b/oap-server/analyzer/meter-analyzer/src/test/java/org/apache/skywalking/oap/meter/analyzer/v2/dsl/MALElvisFalsyTest.java
new file mode 100644
index 0000000000..08f334551a
--- /dev/null
+++ b/oap-server/analyzer/meter-analyzer/src/test/java/org/apache/skywalking/oap/meter/analyzer/v2/dsl/MALElvisFalsyTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.meter.analyzer.v2.dsl;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.skywalking.oap.meter.analyzer.v2.compiler.rt.MalRuntimeHelper;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Groovy's Elvis `?:` applies the fallback when the primary is falsy — including the empty string.
+ * The v2 codegen previously emitted Optional.ofNullable(P).orElse(F), which only fires on null, so an
+ * empty-string primary (e.g. a label that .sum() filled with "" for an absent key) leaked "" instead
+ * of the fallback. This is the exact mechanism behind BanyanDB liaison instances storing node_type=""
+ * instead of "n/a".
+ */
+public class MALElvisFalsyTest {
+
+    private static String tagAfterElvis(final String nodeTypeValue) {
+        final ImmutableMap<String, String> labels = nodeTypeValue == null
+            ? ImmutableMap.of("svc", "s")
+            : ImmutableMap.of("svc", "s", "node_type", nodeTypeValue);
+        final SampleFamily sf = SampleFamilyBuilder.newBuilder(
+            Sample.builder().name("metric").labels(labels).value(1.0).timestamp(1L).build()).build();
+        final Expression expr = DSL.parse("test_elvis",
+            "metric.tag({tags -> tags['nt'] = tags.node_type ?: 'n/a'})");
+        final Result r = expr.run(Map.of("metric", sf));
+        return r.getData().samples[0].getLabels().get("nt");
+    }
+
+    private static String tagAfterSideEffectingElvis(final String nodeTypeValue) {
+        final SampleFamily sf = SampleFamilyBuilder.newBuilder(
+            Sample.builder()
+                  .name("metric")
+                  .labels(ImmutableMap.of("svc", "s", "node_type", nodeTypeValue))
+                  .value(1.0)
+                  .timestamp(1L)
+                  .build()).build();
+        final Expression expr = DSL.parse("test_elvis_remove",
+            "metric.tag({tags -> tags['nt'] = tags.remove('node_type') ?: 'n/a'})");
+        final Result r = expr.run(Map.of("metric", sf));
+        return r.getData().samples[0].getLabels().get("nt");
+    }
+
+    @Test
+    void emptyStringPrimary_usesFallback() {
+        assertEquals("n/a", tagAfterElvis(""), "empty-string primary must fall through to 'n/a' (Groovy-falsy)");
+    }
+
+    @Test
+    void absentPrimary_usesFallback() {
+        assertEquals("n/a", tagAfterElvis(null), "absent (null) primary must fall through to 'n/a'");
+    }
+
+    @Test
+    void nonEmptyPrimary_keptAsIs() {
+        assertEquals("hot", tagAfterElvis("hot"), "non-empty primary must be kept");
+    }
+
+    @Test
+    void sideEffectingPrimary_evaluatedOnce() {
+        assertEquals("hot", tagAfterSideEffectingElvis("hot"),
+            "Elvis must not evaluate the primary twice; tags.remove(...) returns a value only once");
+    }
+
+    @Test
+    void runtimeTruthiness_matchesGroovyFalsyValues() {
+        assertFalse(MalRuntimeHelper.isTruthy(0));
+        assertFalse(MalRuntimeHelper.isTruthy(0.0D));
+        assertFalse(MalRuntimeHelper.isTruthy(Collections.emptyList()));
+        assertFalse(MalRuntimeHelper.isTruthy(Collections.emptyMap()));
+        assertFalse(MalRuntimeHelper.isTruthy(new String[0]));
+        assertTrue(MalRuntimeHelper.isTruthy(-1));
+        assertTrue(MalRuntimeHelper.isTruthy(List.of("value")));
+    }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/ModelInstaller.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/ModelInstaller.java
index d65d7fb75b..f73bd82427 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/ModelInstaller.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/ModelInstaller.java
@@ -99,20 +99,39 @@ public abstract class ModelInstaller implements ModelRegistry.CreatingListener,
         // resource that only this very apply would ever create.
         if (deferDDLToInitNode(opt)) {
             while (true) {
-                InstallInfo info = isExists(model, opt);
-                if (!info.isAllExist()) {
-                    try {
+                boolean allExist;
+                try {
+                    InstallInfo info = isExists(model, opt);
+                    allExist = info.isAllExist();
+                    if (!allExist) {
                         log.info(
                             "install info: {}.table for model: [{}] not all required resources exist. OAP is running in 'no-init' mode, waiting create or update... retry 3s later.",
                             info.buildInstallInfoMsg(), model.getName()
                         );
-                        Thread.sleep(3000L);
-                    } catch (InterruptedException e) {
-                        log.error(e.getMessage());
                     }
-                } else {
+                } catch (final StorageException e) {
+                    if (!isRetryableNoInitProbeFailure(e)) {
+                        throw e;
+                    }
+                    // A transient backend error during the probe (e.g. a BanyanDB cluster data node
+                    // still Init-ing, "client connection is closing") is NOT a reason to abort boot:
+                    // the init OAP will create the resource and the next probe succeeds. Treat it like
+                    // "not present yet" and retry in-loop, rather than letting it escape and crash-loop
+                    // the pod — which would only re-enter this same loop after a full restart.
+                    allExist = false;
+                    log.warn("install info: existence probe for model: [{}] threw a transient backend "
+                        + "error. OAP is running in 'no-init' mode, retry 3s later.", model.getName(), e);
+                }
+                if (allExist) {
                     break;
                 }
+                try {
+                    Thread.sleep(3000L);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    throw new StorageException(
+                        "interrupted while waiting for no-init backend resources for model " + model.getName(), e);
+                }
             }
             return;
         }
@@ -170,6 +189,16 @@ public abstract class ModelInstaller implements ModelRegistry.CreatingListener,
         return RunningMode.isNoInitMode() && opt.getFlags().isDeferDDLToInitNode();
     }
 
+    /**
+     * Whether a {@link StorageException} from the no-init defer-loop existence probe is
+     * known to be transient and should be retried in-loop. The base implementation is
+     * conservative so permanent model/config errors do not become an infinite boot wait;
+     * storage backends opt in only for transport-level probe failures they can classify.
+     */
+    protected boolean isRetryableNoInitProbeFailure(final StorageException e) {
+        return false;
+    }
+
     public void start() {
     }
 
diff --git a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/model/ModelInstallerNoInitTest.java b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/model/ModelInstallerNoInitTest.java
index d9cda58cd7..5ee2bbcaf3 100644
--- a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/model/ModelInstallerNoInitTest.java
+++ b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/model/ModelInstallerNoInitTest.java
@@ -26,6 +26,7 @@ import org.junit.jupiter.api.Test;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.mock;
@@ -102,25 +103,103 @@ class ModelInstallerNoInitTest {
             "withSchemaChange must not re-create a resource that already exists");
     }
 
+    @Test
+    void noInitDeferLoopRetriesTransientProbeErrorInsteadOfCrashing() {
+        RunningMode.setMode("no-init");
+        // The first existence probe throws a transient StorageException (mimicking a BanyanDB
+        // cluster data node still Init-ing); the next probe reports the resource present.
+        final RecordingInstaller installer = new RecordingInstaller(true /* present after transient */,
+            1 /* one transient probe failure */, true /* retryable probe failure */);
+        final Model model = mock(Model.class);
+        when(model.getName()).thenReturn("static_metric_transient");
+
+        // Must NOT propagate the transient (which would escape whenCreating and crash-loop the pod);
+        // must retry in-loop, then return on the defer path without creating. 10s covers the 3s sleep.
+        assertTimeoutPreemptively(Duration.ofSeconds(10), () ->
+            installer.whenCreating(model, StorageManipulationOpt.schemaCreateIfAbsent()));
+        assertEquals(0, installer.createTableCalls,
+            "a transient probe error must be retried, then defer to the init node without creating");
+        assertTrue(installer.probeCalls >= 2,
+            "the loop must probe again after the transient instead of escaping on the first throw");
+    }
+
+    @Test
+    void noInitDeferLoopPropagatesNonRetryableProbeError() {
+        RunningMode.setMode("no-init");
+        final RecordingInstaller installer = new RecordingInstaller(true /* unused */,
+            1 /* one probe failure */, false /* permanent/non-retryable */);
+        final Model model = mock(Model.class);
+        when(model.getName()).thenReturn("static_metric_bad_model");
+
+        assertThrows(StorageException.class,
+            () -> installer.whenCreating(model, StorageManipulationOpt.schemaCreateIfAbsent()),
+            "permanent model/config probe failures must not be converted into an infinite no-init wait");
+        assertEquals(1, installer.probeCalls,
+            "a non-retryable failure must escape without sleeping and probing again");
+        assertEquals(0, installer.createTableCalls);
+    }
+
+    @Test
+    void noInitDeferLoopPropagatesInterruptedSleep() {
+        RunningMode.setMode("no-init");
+        final RecordingInstaller installer = new RecordingInstaller(false /* resource absent */);
+        final Model model = mock(Model.class);
+        when(model.getName()).thenReturn("static_metric_wait_interrupted");
+
+        Thread.currentThread().interrupt();
+        try {
+            assertThrows(StorageException.class,
+                () -> installer.whenCreating(model, 
StorageManipulationOpt.schemaCreateIfAbsent()),
+                "an interrupted no-init wait must fail fast so shutdown can 
proceed");
+            assertTrue(Thread.currentThread().isInterrupted(),
+                "the interrupt flag must be restored for upstream shutdown 
handling");
+        } finally {
+            Thread.interrupted();
+        }
+    }
+
     /** Minimal concrete {@link ModelInstaller} that records createTable calls and reports a
      *  fixed existence result, so the base whenCreating branching can be exercised without a
-     *  real storage backend. */
+     *  real storage backend. Optionally throws a transient {@link StorageException} on the first
+     *  {@code transientProbeFailures} existence probes to exercise the no-init defer-loop retry. */
     private static final class RecordingInstaller extends ModelInstaller {
         private final boolean resourcePresent;
+        private final int transientProbeFailures;
+        private final boolean retryableProbeFailure;
+        private int probeCalls;
         private int createTableCalls;
 
         private RecordingInstaller(final boolean resourcePresent) {
+            this(resourcePresent, 0, false);
+        }
+
+        private RecordingInstaller(final boolean resourcePresent, final int transientProbeFailures) {
+            this(resourcePresent, transientProbeFailures, true);
+        }
+
+        private RecordingInstaller(final boolean resourcePresent, final int transientProbeFailures,
+                                   final boolean retryableProbeFailure) {
             super(null, null);
             this.resourcePresent = resourcePresent;
+            this.transientProbeFailures = transientProbeFailures;
+            this.retryableProbeFailure = retryableProbeFailure;
         }
 
         @Override
-        public InstallInfo isExists(final Model model, final StorageManipulationOpt opt) {
+        public InstallInfo isExists(final Model model, final StorageManipulationOpt opt) throws StorageException {
+            if (probeCalls++ < transientProbeFailures) {
+                throw new StorageException("transient backend error");
+            }
             final TestInstallInfo info = new TestInstallInfo(model);
             info.setAllExist(resourcePresent);
             return info;
         }
 
+        @Override
+        protected boolean isRetryableNoInitProbeFailure(final StorageException e) {
+            return retryableProbeFailure;
+        }
+
         @Override
         public void createTable(final Model model) {
             createTableCalls++;
diff --git a/oap-server/server-starter/src/main/resources/otel-rules/banyandb/banyandb-instance.yaml b/oap-server/server-starter/src/main/resources/otel-rules/banyandb/banyandb-instance.yaml
index def21fc308..add9cdc455 100644
--- a/oap-server/server-starter/src/main/resources/otel-rules/banyandb/banyandb-instance.yaml
+++ b/oap-server/server-starter/src/main/resources/otel-rules/banyandb/banyandb-instance.yaml
@@ -52,7 +52,7 @@ metricsRules:
     exp: banyandb_system_up_time
   # CPU usage (cores). process_* rides on every container including lifecycle.
   - name: cpu_usage
-    exp: process_cpu_seconds_total.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT15S')
+    exp: process_cpu_seconds_total.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT1M')
   # resident memory (bytes). Raw gauge, present on all containers.
   - name: rss_memory
     exp: process_resident_memory_bytes
@@ -75,45 +75,45 @@ metricsRules:
     exp: banyandb_system_disk.tagEqual('kind','used_percent').avg(['cluster','pod_name','container_name','node_role','node_type','path']) * 100
   # network throughput (bytes/s) by interface name.
   - name: network_recv
-    exp: banyandb_system_net_state.tagEqual('kind','bytes_recv').sum(['cluster','pod_name','container_name','node_role','node_type','name']).rate('PT15S')
+    exp: banyandb_system_net_state.tagEqual('kind','bytes_recv').sum(['cluster','pod_name','container_name','node_role','node_type','name']).rate('PT1M')
   - name: network_sent
-    exp: banyandb_system_net_state.tagEqual('kind','bytes_sent').sum(['cluster','pod_name','container_name','node_role','node_type','name']).rate('PT15S')
+    exp: banyandb_system_net_state.tagEqual('kind','bytes_sent').sum(['cluster','pod_name','container_name','node_role','node_type','name']).rate('PT1M')
   # Go runtime.
   - name: goroutines
     exp: go_goroutines
   # average GC pause (s) = rate(Σpause) / rate(Σcount). go_gc_duration_seconds is a summary (no buckets),
   # so this ratio of _sum/_count is the only valid average — do not apply histogram_percentile to it.
   - name: gc_pause_avg
-    exp: go_gc_duration_seconds_sum.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT15S').safeDiv(go_gc_duration_seconds_count.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT15S'))
+    exp: go_gc_duration_seconds_sum.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT1M').safeDiv(go_gc_duration_seconds_count.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT1M'))
   - name: heap_inuse
     exp: go_memstats_heap_inuse_bytes
   - name: heap_next_gc
     exp: go_memstats_next_gc_bytes
   - name: alloc_rate
-    exp: go_memstats_alloc_bytes_total.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT15S')
+    exp: go_memstats_alloc_bytes_total.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT1M')
 
   # ---- Liaison only (front door; the UI gates these on container_name == liaison) ----
   # query rate (req/s) by data-model service (measure/stream/trace/property). method literal is "query".
   - name: liaison_query_rate
-    exp: banyandb_liaison_grpc_total_started.tagEqual('method','query').sum(['cluster','pod_name','container_name','node_role','node_type','service']).rate('PT15S')
+    exp: banyandb_liaison_grpc_total_started.tagEqual('method','query').sum(['cluster','pod_name','container_name','node_role','node_type','service']).rate('PT1M')
   # gRPC errors/min. Three liaison-side error families (mirrors the Grafana "gRPC Error Rate" panel,
   # which sums total_err + registry_err + stream_msg_received_err). All lazily registered -> empty on a
   # healthy cluster; each pre-aggregated to the same label set before '+'.
   - name: liaison_grpc_error_rate
-    exp: (banyandb_liaison_grpc_total_err.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT15S') + banyandb_liaison_grpc_total_registry_err.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT15S') + banyandb_liaison_grpc_total_stream_msg_received_err.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT15S')) * 60
+    exp: (banyandb_liaison_grpc_total_err.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT1M') + banyandb_liaison_grpc_total_registry_err.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT1M') + banyandb_liaison_grpc_total_stream_msg_received_err.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT1M')) * 60
   # registry operation rate (req/s): schema registry ops on the liaison front door. total_started is
   # query-only on the wire, so the former tagNotEqual('method','query') term was empty and is dropped;
   # registry_started carries the non-query op count.
   - name: liaison_registry_op_rate
-    exp: banyandb_liaison_grpc_total_registry_started.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT15S')
+    exp: banyandb_liaison_grpc_total_registry_started.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT1M')
   # write rate (writes/s) seen at the liaison front door. group label dropped (instance-level total).
   - name: liaison_write_rate
-    exp: banyandb_measure_total_written.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT15S') + banyandb_stream_tst_total_written.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT15S') + banyandb_trace_tst_total_written.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT15S')
+    exp: banyandb_measure_total_written.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT1M') + banyandb_stream_tst_total_written.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT1M') + banyandb_trace_tst_total_written.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT1M')
   # tier-2 publish pipeline (liaison -> data): throughput by operation, bytes/s, and p99 send latency.
   - name: liaison_publish_throughput
-    exp: banyandb_queue_pub_total_finished.sum(['cluster','pod_name','container_name','node_role','node_type','operation']).rate('PT15S')
+    exp: banyandb_queue_pub_total_finished.sum(['cluster','pod_name','container_name','node_role','node_type','operation']).rate('PT1M')
   - name: liaison_publish_bytes
-    exp: banyandb_queue_pub_sent_bytes.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT15S')
+    exp: banyandb_queue_pub_sent_bytes.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT1M')
   - name: liaison_publish_latency_p99
     exp: banyandb_queue_pub_total_latency.sum(['cluster','pod_name','container_name','node_role','node_type','operation','le']).histogram().histogram_percentile([99])
   # tier-2 publish, batch granularity (BanyanDB #1169): batches published/s by operation and the batch
@@ -121,7 +121,7 @@ metricsRules:
   # BUILD-GATED: _batch_finished/_batch_latency are absent on current builds -> emit nothing until the
   # shipped BanyanDB build registers them.
   - name: liaison_publish_batch_throughput
-    exp: banyandb_queue_pub_total_batch_finished.sum(['cluster','pod_name','container_name','node_role','node_type','operation']).rate('PT15S')
+    exp: banyandb_queue_pub_total_batch_finished.sum(['cluster','pod_name','container_name','node_role','node_type','operation']).rate('PT1M')
   - name: liaison_publish_batch_latency_p99
     exp: banyandb_queue_pub_total_batch_latency.sum(['cluster','pod_name','container_name','node_role','node_type','operation','le']).histogram().histogram_percentile([99])
   # liaison write-queue (wqueue) depth: pending records buffered at the front door before publish.
@@ -144,34 +144,34 @@ metricsRules:
     exp: banyandb_measure_pending_data_count.sum(['cluster','pod_name','container_name','node_role','node_type']) + banyandb_stream_tst_pending_data_count.sum(['cluster','pod_name','container_name','node_role','node_type']) + banyandb_trace_tst_pending_data_count.sum(['cluster','pod_name','container_name','node_role','node_type'])
   # merge-loop iterations/s.
   - name: data_merge_file_rate
-    exp: banyandb_measure_total_merge_loop_started.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT15S') + banyandb_stream_tst_total_merge_loop_started.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT15S') + banyandb_trace_tst_total_merge_loop_started.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT15S')
+    exp: banyandb_measure_total_merge_loop_started.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT1M') + banyandb_stream_tst_total_merge_loop_started.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT1M') + banyandb_trace_tst_total_merge_loop_started.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT1M')
   # avg parts merged per merge loop on the file path (matches Grafana = rate(merged_parts{type=file}) /
   # rate(merge_loop_started)). type='file' is data-only on the wire (liaison emits only type='mem').
   - name: data_merge_file_partitions
-    exp: banyandb_measure_total_merged_parts.tagEqual('type','file').sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT15S').safeDiv(banyandb_measure_total_merge_loop_started.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT15S')) + banyandb_stream_tst_total_merged_parts.tagEqual('type','file').sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT15S').safeDiv(banyandb_stream_tst_total_merge_loop_started.sum([ [...]
+    exp: banyandb_measure_total_merged_parts.tagEqual('type','file').sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT1M').safeDiv(banyandb_measure_total_merge_loop_started.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT1M')) + banyandb_stream_tst_total_merged_parts.tagEqual('type','file').sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT1M').safeDiv(banyandb_stream_tst_total_merge_loop_started.sum(['cl [...]
   # avg file-merge latency (ms) per merge loop.
   - name: data_merge_file_latency
-    exp: (banyandb_measure_total_merge_latency.tagEqual('type','file').sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT15S').safeDiv(banyandb_measure_total_merge_loop_started.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT15S')) + banyandb_stream_tst_total_merge_latency.tagEqual('type','file').sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT15S').safeDiv(banyandb_stream_tst_total_merge_loop_started.su [...]
+    exp: (banyandb_measure_total_merge_latency.tagEqual('type','file').sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT1M').safeDiv(banyandb_measure_total_merge_loop_started.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT1M')) + banyandb_stream_tst_total_merge_latency.tagEqual('type','file').sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT1M').safeDiv(banyandb_stream_tst_total_merge_loop_started.sum([ [...]
   # inverted-index (series) write rate / term-search rate / total docs. *_inverted_index_total_* are
   # # TYPE=gauge but cumulative, so rate() yields a per-window delta. Stream's series index is the
   # storage scope (stream_storage_*); the tst scope is reported separately below. Trace's series index
   # (trace_storage_*) is included so trace series writes/docs are not silently dropped.
   - name: data_series_write_rate
-    exp: banyandb_measure_inverted_index_total_updates.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT15S') + banyandb_stream_storage_inverted_index_total_updates.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT15S') + banyandb_trace_storage_inverted_index_total_updates.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT15S')
+    exp: banyandb_measure_inverted_index_total_updates.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT1M') + banyandb_stream_storage_inverted_index_total_updates.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT1M') + banyandb_trace_storage_inverted_index_total_updates.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT1M')
   - name: data_series_term_search_rate
-    exp: banyandb_measure_inverted_index_total_term_searchers_started.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT15S') + banyandb_stream_storage_inverted_index_total_term_searchers_started.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT15S') + banyandb_trace_storage_inverted_index_total_term_searchers_started.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT15S')
+    exp: banyandb_measure_inverted_index_total_term_searchers_started.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT1M') + banyandb_stream_storage_inverted_index_total_term_searchers_started.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT1M') + banyandb_trace_storage_inverted_index_total_term_searchers_started.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT1M')
   - name: data_total_series
     exp: banyandb_measure_inverted_index_total_doc_count.sum(['cluster','pod_name','container_name','node_role','node_type']) + banyandb_stream_storage_inverted_index_total_doc_count.sum(['cluster','pod_name','container_name','node_role','node_type']) + banyandb_trace_storage_inverted_index_total_doc_count.sum(['cluster','pod_name','container_name','node_role','node_type'])
   # stream time-series-table (tst) index, distinct from the stream series (storage) index above.
   - name: data_stream_tst_write_rate
-    exp: banyandb_stream_tst_inverted_index_total_updates.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT15S')
+    exp: banyandb_stream_tst_inverted_index_total_updates.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT1M')
   - name: data_stream_tst_term_search_rate
-    exp: banyandb_stream_tst_inverted_index_total_term_searchers_started.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT15S')
+    exp: banyandb_stream_tst_inverted_index_total_term_searchers_started.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT1M')
   - name: data_stream_tst_total_docs
     exp: banyandb_stream_tst_inverted_index_total_doc_count.sum(['cluster','pod_name','container_name','node_role','node_type'])
   # subscribe-side queue (data receives from liaison): throughput by operation + p99 latency.
   - name: data_queue_sub_throughput
-    exp: banyandb_queue_sub_total_finished.sum(['cluster','pod_name','container_name','node_role','node_type','operation']).rate('PT15S')
+    exp: banyandb_queue_sub_total_finished.sum(['cluster','pod_name','container_name','node_role','node_type','operation']).rate('PT1M')
   - name: data_queue_sub_latency_p99
     exp: banyandb_queue_sub_total_latency.sum(['cluster','pod_name','container_name','node_role','node_type','operation','le']).histogram().histogram_percentile([99])
   # subscribe-side per-message throughput (BanyanDB #1169). A data node ingests writes via the
@@ -181,7 +181,7 @@ metricsRules:
   # intentionally not modeled here. Batch-level granularity lives on the liaison's publish side
   # (liaison_publish_batch_throughput / liaison_publish_batch_latency_p99 above).
   - name: data_queue_sub_message_throughput
-    exp: banyandb_queue_sub_total_message_finished.sum(['cluster','pod_name','container_name','node_role','node_type','operation']).rate('PT15S')
+    exp: banyandb_queue_sub_total_message_finished.sum(['cluster','pod_name','container_name','node_role','node_type','operation']).rate('PT1M')
   # retention disk-usage % per data-model scope (0-100 gauge). Kept per scope rather than summed (a sum
   # of three percentages is meaningless). Not in the upstream Grafana boards; a SkyWalking addition.
   - name: data_retention_measure_disk_usage_percent
diff --git a/oap-server/server-starter/src/main/resources/otel-rules/banyandb/banyandb-service.yaml b/oap-server/server-starter/src/main/resources/otel-rules/banyandb/banyandb-service.yaml
index 97c6cac8f6..45d505d8f4 100644
--- a/oap-server/server-starter/src/main/resources/otel-rules/banyandb/banyandb-service.yaml
+++ b/oap-server/server-starter/src/main/resources/otel-rules/banyandb/banyandb-service.yaml
@@ -29,11 +29,11 @@ metricsRules:
   # cluster writes/s across the three data-model scopes (measure, stream, trace). Each scope's
   # write counter is collapsed to one per-cluster series before `+`.
   - name: cluster_write_rate
-    exp: (banyandb_measure_total_written.sum(['cluster']).rate('PT15S') + banyandb_stream_tst_total_written.sum(['cluster']).rate('PT15S') + banyandb_trace_tst_total_written.sum(['cluster']).rate('PT15S'))
+    exp: (banyandb_measure_total_written.sum(['cluster']).rate('PT1M') + banyandb_stream_tst_total_written.sum(['cluster']).rate('PT1M') + banyandb_trace_tst_total_written.sum(['cluster']).rate('PT1M'))
   # cluster queries/s. `service` on this family is BanyanDB's data-model facet
   # (measure/stream/trace/property), not a SkyWalking service; method literal is "query".
   - name: cluster_query_rate
-    exp: banyandb_liaison_grpc_total_started.tagEqual('method','query').sum(['cluster']).rate('PT15S')
+    exp: banyandb_liaison_grpc_total_started.tagEqual('method','query').sum(['cluster']).rate('PT1M')
   # cluster errors/min. The seven liaison-side error families mirror the upstream Grafana
   # "Error Rate" stat (grafana-fodc-workload.json). Each is pre-aggregated to ['cluster']
   # BEFORE `+` because their wire label sets differ (stream_msg_received_err carries
@@ -42,7 +42,7 @@ metricsRules:
   # registered and emit no series; MAL treats an empty operand as the additive identity, so the
   # sum emits from whatever has fired and renders absent-as-0 when nothing has.
   - name: cluster_error_rate
-    exp: (banyandb_liaison_grpc_total_err.sum(['cluster']).rate('PT15S') + banyandb_liaison_grpc_total_registry_err.sum(['cluster']).rate('PT15S') + banyandb_liaison_grpc_total_stream_msg_received_err.sum(['cluster']).rate('PT15S') + banyandb_queue_pub_total_err.sum(['cluster']).rate('PT15S') + banyandb_measure_total_sync_loop_err.sum(['cluster']).rate('PT15S') + banyandb_stream_tst_total_sync_loop_err.sum(['cluster']).rate('PT15S') + banyandb_trace_tst_total_sync_loop_err.sum(['cluster' [...]
+    exp: (banyandb_liaison_grpc_total_err.sum(['cluster']).rate('PT1M') + banyandb_liaison_grpc_total_registry_err.sum(['cluster']).rate('PT1M') + banyandb_liaison_grpc_total_stream_msg_received_err.sum(['cluster']).rate('PT1M') + banyandb_queue_pub_total_err.sum(['cluster']).rate('PT1M') + banyandb_measure_total_sync_loop_err.sum(['cluster']).rate('PT1M') + banyandb_stream_tst_total_sync_loop_err.sum(['cluster']).rate('PT1M') + banyandb_trace_tst_total_sync_loop_err.sum(['cluster']).rat [...]
   # live container count by role. count(['cluster','container_name','pod_name']) groups by all
   # three then re-groups excluding the last key (pod_name), yielding one sample per
   # (cluster, container_name) whose value = distinct pod_name count -> data=N, liaison=M.
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java
index cabfb75276..34c00bfabf 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java
@@ -19,9 +19,11 @@
 package org.apache.skywalking.oap.server.storage.plugin.banyandb;
 
 import io.grpc.Status;
+import java.time.Duration;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -40,7 +42,6 @@ import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.IndexRule;
 import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.IndexRuleBinding;
 import org.apache.skywalking.banyandb.schema.v1.BanyandbSchema.SchemaKey;
 import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.TopNAggregation;
-import java.time.Duration;
 import org.apache.skywalking.library.banyandb.v1.client.BanyanDBClient;
 import org.apache.skywalking.library.banyandb.v1.client.SchemaWatcher;
 import org.apache.skywalking.library.banyandb.v1.client.grpc.exception.BanyanDBException;
@@ -103,6 +104,51 @@ public class BanyanDBIndexInstaller extends ModelInstaller {
     public BanyanDBIndexInstaller(Client client, ModuleManager moduleManager, BanyanDBStorageConfig config) {
         super(client, moduleManager);
         this.config = config;
+        // Let read/persist paths self-heal a missing local schema entry (MetadataRegistry.repopulateLocally):
+        // re-derive the model's Schema locally with zero server RPC via the same primitive the peer
+        // boot path uses. This closes the "<metric> is not registered" flood that arises when a
+        // withoutSchemaChange peer apply or a runtime-rule bundled fall-over rebuilds the dispatch
+        // worker but skips the populate. DownSamplingConfigService is resolved lazily per call — a
+        // self-heal only fires post-boot, when CoreModule is long up.
+        MetadataRegistry.INSTANCE.registerLocalSchemaPopulator(model -> {
+            final DownSamplingConfigService downSamplingConfigService = moduleManager.find(CoreModule.NAME)
+                                                                                     .provider()
+                                                                                     .getService(DownSamplingConfigService.class);
+            registerLocallyByKind(model, downSamplingConfigService);
+        });
+    }
+
+    @Override
+    protected boolean isRetryableNoInitProbeFailure(final StorageException e) {
+        Throwable cause = e.getCause();
+        while (cause != null) {
+            if (cause instanceof BanyanDBException) {
+                return isTransientBanyanDBProbeFailure((BanyanDBException) cause);
+            }
+            cause = cause.getCause();
+        }
+        return false;
+    }
+
+    private static boolean isTransientBanyanDBProbeFailure(final BanyanDBException e) {
+        final Status.Code code = e.getStatus();
+        if (Status.Code.UNAVAILABLE.equals(code)
+            || Status.Code.DEADLINE_EXCEEDED.equals(code)
+            || Status.Code.CANCELLED.equals(code)
+            || Status.Code.RESOURCE_EXHAUSTED.equals(code)
+            || Status.Code.ABORTED.equals(code)) {
+            return true;
+        }
+        if (!Status.Code.UNKNOWN.equals(code)) {
+            return false;
+        }
+        final String message = String.valueOf(e.getMessage()).toLowerCase(Locale.ROOT);
+        return message.contains("client connection is closing")
+            || message.contains("connection is closing")
+            || message.contains("transport is closing")
+            || message.contains("connection refused")
+            || message.contains("connection reset")
+            || message.contains("broken pipe");
     }
 
     @Override
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBNoneStreamDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBNoneStreamDAO.java
index 09bd256e14..98763aa3e1 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBNoneStreamDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBNoneStreamDAO.java
@@ -40,7 +40,13 @@ public class BanyanDBNoneStreamDAO extends AbstractDAO<BanyanDBStorageClient> im
 
     @Override
     public void insert(Model model, NoneStream noneStream) throws IOException {
+        // Self-heal a missing local schema entry once (RPC-free re-derivation) before failing —
+        // see MetadataRegistry.repopulateLocally. Re-read via the same lookup; throw only if still absent.
         MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findRecordMetadata(model.getName());
+        if (schema == null) {
+            MetadataRegistry.INSTANCE.repopulateLocally(model);
+            schema = MetadataRegistry.INSTANCE.findRecordMetadata(model.getName());
+        }
         if (schema == null) {
             throw new IOException(model.getName() + " is not registered");
         }
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java
index 958b4c6151..c644b74351 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java
@@ -82,6 +82,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
@@ -94,7 +95,51 @@ import static org.apache.skywalking.oap.server.core.analysis.metrics.Metrics.ID;
 public enum MetadataRegistry {
     INSTANCE;
 
-    private final Map<String, Schema> registry = new HashMap<>();
+    // ConcurrentHashMap (not HashMap): boot populates single-threaded, but the self-heal path
+    // (repopulateLocally) writes from persistence/query threads concurrently with reads.
+    private final Map<String, Schema> registry = new ConcurrentHashMap<>();
+
+    /**
+     * Re-derive and locally register a model's BanyanDB {@link Schema} with NO server RPC.
+     * Registered once by the active {@code BanyanDBIndexInstaller} at boot and invoked by
+     * {@link #repopulateLocally(Model)} when a read path finds the cache empty for a model whose
+     * dispatch worker is already live — e.g. a {@code withoutSchemaChange} peer apply or a
+     * runtime-rule bundled fall-over rebuilt the worker but skipped the local populate. The
+     * {@code Model} is always known locally and its schema is a pure local derivation, so such a
+     * miss is always re-derivable without touching the backend.
+     */
+    @FunctionalInterface
+    public interface LocalSchemaPopulator {
+        void populateLocally(Model model);
+    }
+
+    private volatile LocalSchemaPopulator localSchemaPopulator;
+
+    /** Register the boot-time, RPC-free local schema populator. Called once by the active installer. */
+    public void registerLocalSchemaPopulator(final LocalSchemaPopulator populator) {
+        this.localSchemaPopulator = populator;
+    }
+
+    /**
+     * Best-effort, RPC-free re-derivation of a model's local {@link Schema} so a read/persist path
+     * can self-heal a missing cache entry instead of throwing {@code "<model> is not registered"}
+     * forever (the registry never evicts, so an entry that was never populated on this node stays
+     * absent otherwise). No-op when no populator is registered (e.g. non-BanyanDB unit tests).
+     * Swallows derivation exceptions so a self-heal attempt is never worse than the pre-existing
+     * throw — the caller re-reads and surfaces its own not-registered error if still absent.
+     */
+    public void repopulateLocally(final Model model) {
+        final LocalSchemaPopulator populator = this.localSchemaPopulator;
+        if (populator == null) {
+            return;
+        }
+        try {
+            populator.populateLocally(model);
+        } catch (final Exception e) {
+            log.debug("local schema self-heal re-derivation failed for model [{}]; "
+                + "caller will surface the not-registered error", model.getName(), e);
+        }
+    }
 
     public StreamModel registerStreamModel(Model model, BanyanDBStorageConfig config) {
         final SchemaMetadata schemaMetadata = parseMetadata(model, config, null);
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetricsDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetricsDAO.java
index 4f1ff1928e..d52c70ec5f 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetricsDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetricsDAO.java
@@ -63,12 +63,29 @@ public class BanyanDBMetricsDAO extends AbstractBanyanDBDAO implements IMetricsD
         this.storageBuilder = storageBuilder;
     }
 
-    @Override
-    public List<Metrics> multiGet(Model model, List<Metrics> metrics) throws IOException {
+    /**
+     * Resolve the model's BanyanDB schema, self-healing a missing local entry once before failing.
+     * A null here means this node has a live persist worker for the model but its schema cache was
+     * never populated (or lost) — typically a {@code withoutSchemaChange} peer apply or a
+     * runtime-rule bundled fall-over that rebuilt the worker without the populate. Re-derive the
+     * schema locally with no server RPC and re-read; throw only if the entry is still absent, so
+     * a genuinely unknown model still fails fast instead of flooding forever.
+     */
+    private MetadataRegistry.Schema resolveSchema(Model model) throws IOException {
         MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(model);
         if (schema == null) {
-            throw new IOException(model.getName() + " is not registered");
+            MetadataRegistry.INSTANCE.repopulateLocally(model);
+            schema = MetadataRegistry.INSTANCE.findMetadata(model);
+            if (schema == null) {
+                throw new IOException(model.getName() + " is not registered");
+            }
         }
+        return schema;
+    }
+
+    @Override
+    public List<Metrics> multiGet(Model model, List<Metrics> metrics) throws IOException {
+        MetadataRegistry.Schema schema = resolveSchema(model);
         final Map<String, List<String>> seriesIDColumns = new HashMap<>();
         if (model.getBanyanDBModelExtension().isIndexMode()) {
             seriesIDColumns.put(ID, new ArrayList<>());
@@ -144,10 +161,7 @@ public class BanyanDBMetricsDAO extends AbstractBanyanDBDAO implements IMetricsD
 
     @Override
     public InsertRequest prepareBatchInsert(Model model, Metrics metrics, SessionCacheCallback callback) throws IOException {
-        MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(model);
-        if (schema == null) {
-            throw new IOException(model.getName() + " is not registered");
-        }
+        MetadataRegistry.Schema schema = resolveSchema(model);
         MeasureWrite measureWrite = getClient().createMeasureWrite(schema.getMetadata().getGroup(), // group name
                 schema.getMetadata().name(), // measure-name
                 TimeBucket.getTimestamp(metrics.getTimeBucket(), model.getDownsampling())); // timestamp
@@ -161,10 +175,7 @@ public class BanyanDBMetricsDAO extends AbstractBanyanDBDAO implements IMetricsD
 
     @Override
     public UpdateRequest prepareBatchUpdate(Model model, Metrics metrics, SessionCacheCallback callback) throws IOException {
-        MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(model);
-        if (schema == null) {
-            throw new IOException(model.getName() + " is not registered");
-        }
+        MetadataRegistry.Schema schema = resolveSchema(model);
         MeasureWrite measureWrite = getClient().createMeasureWrite(schema.getMetadata().getGroup(), // group name
                 schema.getMetadata().name(), // measure-name
                 TimeBucket.getTimestamp(metrics.getTimeBucket(), model.getDownsampling())); // timestamp
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBRecordDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBRecordDAO.java
index 8bb0d28f80..4632982b76 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBRecordDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBRecordDAO.java
@@ -50,7 +50,13 @@ public class BanyanDBRecordDAO extends AbstractBanyanDBDAO implements IRecordDAO
 
     @Override
     public InsertRequest prepareBatchInsert(Model model, Record record) throws IOException {
+        // Self-heal a missing local schema entry once (RPC-free re-derivation) before failing —
+        // see MetadataRegistry.repopulateLocally. Throw only if the entry is still absent.
         MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(model);
+        if (schema == null) {
+            MetadataRegistry.INSTANCE.repopulateLocally(model);
+            schema = MetadataRegistry.INSTANCE.findMetadata(model);
+        }
         if (schema == null) {
             throw new IOException(model.getName() + " is not registered");
         }
@@ -60,6 +66,9 @@ public class BanyanDBRecordDAO extends AbstractBanyanDBDAO implements IRecordDAO
                 if (record instanceof BanyanDBTrace.MergeTable) {
                     BanyanDBTrace.MergeTable mergeTable = (BanyanDBTrace.MergeTable) record;
                     MetadataRegistry.Schema mergeTableSchema = MetadataRegistry.INSTANCE.findRecordMetadata(mergeTable.getMergeTableName());
+                    if (mergeTableSchema == null) {
+                        throw new IOException(mergeTable.getMergeTableName() + " is not registered");
+                    }
 
                     traceWrite = getClient().createTraceWrite(
                         schema.getMetadata().getGroup(),
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistryTest.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistryTest.java
new file mode 100644
index 0000000000..479644ca94
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistryTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.banyandb;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.skywalking.oap.server.core.storage.model.Model;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit coverage for the local schema-cache self-heal on {@link MetadataRegistry}. A read/persist
+ * path that finds the cache empty for a model whose dispatch worker is already live (e.g. a
+ * {@code withoutSchemaChange} peer apply or a runtime-rule bundled fall-over that rebuilt the
+ * worker but skipped the populate) must be able to re-derive the schema locally with no server
+ * RPC, instead of throwing {@code "<model> is not registered"} forever.
+ */
+class MetadataRegistryTest {
+
+    @AfterEach
+    void clearPopulator() {
+        // MetadataRegistry is an enum singleton; clear the populator so global state set by a test
+        // does not leak into others.
+        MetadataRegistry.INSTANCE.registerLocalSchemaPopulator(null);
+    }
+
+    @Test
+    void repopulateLocallyInvokesRegisteredPopulator() {
+        final Model model = mock(Model.class);
+        when(model.getName()).thenReturn("meter_test_metric");
+        final AtomicInteger calls = new AtomicInteger();
+        MetadataRegistry.INSTANCE.registerLocalSchemaPopulator(m -> calls.incrementAndGet());
+
+        MetadataRegistry.INSTANCE.repopulateLocally(model);
+
+        assertEquals(1, calls.get(), "a registered populator must be invoked on a self-heal attempt");
+    }
+
+    @Test
+    void repopulateLocallyIsNoOpWhenNoPopulatorRegistered() {
+        MetadataRegistry.INSTANCE.registerLocalSchemaPopulator(null);
+        final Model model = mock(Model.class);
+        assertDoesNotThrow(() -> MetadataRegistry.INSTANCE.repopulateLocally(model),
+            "self-heal with no populator (e.g. a non-BanyanDB context) must be a no-op");
+    }
+
+    @Test
+    void repopulateLocallySwallowsPopulatorError() {
+        final Model model = mock(Model.class);
+        when(model.getName()).thenReturn("meter_test_metric");
+        MetadataRegistry.INSTANCE.registerLocalSchemaPopulator(m -> {
+            throw new RuntimeException("derivation boom");
+        });
+
+        // A failed re-derivation must never be worse than the pre-existing throw: the caller
+        // re-reads and surfaces its own not-registered error, so repopulateLocally itself must
+        // not propagate.
+        assertDoesNotThrow(() -> MetadataRegistry.INSTANCE.repopulateLocally(model),
+            "a throwing populator must be swallowed so self-heal never worsens the failure");
+    }
+}
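
For readers who want the shape of the self-heal without the BanyanDB classes, the resolve-once-then-fail pattern in this commit can be sketched as a standalone Java class. Everything here is a hypothetical stand-in: SelfHealingRegistrySketch, its String-keyed map, and the Consumer-based populator are illustrative assumptions, not the actual MetadataRegistry or LocalSchemaPopulator API.

```java
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/** Hypothetical stand-in for a schema registry; NOT the SkyWalking MetadataRegistry. */
class SelfHealingRegistrySketch {
    // Model name -> derived schema (a plain String here, purely for illustration).
    private final Map<String, String> registry = new ConcurrentHashMap<>();
    // Set once at boot; volatile so reader threads see the registration.
    private volatile Consumer<String> populator;

    void registerPopulator(final Consumer<String> populator) {
        this.populator = populator;
    }

    void put(final String model, final String schema) {
        registry.put(model, schema);
    }

    /** Best-effort local re-derivation: no-op without a populator, never throws. */
    void repopulateLocally(final String model) {
        final Consumer<String> p = this.populator;
        if (p == null) {
            return;
        }
        try {
            p.accept(model);
        } catch (final Exception e) {
            // Swallowed on purpose: the caller re-reads and raises its own error.
        }
    }

    /** Look up the schema, attempt one self-heal on a miss, then fail fast. */
    String resolveSchema(final String model) throws IOException {
        String schema = registry.get(model);
        if (schema == null) {
            repopulateLocally(model);
            schema = registry.get(model);
            if (schema == null) {
                throw new IOException(model + " is not registered");
            }
        }
        return schema;
    }
}
```

The single retry keeps a genuinely unknown model failing on every call with the same error as before, while a model whose entry was merely never populated on this node recovers on its first lookup.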
