This is an automated email from the ASF dual-hosted git repository.

wu-sheng pushed a commit to branch fix/runtime-rule-schema-cache-self-heal
in repository https://gitbox.apache.org/repos/asf/skywalking.git
commit 4598124f9902b081a005f72f9c332c5bdf231c52
Author: Wu Sheng <[email protected]>
AuthorDate: Sun Jun 14 17:19:55 2026 +0800

    Fix BanyanDB runtime-rule self-heal + v2 MAL CounterWindow collision & Elvis falsy semantics

    * BanyanDB schema-cache self-heal: persist DAOs re-derive a missing local
      schema (RPC-free) once before failing; the no-init defer loop retries a
      transient backend probe error (isRetryableNoInitProbeFailure, default
      false, BanyanDB opts in) instead of crash-looping the pod.
    * v2 MAL CounterWindow key collision: rate()/increase()/irate() keyed each
      counter's sliding window on the rule's output metric name (shared by every
      input metric of a rule) instead of the counter's own name. Counters that
      reduce to the same labels after .sum() therefore shared one window slot and
      rated against each other's values, fabricating non-zero rates from frozen
      counters (seen in the BanyanDB liaison gRPC error rate). The window is now
      keyed by the counter's own metric name.
    * v2 MAL Elvis ?: honored only null (Optional.ofNullable().orElse()); it now
      follows Groovy-falsy semantics via MalRuntimeHelper.elvis/isTruthy and
      evaluates the primary once. This fixes BanyanDB liaison instances storing
      node_type="" instead of "n/a".
    * banyandb otel-rules: rate window PT15S -> PT1M.
    * Tests: BanyanDBErrorRateReproTest, MALElvisFalsyTest,
      MetadataRegistryTest, ModelInstallerNoInitTest.
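To make the CounterWindow key collision concrete, here is a toy model in Java. It is a minimal sketch, not SkyWalking code: `CounterWindowKeyDemo`, `SlidingWindow`, its `increase(key, ts, value)` method and the `name|labels` key format are all hypothetical, and the real window is only assumed to combine a metric name with the sample's labels. It feeds two frozen counters (5 and 166, the `total_err` and `registry_err` totals quoted in `BanyanDBErrorRateReproTest`) that reduce to the same label set, using `increase()`-style deltas over a 60 s range at 10 s scrapes. Keyed by the shared rule-level name, the second counter is diffed against the first counter's stored point; keyed by each counter's own name, every delta stays 0.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, simplified per-key sliding window; NOT SkyWalking's CounterWindow. */
public class CounterWindowKeyDemo {

    /** One observed cumulative counter value. */
    static final class Point {
        final long ts;
        final double value;

        Point(final long ts, final double value) {
            this.ts = ts;
            this.value = value;
        }
    }

    static final class SlidingWindow {
        private final long rangeMillis;
        private final Map<String, Deque<Point>> windows = new HashMap<>();

        SlidingWindow(final long rangeMillis) {
            this.rangeMillis = rangeMillis;
        }

        /** Records (ts, value) under key; returns value minus the oldest value still in range. */
        double increase(final String key, final long ts, final double value) {
            final Deque<Point> q = windows.computeIfAbsent(key, k -> new ArrayDeque<>());
            q.addLast(new Point(ts, value));
            // Evict points older than the range; the point just added always survives.
            while (q.peekFirst().ts < ts - rangeMillis) {
                q.removeFirst();
            }
            return value - q.peekFirst().value;
        }
    }

    /** Runs 6 scrapes of two frozen counters and returns the largest |sum of increases|. */
    static double maxAbsSummedIncrease(final boolean keyByCounterName) {
        final String[] counters = {
            "banyandb_liaison_grpc_total_err", "banyandb_liaison_grpc_total_registry_err"};
        final double[] frozenValues = {5, 166};
        // Both counters reduce to the same label set after .sum([...]).
        final String labels = "pod_name=demo-banyandb-liaison-1";
        final String ruleOutputName = "meter_banyandb_instance_liaison_grpc_error_rate";
        final SlidingWindow window = new SlidingWindow(60_000L);
        double maxAbs = 0.0;
        for (int scrape = 0; scrape < 6; scrape++) {
            final long ts = scrape * 10_000L;
            double sum = 0.0;
            for (int i = 0; i < counters.length; i++) {
                final String name = keyByCounterName ? counters[i] : ruleOutputName;
                sum += window.increase(name + "|" + labels, ts, frozenValues[i]);
            }
            maxAbs = Math.max(maxAbs, Math.abs(sum));
        }
        return maxAbs;
    }

    public static void main(final String[] args) {
        System.out.println("shared rule-level key: " + maxAbsSummedIncrease(false));
        System.out.println("per-counter key:       " + maxAbsSummedIncrease(true));
    }
}
```

With the shared key every scrape yields (5 - 5) + (166 - 5) = 161 even though neither counter moved, while the per-counter key yields 0. A real `rate()` would additionally divide by the elapsed seconds, but the collision mechanism is the same.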
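The Elvis change can be illustrated in a similar way. The following minimal, standalone sketch re-implements the truth rules listed in the `MalRuntimeHelper.isTruthy` javadoc and compares them with the previous `Optional.ofNullable(primary).orElse(fallback)` translation. `ElvisDemo`, `nullOnly` and `doubleEvaluated` are hypothetical names used only for illustration, and the sketch covers only the subset of Groovy truth handled in this diff (null, Boolean, strings, numbers, collections, maps, arrays), not every Groovy rule.

```java
import java.lang.reflect.Array;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Illustrative re-implementation; not the real MalRuntimeHelper. */
public class ElvisDemo {

    /** Groovy-style truth for the value kinds listed in the helper's javadoc. */
    static boolean isTruthy(final Object value) {
        if (value == null) {
            return false;
        }
        if (value instanceof Boolean) {
            return (Boolean) value;
        }
        if (value instanceof CharSequence) {
            return ((CharSequence) value).length() > 0;
        }
        if (value instanceof Number) {
            return ((Number) value).doubleValue() != 0.0D;
        }
        if (value instanceof Collection) {
            return !((Collection<?>) value).isEmpty();
        }
        if (value instanceof Map) {
            return !((Map<?, ?>) value).isEmpty();
        }
        if (value.getClass().isArray()) {
            return Array.getLength(value) > 0;
        }
        return true;
    }

    /** Groovy-style `primary ?: fallback`; the caller evaluates primary exactly once. */
    static <T> T elvis(final T primary, final T fallback) {
        return isTruthy(primary) ? primary : fallback;
    }

    /** The previous null-only translation. */
    static <T> T nullOnly(final T primary, final T fallback) {
        return Optional.ofNullable(primary).orElse(fallback);
    }

    /** Anti-pattern for comparison: evaluates the side-effecting primary twice. */
    static String doubleEvaluated(final Map<String, String> tags, final String fallback) {
        return isTruthy(tags.remove("node_type")) ? tags.remove("node_type") : fallback;
    }

    public static void main(final String[] args) {
        System.out.println(nullOnly("", "n/a"));   // prints an empty line: "" is kept
        System.out.println(elvis("", "n/a"));      // n/a
        System.out.println(elvis("hot", "n/a"));   // hot

        final Map<String, String> a = new HashMap<>();
        a.put("node_type", "hot");
        System.out.println(elvis(a.remove("node_type"), "n/a")); // hot

        final Map<String, String> b = new HashMap<>();
        b.put("node_type", "hot");
        System.out.println(doubleEvaluated(b, "n/a"));           // null
    }
}
```

`doubleEvaluated` shows why the generated code passes the primary into `elvis(...)` a single time: when the primary is `tags.remove('node_type')`, a second evaluation finds the key already removed and yields `null` instead of `"hot"`, which is the case `MALElvisFalsyTest.sideEffectingPrimary_evaluatedOnce` guards against.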
---
 docs/en/changes/changes.md | 3 +
 .../analyzer/v2/compiler/MALClosureCodegen.java | 8 +-
 .../analyzer/v2/compiler/rt/MalRuntimeHelper.java | 28 ++++-
 .../oap/meter/analyzer/v2/dsl/SampleFamily.java | 6 +-
 .../v2/dsl/BanyanDBErrorRateReproTest.java | 125 +++++++++++++++++++++
 .../meter/analyzer/v2/dsl/MALElvisFalsyTest.java | 98 ++++++++++++++++
 .../server/core/storage/model/ModelInstaller.java | 43 +++++--
 .../storage/model/ModelInstallerNoInitTest.java | 83 +++++++++++++-
 .../otel-rules/banyandb/banyandb-instance.yaml | 42 +++----
 .../otel-rules/banyandb/banyandb-service.yaml | 6 +-
 .../plugin/banyandb/BanyanDBIndexInstaller.java | 48 +++++++-
 .../plugin/banyandb/BanyanDBNoneStreamDAO.java | 6 +
 .../storage/plugin/banyandb/MetadataRegistry.java | 47 +++++++-
 .../banyandb/measure/BanyanDBMetricsDAO.java | 33 ++++--
 .../plugin/banyandb/stream/BanyanDBRecordDAO.java | 9 ++
 .../plugin/banyandb/MetadataRegistryTest.java | 81 +++++++++++++
 16 files changed, 611 insertions(+), 55 deletions(-)

diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md
index 51aa0dfa79..59aa378a6b 100644
--- a/docs/en/changes/changes.md
+++ b/docs/en/changes/changes.md
@@ -242,6 +242,9 @@ admin-host only" entry above for the public REST retirement.
 #### OAP Server
+* Fix BanyanDB peer nodes permanently flooding `<metric> is not registered` when a node holds a live persist worker but its local `MetadataRegistry` schema cache was never populated for that model — e.g. a `withoutSchemaChange` peer apply or a runtime-rule bundled fall-over rebuilt the dispatch worker but skipped the populate, and nothing (the registry never evicts, the 30s reconcile only covers runtime-rule rows) ever re-derived it. The persist DAOs now self-heal a missing entry once wi [...]
+* Fix a v2 MAL `CounterWindow` key collision: `rate()` / `increase()` / `irate()` keyed each counter's sliding window on the rule's output metric name (the same for every input metric of a rule) instead of the counter's own name, so two or more counters that reduce to the same label set after `.sum(...)` shared one window and computed rates against each other's values — fabricating non-zero rates from unchanged counters (e.g. the BanyanDB liaison gRPC error rate read a steady non-zero of [...]
+* Fix the v2 MAL Elvis operator `?:` to honor Groovy-falsy semantics. It compiled to `Optional.ofNullable(primary).orElse(fallback)`, applying the fallback only when the primary is `null`, so an empty-string primary kept `""` instead — e.g. a BanyanDB liaison `ServiceInstance` stored `node_type=""` rather than `n/a`, because `.sum([...,'node_type'])` fills an absent group-by label with `""`. The fallback now applies for falsy primaries such as null, false, numeric zero, and empty strings [...]
 * SWIP-15: rebuild BanyanDB self-observability around the cluster / container / group model (requires BanyanDB 0.11+). A BanyanDB cluster is modeled as one `Service`, each container as a `ServiceInstance` (role/tier as attributes), and each storage group as an `Endpoint`. The `otel-rules/banyandb/` rules are category-separated by role (`node_*` / `liaison_*` / `data_*` / `lifecycle_*`) and by data type (`measure_*` / `stream_*` / `trace_*` / `property_*`), mirroring the upstream FODC-pro [...]
 * Runtime MAL/LAL hot-update rules can declare `layerDefinitions:` to introduce new layers.
 Ordinals are operator-pinned in the `100_000+` tier; the layer is
diff --git a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/v2/compiler/MALClosureCodegen.java b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/v2/compiler/MALClosureCodegen.java
index 8b46f68cf9..bc2806e5e3 100644
--- a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/v2/compiler/MALClosureCodegen.java
+++ b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/v2/compiler/MALClosureCodegen.java
@@ -300,9 +300,13 @@ final class MALClosureCodegen {
 } else if (expr instanceof MALExpressionModel.ClosureElvisExpr) {
 final MALExpressionModel.ClosureElvisExpr elvis = (MALExpressionModel.ClosureElvisExpr) expr;
- sb.append("java.util.Optional.ofNullable(");
+ // Groovy `?:` applies the fallback when the primary is falsy (null,
+ // empty string/container, numeric zero, false), not only when null.
+ // Keep the primary single-evaluated so expressions such as tags.remove(...)
+ // do not observe different values between the truth check and result.
+ sb.append(MALCodegenHelper.RUNTIME_HELPER_FQCN).append(".elvis(");
 generateClosureExpr(sb, elvis.getPrimary(), paramName, beanMode);
- sb.append(").orElse(");
+ sb.append(", ");
 generateClosureExpr(sb, elvis.getFallback(), paramName, beanMode);
 sb.append(")");
 } else if (expr instanceof MALExpressionModel.ClosureRegexMatchExpr) {
diff --git a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/v2/compiler/rt/MalRuntimeHelper.java b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/v2/compiler/rt/MalRuntimeHelper.java
index 669be6ef7c..7275e11db4 100644
--- a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/v2/compiler/rt/MalRuntimeHelper.java
+++ b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/v2/compiler/rt/MalRuntimeHelper.java
@@ -17,6 +17,9 @@ package org.apache.skywalking.oap.meter.analyzer.v2.compiler.rt;
+import java.lang.reflect.Array;
+import java.util.Collection;
+import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import org.apache.skywalking.oap.meter.analyzer.v2.dsl.Sample;
@@ -55,12 +58,9 @@ public final class MalRuntimeHelper {
 return new String[][] {row};
 }
- /**
- * Reverse division: computes {@code numerator / v} for each sample value {@code v}.
- * Used by generated code for {@code Number / SampleFamily} expressions.
- */
 /**
 * Groovy truth check: {@code null → false}, empty string → {@code false},
+ * numeric zero → {@code false}, empty collection/map/array → {@code false},
 * {@code Boolean.FALSE → false}, everything else → {@code true}.
 * Used by generated filter code for standalone expressions in boolean context
 * (e.g., {@code tags.ApiId || tags.ApiName}).
@@ -75,9 +75,29 @@ public final class MalRuntimeHelper {
 if (value instanceof CharSequence) {
 return ((CharSequence) value).length() > 0;
 }
+ if (value instanceof Number) {
+ return ((Number) value).doubleValue() != 0.0D;
+ }
+ if (value instanceof Collection) {
+ return !((Collection<?>) value).isEmpty();
+ }
+ if (value instanceof Map) {
+ return !((Map<?, ?>) value).isEmpty();
+ }
+ if (value.getClass().isArray()) {
+ return Array.getLength(value) > 0;
+ }
 return true;
 }
+ public static <T> T elvis(final T primary, final T fallback) {
+ return isTruthy(primary) ? primary : fallback;
+ }
+
+ /**
+ * Reverse division: computes {@code numerator / v} for each sample value {@code v}.
+ * Used by generated code for {@code Number / SampleFamily} expressions.
+ */
 public static SampleFamily divReverse(final double numerator, final SampleFamily sf) {
 if (sf == SampleFamily.EMPTY) {
diff --git a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/v2/dsl/SampleFamily.java b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/v2/dsl/SampleFamily.java
index e3d1aec7b3..f07ccc8bf3 100644
--- a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/v2/dsl/SampleFamily.java
+++ b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/v2/dsl/SampleFamily.java
@@ -431,7 +431,7 @@ public class SampleFamily {
 Arrays.stream(samples)
 .map(sample -> sample.increase(
 range,
- context.metricName,
+ sample.getName(),
 (lowerBoundValue, unused) -> sample.value - lowerBoundValue
 ))
 .toArray(Sample[]::new)
@@ -448,7 +448,7 @@ public class SampleFamily {
 Arrays.stream(samples)
 .map(sample -> sample.increase(
 range,
- context.metricName,
+ sample.getName(),
 (lowerBoundValue, lowerBoundTime) -> {
 final long timeDiff = (sample.timestamp - lowerBoundTime) / 1000;
 return timeDiff < 1L ? 0.0 : (sample.value - lowerBoundValue) / timeDiff;
@@ -466,7 +466,7 @@ public class SampleFamily {
 this.context,
 Arrays.stream(samples)
 .map(sample -> sample.increase(
- context.metricName,
+ sample.getName(),
 (lowerBoundValue, lowerBoundTime) -> {
 final long timeDiff = (sample.timestamp - lowerBoundTime) / 1000;
 return timeDiff < 1L ? 0.0 : (sample.value - lowerBoundValue) / timeDiff;
diff --git a/oap-server/analyzer/meter-analyzer/src/test/java/org/apache/skywalking/oap/meter/analyzer/v2/dsl/BanyanDBErrorRateReproTest.java b/oap-server/analyzer/meter-analyzer/src/test/java/org/apache/skywalking/oap/meter/analyzer/v2/dsl/BanyanDBErrorRateReproTest.java
new file mode 100644
index 0000000000..361a35b9ff
--- /dev/null
+++ b/oap-server/analyzer/meter-analyzer/src/test/java/org/apache/skywalking/oap/meter/analyzer/v2/dsl/BanyanDBErrorRateReproTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.meter.analyzer.v2.dsl;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.skywalking.oap.meter.analyzer.v2.dsl.counter.CounterWindow;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Reproduces the BanyanDB liaison_grpc_error_rate fabrication using the EXACT rule expression and the
+ * real (frozen) counter values scraped from the live demo FODC proxy. Counters never change across the
+ * simulated scrapes, so every rate term — and the summed result — MUST be 0. Any non-zero output proves
+ * the CounterWindow key collision: the three distinct error counters reduce to identical labels after
+ * .sum([...]) and, because the rate keys on the (shared, rule-level) context.metricName instead of each
+ * counter's own name, they share one CounterWindow slot and rate against each other's values.
+ */
+public class BanyanDBErrorRateReproTest {
+
+ private static final String GROUP_BY = "['cluster','pod_name','container_name','node_role','node_type']";
+
+ // Verbatim from otel-rules/banyandb/banyandb-instance.yaml : liaison_grpc_error_rate (value part).
+ private static final String EXPR =
+ "(banyandb_liaison_grpc_total_err.sum(" + GROUP_BY + ").rate('PT1M')"
+ + " + banyandb_liaison_grpc_total_registry_err.sum(" + GROUP_BY + ").rate('PT1M')"
+ + " + banyandb_liaison_grpc_total_stream_msg_received_err.sum(" + GROUP_BY + ").rate('PT1M')) * 60";
+
+ @BeforeEach
+ void resetWindow() {
+ CounterWindow.INSTANCE.reset();
+ }
+
+ private static Sample s(final String name, final double value, final long ts, final String... kv) {
+ final ImmutableMap.Builder<String, String> b = ImmutableMap.builder();
+ for (int i = 0; i < kv.length; i += 2) {
+ b.put(kv[i], kv[i + 1]);
+ }
+ return Sample.builder().name(name).labels(b.build()).value(value).timestamp(ts).build();
+ }
+
+ // The three liaison-1 families, with the real frozen values (total_err=5, registry_err=166, stream=5).
+ // node_type is intentionally ABSENT on liaison samples, exactly as the FODC proxy exposes them.
+ private Map<String, SampleFamily> scrape(final long ts) {
+ final String[] common = {
+ "cluster", "showcase-banyandb",
+ "pod_name", "demo-banyandb-liaison-1",
+ "container_name", "liaison",
+ "node_role", "ROLE_LIAISON",
+ };
+ final List<Sample> totalErr = new ArrayList<>();
+ totalErr.add(s("banyandb_liaison_grpc_total_err", 1, ts, with(common, "service", "measure", "method", "query", "group", "sw_metadata")));
+ totalErr.add(s("banyandb_liaison_grpc_total_err", 2, ts, with(common, "service", "measure", "method", "query", "group", "sw_metricsMinute")));
+ totalErr.add(s("banyandb_liaison_grpc_total_err", 1, ts, with(common, "service", "measure", "method", "query", "group", "sw_metricsHour")));
+ totalErr.add(s("banyandb_liaison_grpc_total_err", 1, ts, with(common, "service", "measure", "method", "query", "group", "sw_metricsDay")));
+
+ final List<Sample> registryErr = new ArrayList<>();
+ registryErr.add(s("banyandb_liaison_grpc_total_registry_err", 47, ts, with(common, "service", "measure", "method", "get", "group", "sw_metricsHour")));
+ registryErr.add(s("banyandb_liaison_grpc_total_registry_err", 47, ts, with(common, "service", "measure", "method", "get", "group", "sw_metricsMinute")));
+ registryErr.add(s("banyandb_liaison_grpc_total_registry_err", 47, ts, with(common, "service", "measure", "method", "get", "group", "sw_metricsDay")));
+ registryErr.add(s("banyandb_liaison_grpc_total_registry_err", 7, ts, with(common, "service", "indexRule", "method", "create", "group", "sw_metricsDay")));
+ registryErr.add(s("banyandb_liaison_grpc_total_registry_err", 7, ts, with(common, "service", "indexRule", "method", "create", "group", "sw_metricsHour")));
+ registryErr.add(s("banyandb_liaison_grpc_total_registry_err", 7, ts, with(common, "service", "indexRule", "method", "create", "group", "sw_metricsMinute")));
+ registryErr.add(s("banyandb_liaison_grpc_total_registry_err", 2, ts, with(common, "service", "trace", "method", "get", "group", "sw_trace")));
+ registryErr.add(s("banyandb_liaison_grpc_total_registry_err", 2, ts, with(common, "service", "trace", "method", "get", "group", "sw_zipkinTrace")));
+
+ final List<Sample> streamErr = new ArrayList<>();
+ streamErr.add(s("banyandb_liaison_grpc_total_stream_msg_received_err", 1, ts, with(common, "service", "measure", "method", "write", "group", "sw_metadata")));
+ streamErr.add(s("banyandb_liaison_grpc_total_stream_msg_received_err", 2, ts, with(common, "service", "trace", "method", "write", "group", "sw_trace")));
+ streamErr.add(s("banyandb_liaison_grpc_total_stream_msg_received_err", 2, ts, with(common, "service", "stream", "method", "write", "group", "sw_recordsLog")));
+
+ final Map<String, SampleFamily> map = new HashMap<>();
+ map.put("banyandb_liaison_grpc_total_err", SampleFamilyBuilder.newBuilder(totalErr.toArray(new Sample[0])).build());
+ map.put("banyandb_liaison_grpc_total_registry_err", SampleFamilyBuilder.newBuilder(registryErr.toArray(new Sample[0])).build());
+ map.put("banyandb_liaison_grpc_total_stream_msg_received_err", SampleFamilyBuilder.newBuilder(streamErr.toArray(new Sample[0])).build());
+ return map;
+ }
+
+ private static String[] with(final String[] common, final String... extra) {
+ final String[] out = new String[common.length + extra.length];
+ System.arraycopy(common, 0, out, 0, common.length);
+ System.arraycopy(extra, 0, out, common.length, extra.length);
+ return out;
+ }
+
+ @Test
+ void unchangedCounters_errorRate_mustBeZero() {
+ final Expression expr = DSL.parse("meter_banyandb_instance_liaison_grpc_error_rate", EXPR);
+ long ts = 1_700_000_000_000L;
+ final long step = 10_000L; // 10s scrape, matching the showcase collector
+ for (int scrape = 0; scrape < 6; scrape++, ts += step) {
+ final Result result = expr.run(scrape(ts));
+ double maxAbs = 0.0;
+ if (result.isSuccess() && result.getData() != SampleFamily.EMPTY) {
+ for (final Sample out : result.getData().samples) {
+ maxAbs = Math.max(maxAbs, Math.abs(out.getValue()));
+ }
+ }
+ // Counters never changed -> error rate MUST be 0 on every scrape.
+ assertEquals(0.0, maxAbs, 1e-9,
+ "Unchanged counters must yield 0 error rate, but scrape " + scrape + " produced " + maxAbs);
+ }
+ }
+}
diff --git a/oap-server/analyzer/meter-analyzer/src/test/java/org/apache/skywalking/oap/meter/analyzer/v2/dsl/MALElvisFalsyTest.java b/oap-server/analyzer/meter-analyzer/src/test/java/org/apache/skywalking/oap/meter/analyzer/v2/dsl/MALElvisFalsyTest.java
new file mode 100644
index 0000000000..08f334551a
--- /dev/null
+++ b/oap-server/analyzer/meter-analyzer/src/test/java/org/apache/skywalking/oap/meter/analyzer/v2/dsl/MALElvisFalsyTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.meter.analyzer.v2.dsl;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.skywalking.oap.meter.analyzer.v2.compiler.rt.MalRuntimeHelper;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Groovy's Elvis `?:` applies the fallback when the primary is falsy — including the empty string.
+ * The v2 codegen previously emitted Optional.ofNullable(P).orElse(F), which only fires on null, so an
+ * empty-string primary (e.g. a label that .sum() filled with "" for an absent key) leaked "" instead
+ * of the fallback. This is the exact mechanism behind BanyanDB liaison instances storing node_type=""
+ * instead of "n/a".
+ */
+public class MALElvisFalsyTest {
+
+ private static String tagAfterElvis(final String nodeTypeValue) {
+ final ImmutableMap<String, String> labels = nodeTypeValue == null
+ ? ImmutableMap.of("svc", "s")
+ : ImmutableMap.of("svc", "s", "node_type", nodeTypeValue);
+ final SampleFamily sf = SampleFamilyBuilder.newBuilder(
+ Sample.builder().name("metric").labels(labels).value(1.0).timestamp(1L).build()).build();
+ final Expression expr = DSL.parse("test_elvis",
+ "metric.tag({tags -> tags['nt'] = tags.node_type ?: 'n/a'})");
+ final Result r = expr.run(Map.of("metric", sf));
+ return r.getData().samples[0].getLabels().get("nt");
+ }
+
+ private static String tagAfterSideEffectingElvis(final String nodeTypeValue) {
+ final SampleFamily sf = SampleFamilyBuilder.newBuilder(
+ Sample.builder()
+ .name("metric")
+ .labels(ImmutableMap.of("svc", "s", "node_type", nodeTypeValue))
+ .value(1.0)
+ .timestamp(1L)
+ .build()).build();
+ final Expression expr = DSL.parse("test_elvis_remove",
+ "metric.tag({tags -> tags['nt'] = tags.remove('node_type') ?: 'n/a'})");
+ final Result r = expr.run(Map.of("metric", sf));
+ return r.getData().samples[0].getLabels().get("nt");
+ }
+
+ @Test
+ void emptyStringPrimary_usesFallback() {
+ assertEquals("n/a", tagAfterElvis(""), "empty-string primary must fall through to 'n/a' (Groovy-falsy)");
+ }
+
+ @Test
+ void absentPrimary_usesFallback() {
+ assertEquals("n/a", tagAfterElvis(null), "absent (null) primary must fall through to 'n/a'");
+ }
+
+ @Test
+ void nonEmptyPrimary_keptAsIs() {
+ assertEquals("hot", tagAfterElvis("hot"), "non-empty primary must be kept");
+ }
+
+ @Test
+ void sideEffectingPrimary_evaluatedOnce() {
+ assertEquals("hot", tagAfterSideEffectingElvis("hot"),
+ "Elvis must not evaluate the primary twice; tags.remove(...) returns a value only once");
+ }
+
+ @Test
+ void runtimeTruthiness_matchesGroovyFalsyValues() {
+ assertFalse(MalRuntimeHelper.isTruthy(0));
+ assertFalse(MalRuntimeHelper.isTruthy(0.0D));
+ assertFalse(MalRuntimeHelper.isTruthy(Collections.emptyList()));
+ assertFalse(MalRuntimeHelper.isTruthy(Collections.emptyMap()));
+ assertFalse(MalRuntimeHelper.isTruthy(new String[0]));
+ assertTrue(MalRuntimeHelper.isTruthy(-1));
+ assertTrue(MalRuntimeHelper.isTruthy(List.of("value")));
+ }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/ModelInstaller.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/ModelInstaller.java
index d65d7fb75b..f73bd82427 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/ModelInstaller.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/ModelInstaller.java
@@ -99,20 +99,39 @@ public abstract class ModelInstaller implements ModelRegistry.CreatingListener,
 // resource that only this very apply would ever create.
 if (deferDDLToInitNode(opt)) {
 while (true) {
- InstallInfo info = isExists(model, opt);
- if (!info.isAllExist()) {
- try {
+ boolean allExist;
+ try {
+ InstallInfo info = isExists(model, opt);
+ allExist = info.isAllExist();
+ if (!allExist) {
 log.info(
 "install info: {}.table for model: [{}] not all required resources exist. OAP is running in 'no-init' mode, waiting create or update... retry 3s later.",
 info.buildInstallInfoMsg(), model.getName()
 );
- Thread.sleep(3000L);
- } catch (InterruptedException e) {
- log.error(e.getMessage());
 }
- } else {
+ } catch (final StorageException e) {
+ if (!isRetryableNoInitProbeFailure(e)) {
+ throw e;
+ }
+ // A transient backend error during the probe (e.g. a BanyanDB cluster data node
+ // still Init-ing, "client connection is closing") is NOT a reason to abort boot:
+ // the init OAP will create the resource and the next probe succeeds. Treat it like
+ // "not present yet" and retry in-loop, rather than letting it escape and crash-loop
+ // the pod — which would only re-enter this same loop after a full restart.
+ allExist = false;
+ log.warn("install info: existence probe for model: [{}] threw a transient backend "
+ + "error. OAP is running in 'no-init' mode, retry 3s later.", model.getName(), e);
+ }
+ if (allExist) {
 break;
 }
+ try {
+ Thread.sleep(3000L);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new StorageException(
+ "interrupted while waiting for no-init backend resources for model " + model.getName(), e);
+ }
 }
 return;
 }
@@ -170,6 +189,16 @@ public abstract class ModelInstaller implements ModelRegistry.CreatingListener,
 return RunningMode.isNoInitMode() && opt.getFlags().isDeferDDLToInitNode();
 }
+ /**
+ * Whether a {@link StorageException} from the no-init defer-loop existence probe is
+ * known to be transient and should be retried in-loop. The base implementation is
+ * conservative so permanent model/config errors do not become an infinite boot wait;
+ * storage backends opt in only for transport-level probe failures they can classify.
+ */
+ protected boolean isRetryableNoInitProbeFailure(final StorageException e) {
+ return false;
+ }
+
 public void start() {
 }
diff --git a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/model/ModelInstallerNoInitTest.java b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/model/ModelInstallerNoInitTest.java
index d9cda58cd7..5ee2bbcaf3 100644
--- a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/model/ModelInstallerNoInitTest.java
+++ b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/model/ModelInstallerNoInitTest.java
@@ -26,6 +26,7 @@ import org.junit.jupiter.api.Test;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.mock;
@@ -102,25 +103,103 @@ class ModelInstallerNoInitTest {
 "withSchemaChange must not re-create a resource that already exists");
 }
+ @Test
+ void noInitDeferLoopRetriesTransientProbeErrorInsteadOfCrashing() {
+ RunningMode.setMode("no-init");
+ // The first existence probe throws a transient StorageException (mimicking a BanyanDB
+ // cluster data node still Init-ing); the next probe reports the resource present.
+ final RecordingInstaller installer = new RecordingInstaller(true /* present after transient */,
+ 1 /* one transient probe failure */, true /* retryable probe failure */);
+ final Model model = mock(Model.class);
+ when(model.getName()).thenReturn("static_metric_transient");
+
+ // Must NOT propagate the transient (which would escape whenCreating and crash-loop the pod);
+ // must retry in-loop, then return on the defer path without creating. 10s covers the 3s sleep.
+ assertTimeoutPreemptively(Duration.ofSeconds(10), () ->
+ installer.whenCreating(model, StorageManipulationOpt.schemaCreateIfAbsent()));
+ assertEquals(0, installer.createTableCalls,
+ "a transient probe error must be retried, then defer to the init node without creating");
+ assertTrue(installer.probeCalls >= 2,
+ "the loop must probe again after the transient instead of escaping on the first throw");
+ }
+
+ @Test
+ void noInitDeferLoopPropagatesNonRetryableProbeError() {
+ RunningMode.setMode("no-init");
+ final RecordingInstaller installer = new RecordingInstaller(true /* unused */,
+ 1 /* one probe failure */, false /* permanent/non-retryable */);
+ final Model model = mock(Model.class);
+ when(model.getName()).thenReturn("static_metric_bad_model");
+
+ assertThrows(StorageException.class,
+ () -> installer.whenCreating(model, StorageManipulationOpt.schemaCreateIfAbsent()),
+ "permanent model/config probe failures must not be converted into an infinite no-init wait");
+ assertEquals(1, installer.probeCalls,
+ "a non-retryable failure must escape without sleeping and probing again");
+ assertEquals(0, installer.createTableCalls);
+ }
+
+ @Test
+ void noInitDeferLoopPropagatesInterruptedSleep() {
+ RunningMode.setMode("no-init");
+ final RecordingInstaller installer = new RecordingInstaller(false /* resource absent */);
+ final Model model = mock(Model.class);
+ when(model.getName()).thenReturn("static_metric_wait_interrupted");
+
+ Thread.currentThread().interrupt();
+ try {
+ assertThrows(StorageException.class,
+ () -> installer.whenCreating(model, StorageManipulationOpt.schemaCreateIfAbsent()),
+ "an interrupted no-init wait must fail fast so shutdown can proceed");
+ assertTrue(Thread.currentThread().isInterrupted(),
+ "the interrupt flag must be restored for upstream shutdown handling");
+ } finally {
+ Thread.interrupted();
+ }
+ }
+
 /** Minimal concrete {@link ModelInstaller} that records createTable calls and reports a
 * fixed existence result, so the base whenCreating branching can be exercised without a
- * real storage backend. */
+ * real storage backend. Optionally throws a transient {@link StorageException} on the first
+ * {@code transientProbeFailures} existence probes to exercise the no-init defer-loop retry. */
 private static final class RecordingInstaller extends ModelInstaller {
 private final boolean resourcePresent;
+ private final int transientProbeFailures;
+ private final boolean retryableProbeFailure;
+ private int probeCalls;
 private int createTableCalls;
 private RecordingInstaller(final boolean resourcePresent) {
+ this(resourcePresent, 0, false);
+ }
+
+ private RecordingInstaller(final boolean resourcePresent, final int transientProbeFailures) {
+ this(resourcePresent, transientProbeFailures, true);
+ }
+
+ private RecordingInstaller(final boolean resourcePresent, final int transientProbeFailures,
+ final boolean retryableProbeFailure) {
 super(null, null);
 this.resourcePresent = resourcePresent;
+ this.transientProbeFailures = transientProbeFailures;
+ this.retryableProbeFailure = retryableProbeFailure;
 }
 @Override
- public InstallInfo isExists(final Model model, final StorageManipulationOpt opt) {
+ public InstallInfo isExists(final Model model, final StorageManipulationOpt opt) throws StorageException {
+ if (probeCalls++ < transientProbeFailures) {
+ throw new StorageException("transient backend error");
+ }
 final TestInstallInfo info = new TestInstallInfo(model);
 info.setAllExist(resourcePresent);
 return info;
 }
+ @Override
+ protected boolean isRetryableNoInitProbeFailure(final StorageException e) {
+ return retryableProbeFailure;
+ }
+
 @Override
 public void createTable(final Model model) {
 createTableCalls++;
diff --git a/oap-server/server-starter/src/main/resources/otel-rules/banyandb/banyandb-instance.yaml b/oap-server/server-starter/src/main/resources/otel-rules/banyandb/banyandb-instance.yaml
index def21fc308..add9cdc455 100644
--- a/oap-server/server-starter/src/main/resources/otel-rules/banyandb/banyandb-instance.yaml
+++ b/oap-server/server-starter/src/main/resources/otel-rules/banyandb/banyandb-instance.yaml
@@ -52,7 +52,7 @@ metricsRules:
 exp: banyandb_system_up_time
 # CPU usage (cores). process_* rides on every container including lifecycle.
 - name: cpu_usage
- exp: process_cpu_seconds_total.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT15S')
+ exp: process_cpu_seconds_total.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT1M')
 # resident memory (bytes). Raw gauge, present on all containers.
 - name: rss_memory
 exp: process_resident_memory_bytes
@@ -75,45 +75,45 @@ metricsRules:
 exp: banyandb_system_disk.tagEqual('kind','used_percent').avg(['cluster','pod_name','container_name','node_role','node_type','path']) * 100
 # network throughput (bytes/s) by interface name.
 - name: network_recv
- exp: banyandb_system_net_state.tagEqual('kind','bytes_recv').sum(['cluster','pod_name','container_name','node_role','node_type','name']).rate('PT15S')
+ exp: banyandb_system_net_state.tagEqual('kind','bytes_recv').sum(['cluster','pod_name','container_name','node_role','node_type','name']).rate('PT1M')
 - name: network_sent
- exp: banyandb_system_net_state.tagEqual('kind','bytes_sent').sum(['cluster','pod_name','container_name','node_role','node_type','name']).rate('PT15S')
+ exp: banyandb_system_net_state.tagEqual('kind','bytes_sent').sum(['cluster','pod_name','container_name','node_role','node_type','name']).rate('PT1M')
 # Go runtime.
 - name: goroutines
 exp: go_goroutines
 # average GC pause (s) = rate(Σpause) / rate(Σcount). go_gc_duration_seconds is a summary (no buckets),
 # so this ratio of _sum/_count is the only valid average — do not apply histogram_percentile to it.
- name: gc_pause_avg - exp: go_gc_duration_seconds_sum.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT15S').safeDiv(go_gc_duration_seconds_count.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT15S')) + exp: go_gc_duration_seconds_sum.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT1M').safeDiv(go_gc_duration_seconds_count.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT1M')) - name: heap_inuse exp: go_memstats_heap_inuse_bytes - name: heap_next_gc exp: go_memstats_next_gc_bytes - name: alloc_rate - exp: go_memstats_alloc_bytes_total.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT15S') + exp: go_memstats_alloc_bytes_total.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT1M') # ---- Liaison only (front door; the UI gates these on container_name == liaison) ---- # query rate (req/s) by data-model service (measure/stream/trace/property). method literal is "query". - name: liaison_query_rate - exp: banyandb_liaison_grpc_total_started.tagEqual('method','query').sum(['cluster','pod_name','container_name','node_role','node_type','service']).rate('PT15S') + exp: banyandb_liaison_grpc_total_started.tagEqual('method','query').sum(['cluster','pod_name','container_name','node_role','node_type','service']).rate('PT1M') # gRPC errors/min. Three liaison-side error families (mirrors the Grafana "gRPC Error Rate" panel, # which sums total_err + registry_err + stream_msg_received_err). All lazily registered -> empty on a # healthy cluster; each pre-aggregated to the same label set before '+'. 
- name: liaison_grpc_error_rate - exp: (banyandb_liaison_grpc_total_err.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT15S') + banyandb_liaison_grpc_total_registry_err.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT15S') + banyandb_liaison_grpc_total_stream_msg_received_err.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT15S')) * 60 + exp: (banyandb_liaison_grpc_total_err.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT1M') + banyandb_liaison_grpc_total_registry_err.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT1M') + banyandb_liaison_grpc_total_stream_msg_received_err.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT1M')) * 60 # registry operation rate (req/s): schema registry ops on the liaison front door. total_started is # query-only on the wire, so the former tagNotEqual('method','query') term was empty and is dropped; # registry_started carries the non-query op count. - name: liaison_registry_op_rate - exp: banyandb_liaison_grpc_total_registry_started.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT15S') + exp: banyandb_liaison_grpc_total_registry_started.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT1M') # write rate (writes/s) seen at the liaison front door. group label dropped (instance-level total). 
- name: liaison_write_rate - exp: banyandb_measure_total_written.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT15S') + banyandb_stream_tst_total_written.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT15S') + banyandb_trace_tst_total_written.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT15S') + exp: banyandb_measure_total_written.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT1M') + banyandb_stream_tst_total_written.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT1M') + banyandb_trace_tst_total_written.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT1M') # tier-2 publish pipeline (liaison -> data): throughput by operation, bytes/s, and p99 send latency. - name: liaison_publish_throughput - exp: banyandb_queue_pub_total_finished.sum(['cluster','pod_name','container_name','node_role','node_type','operation']).rate('PT15S') + exp: banyandb_queue_pub_total_finished.sum(['cluster','pod_name','container_name','node_role','node_type','operation']).rate('PT1M') - name: liaison_publish_bytes - exp: banyandb_queue_pub_sent_bytes.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT15S') + exp: banyandb_queue_pub_sent_bytes.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT1M') - name: liaison_publish_latency_p99 exp: banyandb_queue_pub_total_latency.sum(['cluster','pod_name','container_name','node_role','node_type','operation','le']).histogram().histogram_percentile([99]) # tier-2 publish, batch granularity (BanyanDB #1169): batches published/s by operation and the batch @@ -121,7 +121,7 @@ metricsRules: # BUILD-GATED: _batch_finished/_batch_latency are absent on current builds -> emit nothing until the # shipped BanyanDB build registers them. 
- name: liaison_publish_batch_throughput - exp: banyandb_queue_pub_total_batch_finished.sum(['cluster','pod_name','container_name','node_role','node_type','operation']).rate('PT15S') + exp: banyandb_queue_pub_total_batch_finished.sum(['cluster','pod_name','container_name','node_role','node_type','operation']).rate('PT1M') - name: liaison_publish_batch_latency_p99 exp: banyandb_queue_pub_total_batch_latency.sum(['cluster','pod_name','container_name','node_role','node_type','operation','le']).histogram().histogram_percentile([99]) # liaison write-queue (wqueue) depth: pending records buffered at the front door before publish. @@ -144,34 +144,34 @@ metricsRules: exp: banyandb_measure_pending_data_count.sum(['cluster','pod_name','container_name','node_role','node_type']) + banyandb_stream_tst_pending_data_count.sum(['cluster','pod_name','container_name','node_role','node_type']) + banyandb_trace_tst_pending_data_count.sum(['cluster','pod_name','container_name','node_role','node_type']) # merge-loop iterations/s. - name: data_merge_file_rate - exp: banyandb_measure_total_merge_loop_started.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT15S') + banyandb_stream_tst_total_merge_loop_started.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT15S') + banyandb_trace_tst_total_merge_loop_started.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT15S') + exp: banyandb_measure_total_merge_loop_started.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT1M') + banyandb_stream_tst_total_merge_loop_started.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT1M') + banyandb_trace_tst_total_merge_loop_started.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT1M') # avg parts merged per merge loop on the file path (matches Grafana = rate(merged_parts{type=file}) / # rate(merge_loop_started)). 
type='file' is data-only on the wire (liaison emits only type='mem'). - name: data_merge_file_partitions - exp: banyandb_measure_total_merged_parts.tagEqual('type','file').sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT15S').safeDiv(banyandb_measure_total_merge_loop_started.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT15S')) + banyandb_stream_tst_total_merged_parts.tagEqual('type','file').sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT15S').safeDiv(banyandb_stream_tst_total_merge_loop_started.sum([ [...] + exp: banyandb_measure_total_merged_parts.tagEqual('type','file').sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT1M').safeDiv(banyandb_measure_total_merge_loop_started.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT1M')) + banyandb_stream_tst_total_merged_parts.tagEqual('type','file').sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT1M').safeDiv(banyandb_stream_tst_total_merge_loop_started.sum(['cl [...] # avg file-merge latency (ms) per merge loop. - name: data_merge_file_latency - exp: (banyandb_measure_total_merge_latency.tagEqual('type','file').sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT15S').safeDiv(banyandb_measure_total_merge_loop_started.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT15S')) + banyandb_stream_tst_total_merge_latency.tagEqual('type','file').sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT15S').safeDiv(banyandb_stream_tst_total_merge_loop_started.su [...] 
+ exp: (banyandb_measure_total_merge_latency.tagEqual('type','file').sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT1M').safeDiv(banyandb_measure_total_merge_loop_started.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT1M')) + banyandb_stream_tst_total_merge_latency.tagEqual('type','file').sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT1M').safeDiv(banyandb_stream_tst_total_merge_loop_started.sum([ [...] # inverted-index (series) write rate / term-search rate / total docs. *_inverted_index_total_* are # # TYPE=gauge but cumulative, so rate() yields a per-window delta. Stream's series index is the # storage scope (stream_storage_*); the tst scope is reported separately below. Trace's series index # (trace_storage_*) is included so trace series writes/docs are not silently dropped. - name: data_series_write_rate - exp: banyandb_measure_inverted_index_total_updates.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT15S') + banyandb_stream_storage_inverted_index_total_updates.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT15S') + banyandb_trace_storage_inverted_index_total_updates.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT15S') + exp: banyandb_measure_inverted_index_total_updates.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT1M') + banyandb_stream_storage_inverted_index_total_updates.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT1M') + banyandb_trace_storage_inverted_index_total_updates.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT1M') - name: data_series_term_search_rate - exp: banyandb_measure_inverted_index_total_term_searchers_started.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT15S') + 
banyandb_stream_storage_inverted_index_total_term_searchers_started.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT15S') + banyandb_trace_storage_inverted_index_total_term_searchers_started.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT15S') + exp: banyandb_measure_inverted_index_total_term_searchers_started.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT1M') + banyandb_stream_storage_inverted_index_total_term_searchers_started.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT1M') + banyandb_trace_storage_inverted_index_total_term_searchers_started.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT1M') - name: data_total_series exp: banyandb_measure_inverted_index_total_doc_count.sum(['cluster','pod_name','container_name','node_role','node_type']) + banyandb_stream_storage_inverted_index_total_doc_count.sum(['cluster','pod_name','container_name','node_role','node_type']) + banyandb_trace_storage_inverted_index_total_doc_count.sum(['cluster','pod_name','container_name','node_role','node_type']) # stream time-series-table (tst) index, distinct from the stream series (storage) index above. 
- name: data_stream_tst_write_rate - exp: banyandb_stream_tst_inverted_index_total_updates.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT15S') + exp: banyandb_stream_tst_inverted_index_total_updates.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT1M') - name: data_stream_tst_term_search_rate - exp: banyandb_stream_tst_inverted_index_total_term_searchers_started.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT15S') + exp: banyandb_stream_tst_inverted_index_total_term_searchers_started.sum(['cluster','pod_name','container_name','node_role','node_type']).rate('PT1M') - name: data_stream_tst_total_docs exp: banyandb_stream_tst_inverted_index_total_doc_count.sum(['cluster','pod_name','container_name','node_role','node_type']) # subscribe-side queue (data receives from liaison): throughput by operation + p99 latency. - name: data_queue_sub_throughput - exp: banyandb_queue_sub_total_finished.sum(['cluster','pod_name','container_name','node_role','node_type','operation']).rate('PT15S') + exp: banyandb_queue_sub_total_finished.sum(['cluster','pod_name','container_name','node_role','node_type','operation']).rate('PT1M') - name: data_queue_sub_latency_p99 exp: banyandb_queue_sub_total_latency.sum(['cluster','pod_name','container_name','node_role','node_type','operation','le']).histogram().histogram_percentile([99]) # subscribe-side per-message throughput (BanyanDB #1169). A data node ingests writes via the @@ -181,7 +181,7 @@ metricsRules: # intentionally not modeled here. Batch-level granularity lives on the liaison's publish side # (liaison_publish_batch_throughput / liaison_publish_batch_latency_p99 above). 
- name: data_queue_sub_message_throughput - exp: banyandb_queue_sub_total_message_finished.sum(['cluster','pod_name','container_name','node_role','node_type','operation']).rate('PT15S') + exp: banyandb_queue_sub_total_message_finished.sum(['cluster','pod_name','container_name','node_role','node_type','operation']).rate('PT1M') # retention disk-usage % per data-model scope (0-100 gauge). Kept per scope rather than summed (a sum # of three percentages is meaningless). Not in the upstream Grafana boards; a SkyWalking addition. - name: data_retention_measure_disk_usage_percent diff --git a/oap-server/server-starter/src/main/resources/otel-rules/banyandb/banyandb-service.yaml b/oap-server/server-starter/src/main/resources/otel-rules/banyandb/banyandb-service.yaml index 97c6cac8f6..45d505d8f4 100644 --- a/oap-server/server-starter/src/main/resources/otel-rules/banyandb/banyandb-service.yaml +++ b/oap-server/server-starter/src/main/resources/otel-rules/banyandb/banyandb-service.yaml @@ -29,11 +29,11 @@ metricsRules: # cluster writes/s across the three data-model scopes (measure, stream, trace). Each scope's # write counter is collapsed to one per-cluster series before `+`. - name: cluster_write_rate - exp: (banyandb_measure_total_written.sum(['cluster']).rate('PT15S') + banyandb_stream_tst_total_written.sum(['cluster']).rate('PT15S') + banyandb_trace_tst_total_written.sum(['cluster']).rate('PT15S')) + exp: (banyandb_measure_total_written.sum(['cluster']).rate('PT1M') + banyandb_stream_tst_total_written.sum(['cluster']).rate('PT1M') + banyandb_trace_tst_total_written.sum(['cluster']).rate('PT1M')) # cluster queries/s. `service` on this family is BanyanDB's data-model facet # (measure/stream/trace/property), not a SkyWalking service; method literal is "query". 
- name: cluster_query_rate - exp: banyandb_liaison_grpc_total_started.tagEqual('method','query').sum(['cluster']).rate('PT15S') + exp: banyandb_liaison_grpc_total_started.tagEqual('method','query').sum(['cluster']).rate('PT1M') # cluster errors/min. The seven liaison-side error families mirror the upstream Grafana # "Error Rate" stat (grafana-fodc-workload.json). Each is pre-aggregated to ['cluster'] # BEFORE `+` because their wire label sets differ (stream_msg_received_err carries @@ -42,7 +42,7 @@ metricsRules: # registered and emit no series; MAL treats an empty operand as the additive identity, so the # sum emits from whatever has fired and renders absent-as-0 when nothing has. - name: cluster_error_rate - exp: (banyandb_liaison_grpc_total_err.sum(['cluster']).rate('PT15S') + banyandb_liaison_grpc_total_registry_err.sum(['cluster']).rate('PT15S') + banyandb_liaison_grpc_total_stream_msg_received_err.sum(['cluster']).rate('PT15S') + banyandb_queue_pub_total_err.sum(['cluster']).rate('PT15S') + banyandb_measure_total_sync_loop_err.sum(['cluster']).rate('PT15S') + banyandb_stream_tst_total_sync_loop_err.sum(['cluster']).rate('PT15S') + banyandb_trace_tst_total_sync_loop_err.sum(['cluster' [...] + exp: (banyandb_liaison_grpc_total_err.sum(['cluster']).rate('PT1M') + banyandb_liaison_grpc_total_registry_err.sum(['cluster']).rate('PT1M') + banyandb_liaison_grpc_total_stream_msg_received_err.sum(['cluster']).rate('PT1M') + banyandb_queue_pub_total_err.sum(['cluster']).rate('PT1M') + banyandb_measure_total_sync_loop_err.sum(['cluster']).rate('PT1M') + banyandb_stream_tst_total_sync_loop_err.sum(['cluster']).rate('PT1M') + banyandb_trace_tst_total_sync_loop_err.sum(['cluster']).rat [...] # live container count by role. count(['cluster','container_name','pod_name']) groups by all # three then re-groups excluding the last key (pod_name), yielding one sample per # (cluster, container_name) whose value = distinct pod_name count -> data=N, liaison=M. 
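The rules above feed cumulative counters into `.rate('PT1M')`. The commit message describes the v2 MAL bug this way: the sliding window was keyed on the rule's output metric name instead of the counter's own name. When two counters reduce to the same labels after `.sum()`, they then share one window slot. The sketch below shows why that fabricates a rate from frozen counters. It is a minimal, hypothetical illustration (`CounterWindowSketch`, `key`, and the `name{labels}` key format are invented here), not SkyWalking's `CounterWindow`. It assumes a simple newest-minus-oldest rate over the window, and it does not handle counter resets.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch, not SkyWalking's CounterWindow: per-series sliding-window rate. */
public class CounterWindowSketch {
    /** One observed (timestamp, cumulative value) pair. */
    private static final class Point {
        final long tsMillis;
        final double value;

        Point(long tsMillis, double value) {
            this.tsMillis = tsMillis;
            this.value = value;
        }
    }

    private final long windowMillis; // e.g. 60_000 for a 'PT1M' window
    private final Map<String, Deque<Point>> windows = new HashMap<>();

    public CounterWindowSketch(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    /** Hypothetical key: the counter's OWN metric name plus its labels, not the rule's output name. */
    static String key(String counterName, String labels) {
        return counterName + "{" + labels + "}";
    }

    /** Records a sample and returns (newest - oldest) / elapsed seconds inside the window. */
    public double rate(String seriesKey, long tsMillis, double value) {
        Deque<Point> window = windows.computeIfAbsent(seriesKey, k -> new ArrayDeque<>());
        window.addLast(new Point(tsMillis, value));
        // Drop points older than the window, always keeping at least the newest one.
        while (window.size() > 1 && window.peekFirst().tsMillis < tsMillis - windowMillis) {
            window.pollFirst();
        }
        Point oldest = window.peekFirst();
        long elapsedMillis = tsMillis - oldest.tsMillis;
        if (elapsedMillis <= 0) {
            return 0.0; // a single point (or one timestamp) carries no rate yet
        }
        // Counter resets (value going down) are deliberately not handled in this sketch.
        return (value - oldest.value) / (elapsedMillis / 1000.0);
    }

    public static void main(String[] args) {
        String labels = "cluster=c1,pod_name=p1";

        // Correct keying: two frozen counters, each in its own window, both rate to 0.
        CounterWindowSketch w = new CounterWindowSketch(60_000);
        String a = key("banyandb_liaison_grpc_total_err", labels);
        String b = key("banyandb_liaison_grpc_total_registry_err", labels);
        w.rate(a, 0, 5);
        w.rate(b, 0, 9);
        System.out.println(w.rate(a, 15_000, 5)); // 0.0
        System.out.println(w.rate(b, 15_000, 9)); // 0.0

        // Buggy keying: both counters share the rule-output key and rate against each other.
        CounterWindowSketch shared = new CounterWindowSketch(60_000);
        String ruleKey = key("liaison_grpc_error_rate", labels);
        shared.rate(ruleKey, 0, 5);
        shared.rate(ruleKey, 0, 9);
        shared.rate(ruleKey, 15_000, 5);
        System.out.println(shared.rate(ruleKey, 15_000, 9) > 0); // true: a fabricated rate
    }
}
```

In this sketch the window length is simply the `windowMillis` argument, which plays the role of the `'PT1M'` duration in the rules. The `* 60` that the error-rate rules apply afterwards converts the per-second result into the per-minute figure their comments describe.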
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java index cabfb75276..34c00bfabf 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java @@ -19,9 +19,11 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb; import io.grpc.Status; +import java.time.Duration; import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Set; import java.util.stream.Collectors; @@ -40,7 +42,6 @@ import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.IndexRule; import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.IndexRuleBinding; import org.apache.skywalking.banyandb.schema.v1.BanyandbSchema.SchemaKey; import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.TopNAggregation; -import java.time.Duration; import org.apache.skywalking.library.banyandb.v1.client.BanyanDBClient; import org.apache.skywalking.library.banyandb.v1.client.SchemaWatcher; import org.apache.skywalking.library.banyandb.v1.client.grpc.exception.BanyanDBException; @@ -103,6 +104,51 @@ public class BanyanDBIndexInstaller extends ModelInstaller { public BanyanDBIndexInstaller(Client client, ModuleManager moduleManager, BanyanDBStorageConfig config) { super(client, moduleManager); this.config = config; + // Let read/persist paths self-heal a missing local schema entry (MetadataRegistry.repopulateLocally): + // re-derive the model's Schema locally with 
zero server RPC via the same primitive the peer + // boot path uses. This closes the "<metric> is not registered" flood that arises when a + // withoutSchemaChange peer apply or a runtime-rule bundled fall-over rebuilds the dispatch + // worker but skips the populate. DownSamplingConfigService is resolved lazily per call — a + // self-heal only fires post-boot, when CoreModule is long up. + MetadataRegistry.INSTANCE.registerLocalSchemaPopulator(model -> { + final DownSamplingConfigService downSamplingConfigService = moduleManager.find(CoreModule.NAME) + .provider() + .getService(DownSamplingConfigService.class); + registerLocallyByKind(model, downSamplingConfigService); + }); + } + + @Override + protected boolean isRetryableNoInitProbeFailure(final StorageException e) { + Throwable cause = e.getCause(); + while (cause != null) { + if (cause instanceof BanyanDBException) { + return isTransientBanyanDBProbeFailure((BanyanDBException) cause); + } + cause = cause.getCause(); + } + return false; + } + + private static boolean isTransientBanyanDBProbeFailure(final BanyanDBException e) { + final Status.Code code = e.getStatus(); + if (Status.Code.UNAVAILABLE.equals(code) + || Status.Code.DEADLINE_EXCEEDED.equals(code) + || Status.Code.CANCELLED.equals(code) + || Status.Code.RESOURCE_EXHAUSTED.equals(code) + || Status.Code.ABORTED.equals(code)) { + return true; + } + if (!Status.Code.UNKNOWN.equals(code)) { + return false; + } + final String message = String.valueOf(e.getMessage()).toLowerCase(Locale.ROOT); + return message.contains("client connection is closing") + || message.contains("connection is closing") + || message.contains("transport is closing") + || message.contains("connection refused") + || message.contains("connection reset") + || message.contains("broken pipe"); } @Override diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBNoneStreamDAO.java 
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBNoneStreamDAO.java index 09bd256e14..98763aa3e1 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBNoneStreamDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBNoneStreamDAO.java @@ -40,7 +40,13 @@ public class BanyanDBNoneStreamDAO extends AbstractDAO<BanyanDBStorageClient> im @Override public void insert(Model model, NoneStream noneStream) throws IOException { + // Self-heal a missing local schema entry once (RPC-free re-derivation) before failing — + // see MetadataRegistry.repopulateLocally. Re-read via the same lookup; throw only if still absent. MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findRecordMetadata(model.getName()); + if (schema == null) { + MetadataRegistry.INSTANCE.repopulateLocally(model); + schema = MetadataRegistry.INSTANCE.findRecordMetadata(model.getName()); + } if (schema == null) { throw new IOException(model.getName() + " is not registered"); } diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java index 958b4c6151..c644b74351 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java @@ -82,6 +82,7 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; import 
java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; import java.util.stream.Collectors; @@ -94,7 +95,51 @@ import static org.apache.skywalking.oap.server.core.analysis.metrics.Metrics.ID; public enum MetadataRegistry { INSTANCE; - private final Map<String, Schema> registry = new HashMap<>(); + // ConcurrentHashMap (not HashMap): boot populates single-threaded, but the self-heal path + // (repopulateLocally) writes from persistence/query threads concurrently with reads. + private final Map<String, Schema> registry = new ConcurrentHashMap<>(); + + /** + * Re-derive and locally register a model's BanyanDB {@link Schema} with NO server RPC. + * Registered once by the active {@code BanyanDBIndexInstaller} at boot and invoked by + * {@link #repopulateLocally(Model)} when a read path finds the cache empty for a model whose + * dispatch worker is already live — e.g. a {@code withoutSchemaChange} peer apply or a + * runtime-rule bundled fall-over rebuilt the worker but skipped the local populate. The + * {@code Model} is always known locally and its schema is a pure local derivation, so such a + * miss is always re-derivable without touching the backend. + */ + @FunctionalInterface + public interface LocalSchemaPopulator { + void populateLocally(Model model); + } + + private volatile LocalSchemaPopulator localSchemaPopulator; + + /** Register the boot-time, RPC-free local schema populator. Called once by the active installer. */ + public void registerLocalSchemaPopulator(final LocalSchemaPopulator populator) { + this.localSchemaPopulator = populator; + } + + /** + * Best-effort, RPC-free re-derivation of a model's local {@link Schema} so a read/persist path + * can self-heal a missing cache entry instead of throwing {@code "<model> is not registered"} + * forever (the registry never evicts, so an entry that was never populated on this node stays + * absent otherwise). No-op when no populator is registered (e.g. 
non-BanyanDB unit tests). + * Swallows derivation exceptions so a self-heal attempt is never worse than the pre-existing + * throw — the caller re-reads and surfaces its own not-registered error if still absent. + */ + public void repopulateLocally(final Model model) { + final LocalSchemaPopulator populator = this.localSchemaPopulator; + if (populator == null) { + return; + } + try { + populator.populateLocally(model); + } catch (final Exception e) { + log.debug("local schema self-heal re-derivation failed for model [{}]; " + + "caller will surface the not-registered error", model.getName(), e); + } + } public StreamModel registerStreamModel(Model model, BanyanDBStorageConfig config) { final SchemaMetadata schemaMetadata = parseMetadata(model, config, null); diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetricsDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetricsDAO.java index 4f1ff1928e..d52c70ec5f 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetricsDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetricsDAO.java @@ -63,12 +63,29 @@ public class BanyanDBMetricsDAO extends AbstractBanyanDBDAO implements IMetricsD this.storageBuilder = storageBuilder; } - @Override - public List<Metrics> multiGet(Model model, List<Metrics> metrics) throws IOException { + /** + * Resolve the model's BanyanDB schema, self-healing a missing local entry once before failing. 
+ * A null here means this node has a live persist worker for the model but its schema cache was + * never populated (or lost) — typically a {@code withoutSchemaChange} peer apply or a + * runtime-rule bundled fall-over that rebuilt the worker without the populate. Re-derive the + * schema locally with no server RPC and re-read; throw only if the entry is still absent, so + * a genuinely unknown model still fails fast instead of flooding forever. + */ + private MetadataRegistry.Schema resolveSchema(Model model) throws IOException { MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(model); if (schema == null) { - throw new IOException(model.getName() + " is not registered"); + MetadataRegistry.INSTANCE.repopulateLocally(model); + schema = MetadataRegistry.INSTANCE.findMetadata(model); + if (schema == null) { + throw new IOException(model.getName() + " is not registered"); + } } + return schema; + } + + @Override + public List<Metrics> multiGet(Model model, List<Metrics> metrics) throws IOException { + MetadataRegistry.Schema schema = resolveSchema(model); final Map<String, List<String>> seriesIDColumns = new HashMap<>(); if (model.getBanyanDBModelExtension().isIndexMode()) { seriesIDColumns.put(ID, new ArrayList<>()); @@ -144,10 +161,7 @@ public class BanyanDBMetricsDAO extends AbstractBanyanDBDAO implements IMetricsD @Override public InsertRequest prepareBatchInsert(Model model, Metrics metrics, SessionCacheCallback callback) throws IOException { - MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(model); - if (schema == null) { - throw new IOException(model.getName() + " is not registered"); - } + MetadataRegistry.Schema schema = resolveSchema(model); MeasureWrite measureWrite = getClient().createMeasureWrite(schema.getMetadata().getGroup(), // group name schema.getMetadata().name(), // measure-name TimeBucket.getTimestamp(metrics.getTimeBucket(), model.getDownsampling())); // timestamp @@ -161,10 +175,7 @@ public class 
BanyanDBMetricsDAO extends AbstractBanyanDBDAO implements IMetricsD @Override public UpdateRequest prepareBatchUpdate(Model model, Metrics metrics, SessionCacheCallback callback) throws IOException { - MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(model); - if (schema == null) { - throw new IOException(model.getName() + " is not registered"); - } + MetadataRegistry.Schema schema = resolveSchema(model); MeasureWrite measureWrite = getClient().createMeasureWrite(schema.getMetadata().getGroup(), // group name schema.getMetadata().name(), // measure-name TimeBucket.getTimestamp(metrics.getTimeBucket(), model.getDownsampling())); // timestamp diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBRecordDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBRecordDAO.java index 8bb0d28f80..4632982b76 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBRecordDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBRecordDAO.java @@ -50,7 +50,13 @@ public class BanyanDBRecordDAO extends AbstractBanyanDBDAO implements IRecordDAO @Override public InsertRequest prepareBatchInsert(Model model, Record record) throws IOException { + // Self-heal a missing local schema entry once (RPC-free re-derivation) before failing — + // see MetadataRegistry.repopulateLocally. Throw only if the entry is still absent. 
         MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(model);
+        if (schema == null) {
+            MetadataRegistry.INSTANCE.repopulateLocally(model);
+            schema = MetadataRegistry.INSTANCE.findMetadata(model);
+        }
         if (schema == null) {
             throw new IOException(model.getName() + " is not registered");
         }
@@ -60,6 +66,9 @@ public class BanyanDBRecordDAO extends AbstractBanyanDBDAO implements IRecordDAO
         if (record instanceof BanyanDBTrace.MergeTable) {
             BanyanDBTrace.MergeTable mergeTable = (BanyanDBTrace.MergeTable) record;
             MetadataRegistry.Schema mergeTableSchema = MetadataRegistry.INSTANCE.findRecordMetadata(mergeTable.getMergeTableName());
+            if (mergeTableSchema == null) {
+                throw new IOException(mergeTable.getMergeTableName() + " is not registered");
+            }
 
             traceWrite = getClient().createTraceWrite(
                 schema.getMetadata().getGroup(),
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistryTest.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistryTest.java
new file mode 100644
index 0000000000..479644ca94
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistryTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.banyandb;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.skywalking.oap.server.core.storage.model.Model;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit coverage for the local schema-cache self-heal on {@link MetadataRegistry}. A read/persist
+ * path that finds the cache empty for a model whose dispatch worker is already live (e.g. a
+ * {@code withoutSchemaChange} peer apply or a runtime-rule bundled fall-over that rebuilt the
+ * worker but skipped the populate) must be able to re-derive the schema locally with no server
+ * RPC, instead of throwing {@code "<model> is not registered"} forever.
+ */
+class MetadataRegistryTest {
+
+    @AfterEach
+    void clearPopulator() {
+        // MetadataRegistry is an enum singleton; clear the populator so global state set by a test
+        // does not leak into others.
+        MetadataRegistry.INSTANCE.registerLocalSchemaPopulator(null);
+    }
+
+    @Test
+    void repopulateLocallyInvokesRegisteredPopulator() {
+        final Model model = mock(Model.class);
+        when(model.getName()).thenReturn("meter_test_metric");
+        final AtomicInteger calls = new AtomicInteger();
+        MetadataRegistry.INSTANCE.registerLocalSchemaPopulator(m -> calls.incrementAndGet());
+
+        MetadataRegistry.INSTANCE.repopulateLocally(model);
+
+        assertEquals(1, calls.get(), "a registered populator must be invoked on a self-heal attempt");
+    }
+
+    @Test
+    void repopulateLocallyIsNoOpWhenNoPopulatorRegistered() {
+        MetadataRegistry.INSTANCE.registerLocalSchemaPopulator(null);
+        final Model model = mock(Model.class);
+        assertDoesNotThrow(() -> MetadataRegistry.INSTANCE.repopulateLocally(model),
+                           "self-heal with no populator (e.g. a non-BanyanDB context) must be a no-op");
+    }
+
+    @Test
+    void repopulateLocallySwallowsPopulatorError() {
+        final Model model = mock(Model.class);
+        when(model.getName()).thenReturn("meter_test_metric");
+        MetadataRegistry.INSTANCE.registerLocalSchemaPopulator(m -> {
+            throw new RuntimeException("derivation boom");
+        });
+
+        // A failed re-derivation must never be worse than the pre-existing throw: the caller
+        // re-reads and surfaces its own not-registered error, so repopulateLocally itself must
+        // not propagate.
+        assertDoesNotThrow(() -> MetadataRegistry.INSTANCE.repopulateLocally(model),
+                           "a throwing populator must be swallowed so self-heal never worsens the failure");
+    }
+}
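For readers following the patch, here is a minimal standalone sketch of the "look up, self-heal once, re-read, then throw" contract that `BanyanDBRecordDAO.prepareBatchInsert` now follows and that `MetadataRegistryTest` pins down. The populator runs at most once per failed lookup, the step is a no-op when no populator is registered, and populator errors are swallowed so the caller still raises its own `is not registered` error. Everything here is hypothetical: `SchemaCache`, `registerPopulator`, `put` and `resolveOrThrow` are illustrative names, and the cache is keyed by a plain `String` rather than a `Model`. It is not the real `MetadataRegistry` API, whose populator signature does not appear in this diff.

```java
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/**
 * Hypothetical, simplified model of the persist-path self-heal. Not the real
 * MetadataRegistry: names and the String-keyed cache are illustrative only.
 */
public class SelfHealSketch {

    public static final class SchemaCache {
        private final Map<String, String> schemas = new ConcurrentHashMap<>();
        // Optional local (RPC-free) populator; null means no self-heal is available.
        private volatile Consumer<String> populator;

        public void registerPopulator(Consumer<String> p) {
            this.populator = p;
        }

        public void put(String model, String schema) {
            schemas.put(model, schema);
        }

        public String find(String model) {
            return schemas.get(model);
        }

        // Best-effort re-derivation: a no-op without a populator, and any populator
        // failure is swallowed so the caller falls through to its own error.
        public void repopulateLocally(String model) {
            Consumer<String> p = populator;
            if (p == null) {
                return;
            }
            try {
                p.accept(model);
            } catch (RuntimeException e) {
                // Swallowed on purpose; a production version would likely log this.
            }
        }

        // Mirrors the DAO flow: look up, self-heal at most once, re-read, then throw.
        public String resolveOrThrow(String model) throws IOException {
            String schema = find(model);
            if (schema == null) {
                repopulateLocally(model);
                schema = find(model);
            }
            if (schema == null) {
                throw new IOException(model + " is not registered");
            }
            return schema;
        }
    }

    public static void main(String[] args) throws IOException {
        SchemaCache cache = new SchemaCache();
        // A populator that can derive any model's schema locally, without a server call.
        cache.registerPopulator(model -> cache.put(model, "schema-of-" + model));
        System.out.println(cache.resolveOrThrow("meter_test_metric"));
    }
}
```

Swallowing populator errors inside `repopulateLocally` keeps the worst case identical to the behaviour before the patch. A failed re-derivation still ends in the same `IOException` from the caller, so the self-heal can only remove failures and never introduces new ones.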
