This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch v0.10.x
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit a887dfc2680e4b7c118a836eb2b9eb0ab0386ed7
Author: Gao Hongtao <[email protected]>
AuthorDate: Mon May 11 09:22:04 2026 +0800

    fix: fail fast on incompatible storage version (boot + runtime paths) 
(#1124)
---
 CHANGES.md                                         |   1 +
 Makefile                                           |  34 ++-
 banyand/internal/storage/tsdb.go                   |  21 +-
 banyand/internal/storage/tsdb_test.go              | 100 +++++++
 banyand/internal/storage/version.go                |   5 +-
 banyand/internal/storage/version_test.go           |  29 ++
 banyand/metadata/schema/property/client.go         |  34 ++-
 .../metadata/schema/property/init_handler_test.go  | 209 +++++++++++++++
 pkg/initerror/initerror.go                         |  61 +++++
 pkg/initerror/initerror_test.go                    |  71 +++++
 pkg/schema/cache.go                                | 298 ++++++++++++++++-----
 pkg/schema/cache_test.go                           | 297 ++++++++++++++++++++
 pkg/schema/cache_watcher_fatal_test.go             | 256 ++++++++++++++++++
 pkg/schema/init.go                                 |  12 +-
 14 files changed, 1340 insertions(+), 88 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index c893e5740..fe42fa088 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -39,6 +39,7 @@ Release Notes.
 - Add `GroupLifecycleInfo.errors` to surface per-group collection failures 
from FODC `InspectAll` instead of silently dropping the affected node entry.
 - Fix `CollectDataInfo` and `CollectLiaisonInfo` not handling 
`CATALOG_PROPERTY` groups.
 - Fix lifecycle migration where the receiving node could create segments 
shorter than the configured `SegmentInterval`.
+- Fail fast on incompatible storage version at boot. Previously the server 
would start in a degraded `SERVING` state with affected groups un-loaded 
because the property schema-registry retry loop swallowed the 
version-incompatibility panic. Compatible versions are listed in 
`banyand/internal/storage/versions.yml`.
 
 ### Chores
 
diff --git a/Makefile b/Makefile
index 210087df9..950d73f82 100644
--- a/Makefile
+++ b/Makefile
@@ -47,6 +47,9 @@ generate: TARGET=generate
 generate: PROJECTS:=api $(PROJECTS) pkg
 generate: default  ## Generate API codes
 
+generate-test-cases:  ## Regenerate measure query test cases (input/*.ql, 
input/*.yaml)
+       go run ./test/cases/measure/cmd/generate generate 
test/cases/measure/data
+
 build: TARGET=all
 build: default  ## Build all projects
 
@@ -93,12 +96,40 @@ test-docker: ## Run tests in Docker with constrained 
resources (2 CPU cores, 4GB
          -ldflags "-X 
github.com/apache/skywalking-banyandb/pkg/test/flags.eventuallyTimeout=30s -X 
github.com/apache/skywalking-banyandb/pkg/test/flags.consistentlyTimeout=10s -X 
github.com/apache/skywalking-banyandb/pkg/test/flags.LogLevel=error" \
          $(PKG)
 
+load-test-barrier: ## Run the schema-barrier CP-6 SLO load harness (3 data 
nodes + 1 liaison, 100 callers, 5m measurement). Override with 
LOAD_FLAGS="-loadtest.measure=30s ..." for smoke runs.
+       @echo "Running schema-barrier load harness (CP-6 SLO profile)"
+       @echo "Override profile via LOAD_FLAGS, e.g. 
LOAD_FLAGS='-loadtest.measure=30s -loadtest.callers=20'"
+       go test -tags=loadtest -timeout 30m -count=1 -v 
./test/load/schema_barrier/... $(LOAD_FLAGS)
+
 ##@ Code quality targets
 
 lint: TARGET=lint
 lint: PROJECTS:=api $(PROJECTS) pkg scripts/ci/check test
+lint: check-import-boundaries
 lint: default ## Run the linters on all projects
 
+# check-import-boundaries enforces the layering invariants documented in
+# pkg/initerror/initerror.go: the leaf permanent-error contract must not gain
+# project-internal dependencies, and the property schema-registry classifier
+# must not import the storage internals it classifies via the leaf interface.
+.PHONY: check-import-boundaries
+check-import-boundaries: ## Enforce import-boundary invariants for the 
version-incompat fix
+       @bad=0; \
+       if grep -rln 'banyand/internal/storage' 
banyand/metadata/schema/property/ 2>/dev/null; then \
+               echo "FAIL: banyand/metadata/schema/property/ must not import 
banyand/internal/storage"; \
+               bad=1; \
+       fi; \
+       if grep -rln 'banyand/internal/storage' pkg/schema/ 2>/dev/null; then \
+               echo "FAIL: pkg/schema/ must not import 
banyand/internal/storage"; \
+               bad=1; \
+       fi; \
+       if grep -rln 'github.com/apache/skywalking-banyandb/' 
pkg/initerror/*.go 2>/dev/null | grep -v _test.go; then \
+               echo "FAIL: pkg/initerror/ must remain a leaf with zero 
project-internal imports (test files excepted)"; \
+               bad=1; \
+       fi; \
+       if [ $$bad -ne 0 ]; then exit 1; fi; \
+       echo "import boundaries OK"
+
 ##@ Vendor update
 
 vendor-update: TARGET=vendor-update
@@ -141,6 +172,7 @@ check: ## Check that the status is consistent with CI
 pre-push: ## Check source files before pushing to the remote repo
        $(MAKE) check-req
        $(MAKE) generate
+       $(MAKE) generate-test-cases
        $(MAKE) lint
        $(MAKE) check
        $(MAKE) vuln-check
@@ -222,7 +254,7 @@ release-push-candidate: ## Push release candidate
        ${PUSH_RELEASE_SCRIPTS}
        
 .PHONY: all $(PROJECTS) clean build  default nuke
-.PHONY: lint check tidy format pre-push
+.PHONY: lint check tidy format pre-push generate-test-cases check-import-boundaries load-test-barrier
 .PHONY: test test-race test-coverage test-ci test-docker
 .PHONY: license-check license-fix license-dep
 .PHONY: release release-binary release-source release-sign release-assembly
diff --git a/banyand/internal/storage/tsdb.go b/banyand/internal/storage/tsdb.go
index fa20507e4..22f26569e 100644
--- a/banyand/internal/storage/tsdb.go
+++ b/banyand/internal/storage/tsdb.go
@@ -19,6 +19,7 @@ package storage
 
 import (
        "context"
+       "fmt"
        "path/filepath"
        "strings"
        "sync"
@@ -172,7 +173,7 @@ func (d *database[T, O]) Close() error {
        d.segmentController.close()
        d.lock.Close()
        if err := d.lfs.DeleteFile(d.lock.Path()); err != nil {
-               logger.Panicf("cannot delete lock file %s: %s", d.lock.Path(), 
err)
+               panic(fmt.Errorf("cannot delete lock file %s: %w", 
d.lock.Path(), err))
        }
        return nil
 }
@@ -233,14 +234,30 @@ func OpenTSDB[T TSTable, O any](ctx context.Context, opts 
TSDBOpts[T, O], cache
        lockPath := filepath.Join(opts.Location, lockFilename)
        lock, err := tsdbLfs.CreateLockFile(lockPath, FilePerm)
        if err != nil {
-               logger.Panicf("cannot create lock file %s: %s", lockPath, err)
+               panic(fmt.Errorf("cannot create lock file %s: %w", lockPath, 
err))
        }
        db.lock = lock
+       // Release the lock fd if segmentController.open below returns an
+       // error. Otherwise the file descriptor leaks until process exit and a
+       // retry on the same data dir hits "resource temporarily unavailable"
+       // from the next CreateLockFile call. Ownership passes to the caller
+       // once released = true is set below, even if startRotationTask fails.
+       released := false
+       defer func() {
+               if !released {
+                       _ = lock.Close()
+                       _ = tsdbLfs.DeleteFile(lockPath)
+               }
+       }()
        if err := db.segmentController.open(); err != nil {
                return nil, err
        }
        obsservice.MetricsCollector.Register(location, db.collect)
        db.disableRotation = opts.DisableRotation
+       // db (and its lock fd) is now owned by the caller; on rotation-task 
error
+       // the partially-initialized db is still returned so the caller can 
Close
+       // it and unregister the metrics collector.
+       released = true
        return db, db.startRotationTask()
 }
 
diff --git a/banyand/internal/storage/tsdb_test.go 
b/banyand/internal/storage/tsdb_test.go
index 6b786a9c5..4852b9bd6 100644
--- a/banyand/internal/storage/tsdb_test.go
+++ b/banyand/internal/storage/tsdb_test.go
@@ -19,6 +19,8 @@ package storage
 
 import (
        "context"
+       "errors"
+       "os"
        "path/filepath"
        "testing"
        "time"
@@ -28,6 +30,7 @@ import (
 
        commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
        "github.com/apache/skywalking-banyandb/pkg/fs"
+       "github.com/apache/skywalking-banyandb/pkg/initerror"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/meter"
        "github.com/apache/skywalking-banyandb/pkg/test"
@@ -670,3 +673,100 @@ func TestCollectWithPartialClosedSegments(t *testing.T) {
        // Clean up
        require.NoError(t, tsdb.Close())
 }
+
+// newTestSegmentSkeleton seeds a minimal on-disk segment layout under dir with
+// the supplied storage version stamp. The layout matches readSegmentMeta's
+// "old format" (version-only file body), which is sufficient to exercise
+// checkVersion at TSDB open time.
+func newTestSegmentSkeleton(t *testing.T, dir, version string) {
+       t.Helper()
+       segDir := filepath.Join(dir, "seg-20240501")
+       require.NoError(t, os.MkdirAll(segDir, 0o755))
+       metaPath := filepath.Join(segDir, metadataFilename)
+       require.NoError(t, os.WriteFile(metaPath, []byte(version), 0o600))
+}
+
+// TestTSDBOpen_LockReleasedAfterFailedOpen reproduces the lock-file leak that
+// the F5 v2 lock-fix targets. Prior to the fix, OpenTSDB acquired the lock
+// file but never released it on the segmentController.open() error path, so
+// a second OpenTSDB call against the same directory failed with
+// "resource temporarily unavailable" from CreateLockFile. The fix adds a
+// defer-release with a `released` boolean flipped to true only on the
+// success path. This test seeds an incompatible segment to force the first
+// OpenTSDB to fail, then removes the segment and asserts the second
+// OpenTSDB succeeds — which is only possible if the lock fd was released.
+func TestTSDBOpen_LockReleasedAfterFailedOpen(t *testing.T) {
+       logger.Init(logger.Logging{Env: "dev", Level: flags.LogLevel})
+
+       dir, defFn := test.Space(require.New(t))
+       defer defFn()
+
+       newTestSegmentSkeleton(t, dir, "1.3.0")
+
+       opts := TSDBOpts[*MockTSTable, any]{
+               Location:        dir,
+               SegmentInterval: IntervalRule{Unit: DAY, Num: 1},
+               TTL:             IntervalRule{Unit: DAY, Num: 3},
+               ShardNum:        1,
+               TSTableCreator:  MockTSTableCreator,
+       }
+
+       ctx := context.Background()
+       mc := timestamp.NewMockClock()
+       ts, parseErr := time.ParseInLocation("2006-01-02 15:04:05", "2024-05-01 
00:00:00", time.Local)
+       require.NoError(t, parseErr)
+       mc.Set(ts)
+       ctx = timestamp.SetClock(ctx, mc)
+
+       // First open fails on the incompatible segment. With the leak fix the
+       // defer in OpenTSDB releases the lock fd before returning the error.
+       serviceCache := NewServiceCache()
+       _, openErr := OpenTSDB(ctx, opts, serviceCache, group)
+       require.Error(t, openErr, "first OpenTSDB must fail on the incompatible 
segment")
+       require.True(t, initerror.IsPermanent(openErr), "first OpenTSDB error 
must be permanent")
+
+       // Remove the bad segment so the second open's segmentController scan 
succeeds.
+       require.NoError(t, os.RemoveAll(filepath.Join(dir, "seg-20240501")))
+
+       // Second open against the same dir must succeed — only possible if the
+       // lock fd from the first attempt was released. Without the fix this
+       // returns "resource temporarily unavailable" from CreateLockFile.
+       tsdb, reopenErr := OpenTSDB(ctx, opts, serviceCache, group)
+       require.NoError(t, reopenErr, "second OpenTSDB must succeed after the 
failed first attempt")
+       require.NotNil(t, tsdb)
+       require.NoError(t, tsdb.Close())
+}
+
+// TestTSDBOpen_RejectsIncompatibleSegment verifies that opening a TSDB whose
+// on-disk segment is stamped with an incompatible version surfaces a permanent
+// initialization error so the caller fails fast instead of silently dropping
+// the affected group.
+func TestTSDBOpen_RejectsIncompatibleSegment(t *testing.T) {
+       logger.Init(logger.Logging{Env: "dev", Level: flags.LogLevel})
+
+       dir, defFn := test.Space(require.New(t))
+       defer defFn()
+
+       newTestSegmentSkeleton(t, dir, "1.3.0")
+
+       opts := TSDBOpts[*MockTSTable, any]{
+               Location:        dir,
+               SegmentInterval: IntervalRule{Unit: DAY, Num: 1},
+               TTL:             IntervalRule{Unit: DAY, Num: 3},
+               ShardNum:        1,
+               TSTableCreator:  MockTSTableCreator,
+       }
+
+       ctx := context.Background()
+       mc := timestamp.NewMockClock()
+       ts, parseErr := time.ParseInLocation("2006-01-02 15:04:05", "2024-05-01 
00:00:00", time.Local)
+       require.NoError(t, parseErr)
+       mc.Set(ts)
+       ctx = timestamp.SetClock(ctx, mc)
+
+       serviceCache := NewServiceCache()
+       _, openErr := OpenTSDB(ctx, opts, serviceCache, group)
+       require.Error(t, openErr, "OpenTSDB must reject an incompatible-version 
segment")
+       require.True(t, initerror.IsPermanent(openErr), "incompatible-version 
error must surface as permanent")
+       require.True(t, errors.Is(openErr, errVersionIncompatible), "error must 
still match the version-incompatible sentinel")
+}
diff --git a/banyand/internal/storage/version.go 
b/banyand/internal/storage/version.go
index ddea79a33..82d1adaaf 100644
--- a/banyand/internal/storage/version.go
+++ b/banyand/internal/storage/version.go
@@ -24,6 +24,8 @@ import (
 
        "github.com/pkg/errors"
        "sigs.k8s.io/yaml"
+
+       "github.com/apache/skywalking-banyandb/pkg/initerror"
 )
 
 const (
@@ -46,7 +48,8 @@ func checkVersion(version string) error {
                        return nil
                }
        }
-       return errors.WithMessagef(errVersionIncompatible, "incompatible 
version %s, supported versions: %s", version, strings.Join(compatibleVersions, 
", "))
+       return initerror.AsPermanent(errors.WithMessagef(errVersionIncompatible,
+               "incompatible version %s, supported versions: %s", version, 
strings.Join(compatibleVersions, ", ")))
 }
 
 type segmentMeta struct {
diff --git a/banyand/internal/storage/version_test.go 
b/banyand/internal/storage/version_test.go
index 726e6d8a2..31350d56d 100644
--- a/banyand/internal/storage/version_test.go
+++ b/banyand/internal/storage/version_test.go
@@ -18,10 +18,14 @@
 package storage
 
 import (
+       "errors"
+       "fmt"
        "testing"
 
        "github.com/stretchr/testify/assert"
        "github.com/stretchr/testify/require"
+
+       "github.com/apache/skywalking-banyandb/pkg/initerror"
 )
 
 func TestReadSegmentMeta_NewFormat(t *testing.T) {
@@ -52,6 +56,31 @@ func TestReadSegmentMeta_IncompatibleVersion(t *testing.T) {
        data := 
[]byte(`{"version":"0.1.0","endTime":"2026-04-07T00:00:00+08:00"}`)
        _, err := readSegmentMeta(data)
        assert.Error(t, err)
+       assert.True(t, initerror.IsPermanent(err), "incompatible version must 
surface as permanent")
+       assert.True(t, errors.Is(err, errVersionIncompatible), "incompatible 
version must still match the sentinel")
+}
+
+func TestCheckVersion_WrappedAsPermanent(t *testing.T) {
+       err := checkVersion("1.3.0")
+       require.Error(t, err)
+       assert.True(t, initerror.IsPermanent(err))
+       assert.True(t, errors.Is(err, errVersionIncompatible))
+
+       wrapped := fmt.Errorf("init: %w", err)
+       assert.True(t, initerror.IsPermanent(wrapped), "permanence must survive 
fmt.Errorf wrapping")
+       assert.True(t, errors.Is(wrapped, errVersionIncompatible))
+}
+
+func TestCheckVersion_CompatibleReturnsNil(t *testing.T) {
+       require.NoError(t, checkVersion(currentVersion))
+       for _, v := range compatibleVersions {
+               require.NoError(t, checkVersion(v))
+       }
+}
+
+func TestIsPermanent_UnrelatedErrorIsNotPermanent(t *testing.T) {
+       assert.False(t, initerror.IsPermanent(errors.New("plain")))
+       assert.False(t, initerror.IsPermanent(nil))
 }
 
 func TestReadSegmentMeta_NewFormatNoEndTime(t *testing.T) {
diff --git a/banyand/metadata/schema/property/client.go 
b/banyand/metadata/schema/property/client.go
index 1a52c4130..926a34914 100644
--- a/banyand/metadata/schema/property/client.go
+++ b/banyand/metadata/schema/property/client.go
@@ -41,6 +41,7 @@ import (
        "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
        "github.com/apache/skywalking-banyandb/banyand/observability"
        "github.com/apache/skywalking-banyandb/pkg/grpchelper"
+       "github.com/apache/skywalking-banyandb/pkg/initerror"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/run"
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
@@ -115,6 +116,7 @@ type SchemaRegistry struct {
        l                  *logger.Logger
        cache              *schemaCache
        caCertReloader     *pkgtls.Reloader
+       clock              timestamp.Clock
        handlers           map[schema.Kind][]schema.EventHandler
        watchSessions      map[string]*watchSession
        syncInterval       time.Duration
@@ -180,6 +182,7 @@ func NewSchemaRegistryClient(cfg *ClientConfig) 
(*SchemaRegistry, error) {
                l:                  l,
                cache:              newSchemaCache(),
                caCertReloader:     caCertReloader,
+               clock:              timestamp.NewClock(),
                handlers:           make(map[schema.Kind][]schema.EventHandler),
                watchSessions:      make(map[string]*watchSession),
                syncInterval:       syncInterval,
@@ -976,31 +979,48 @@ func (r *SchemaRegistry) RegisterHandler(name string, 
kind schema.Kind, handler
 }
 
 func (r *SchemaRegistry) initHandlerWithRetry(name string, handler 
schema.EventHandler, kinds []schema.Kind) {
-       deadline := time.Now().Add(time.Minute)
+       deadline := r.clock.Now().Add(time.Minute)
        var lastErr interface{}
        for {
-               if tryCallOnInit(handler, kinds, &lastErr) {
+               ok, permanent := tryCallOnInit(handler, kinds, &lastErr)
+               if ok {
                        return
                }
-               if time.Now().After(deadline) {
-                       r.l.Panic().Str("name", name).Interface("error", 
lastErr).
+               if permanent {
+                       r.l.Error().Str("name", name).Str("error", 
fmt.Sprintf("%+v", lastErr)).
+                               Msg("OnInit hit a permanent error, refusing to 
start")
+                       if recErr, isErr := lastErr.(error); isErr {
+                               panic(fmt.Errorf("OnInit %s: %w", name, recErr))
+                       }
+                       panic(fmt.Errorf("OnInit %s: %v", name, lastErr))
+               }
+               if r.clock.Now().After(deadline) {
+                       r.l.Error().Str("name", name).Interface("error", 
lastErr).
                                Msg("handler OnInit failed after 1m, giving up")
+                       panic(fmt.Errorf("OnInit %s exceeded deadline: %v", 
name, lastErr))
                }
                r.l.Warn().Str("name", name).Interface("error", lastErr).
                        Msg("handler OnInit panicked due to transient error, 
retrying in 1s")
-               time.Sleep(time.Second)
+               r.clock.Sleep(time.Second)
        }
 }
 
-func tryCallOnInit(handler schema.EventHandler, kinds []schema.Kind, lastErr 
*interface{}) (ok bool) {
+// tryCallOnInit invokes handler.OnInit and recovers any panic. Do not wrap the
+// recovered value with multierr before classification; multierr.Append's
+// Unwrap() []error is traversed by errors.As, but extra wrapping changes which
+// concrete type is returned first and breaks IsPermanent's interface lookup.
+func tryCallOnInit(handler schema.EventHandler, kinds []schema.Kind, lastErr 
*interface{}) (ok, permanent bool) {
        defer func() {
                if rec := recover(); rec != nil {
                        *lastErr = rec
                        ok = false
+                       if recErr, isErr := rec.(error); isErr {
+                               permanent = initerror.IsPermanent(recErr)
+                       }
                }
        }()
        handler.OnInit(kinds)
-       return true
+       return true, false
 }
 
 // Start starts a single goroutine that periodically syncs schemas.
diff --git a/banyand/metadata/schema/property/init_handler_test.go 
b/banyand/metadata/schema/property/init_handler_test.go
new file mode 100644
index 000000000..4e6625892
--- /dev/null
+++ b/banyand/metadata/schema/property/init_handler_test.go
@@ -0,0 +1,209 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package property
+
+import (
+       "errors"
+       "fmt"
+       "sync/atomic"
+       "testing"
+       "time"
+
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+
+       "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+       "github.com/apache/skywalking-banyandb/pkg/initerror"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/timestamp"
+)
+
+// retryTestHandler implements schema.EventHandler for retry-classification 
tests.
+// Each OnInit call invokes the configured panicFn (which may panic). When
+// noPanicAfter calls have been made, OnInit returns normally so the retry loop
+// can succeed.
+type retryTestHandler struct {
+       panicFn       func(callIdx int)
+       startedSignal chan struct{}
+       schema.UnimplementedOnInitHandler
+       calls        atomic.Int32
+       noPanicAfter int32
+}
+
+func (h *retryTestHandler) OnAddOrUpdate(_ schema.Metadata) {}
+func (h *retryTestHandler) OnDelete(_ schema.Metadata)      {}
+
+func (h *retryTestHandler) OnInit(_ []schema.Kind) (bool, []int64) {
+       idx := h.calls.Add(1)
+       if h.startedSignal != nil {
+               select {
+               case h.startedSignal <- struct{}{}:
+               default:
+               }
+       }
+       if h.noPanicAfter > 0 && idx > h.noPanicAfter {
+               return true, nil
+       }
+       if h.panicFn != nil {
+               h.panicFn(int(idx))
+       }
+       return true, nil
+}
+
+// newTestRegistryWithClock builds a SchemaRegistry instance that bypasses
+// connection setup so we can drive its retry classifier in-process. It uses a
+// mock clock so the test controls the deadline and Sleep windows.
+func newTestRegistryWithClock(_ *testing.T, mc timestamp.MockClock) 
*SchemaRegistry {
+       l := logger.GetLogger("init-handler-retry-test")
+       return &SchemaRegistry{
+               l:     l,
+               clock: mc,
+       }
+}
+
+// TestInitHandlerWithRetry_PermanentVersionError_FailsFast asserts that a
+// handler panicking with a Permanent-wrapped error makes the retry loop exit
+// on the first attempt by re-panicking, without ever advancing the clock.
+func TestInitHandlerWithRetry_PermanentVersionError_FailsFast(t *testing.T) {
+       require.NoError(t, logger.Init(logger.Logging{Env: "dev", Level: 
"warn"}))
+       mc := timestamp.NewMockClock()
+       mcStart := mc.Now()
+       r := newTestRegistryWithClock(t, mc)
+       permErr := initerror.AsPermanent(errors.New("incompatible version 
1.3.0"))
+       handler := &retryTestHandler{
+               panicFn: func(_ int) {
+                       panic(fmt.Errorf("OpenDB failed: %w", permErr))
+               },
+       }
+
+       done := make(chan struct{})
+       var recovered interface{}
+       go func() {
+               defer close(done)
+               defer func() { recovered = recover() }()
+               r.initHandlerWithRetry("perm-handler", handler, 
[]schema.Kind{schema.KindGroup})
+       }()
+
+       select {
+       case <-done:
+       case <-time.After(2 * time.Second):
+               t.Fatal("initHandlerWithRetry did not return within 2s; 
permanent error path should fail fast")
+       }
+
+       require.NotNil(t, recovered, "permanent error must propagate as a 
panic")
+       recErr, isErr := recovered.(error)
+       require.True(t, isErr, "panic value should be an error wrapping the 
permanent error")
+       assert.True(t, initerror.IsPermanent(recErr), "recovered error must 
satisfy IsPermanent")
+       assert.Equal(t, int32(1), handler.calls.Load(), "permanent error must 
not be retried")
+       assert.Equal(t, mcStart, mc.Now(), "mock clock must not advance when 
the first attempt is permanent")
+}
+
+// TestInitHandlerWithRetry_TransientStillRetries asserts that a handler
+// panicking with a non-permanent error keeps retrying until either the 
deadline
+// passes (panic) or it stops panicking (success). Here we let the handler stop
+// panicking after two attempts to exercise the success path.
+func TestInitHandlerWithRetry_TransientStillRetries(t *testing.T) {
+       require.NoError(t, logger.Init(logger.Logging{Env: "dev", Level: 
"warn"}))
+       mc := timestamp.NewMockClock()
+       r := newTestRegistryWithClock(t, mc)
+       handler := &retryTestHandler{
+               noPanicAfter:  2,
+               startedSignal: make(chan struct{}, 4),
+               panicFn: func(_ int) {
+                       panic("etcd unreachable")
+               },
+       }
+
+       done := make(chan struct{})
+       go func() {
+               defer close(done)
+               r.initHandlerWithRetry("transient-handler", handler, 
[]schema.Kind{schema.KindGroup})
+       }()
+
+       // Wait for each of the first two OnInit attempts to have run.
+       for i := 0; i < 2; i++ {
+               select {
+               case <-handler.startedSignal:
+               case <-time.After(2 * time.Second):
+                       t.Fatalf("handler.OnInit was not invoked for attempt %d 
within 2s", i+1)
+               }
+               // Advance the mock clock so the worker's Sleep returns and it 
can
+               // re-enter the retry loop. The benbjohnson MockClock fires 
sleepers
+               // when Add brings the time past their deadline, even if the 
sleeper
+               // registered after the Add — but we still poll briefly to 
avoid races.
+               require.Eventually(t, func() bool {
+                       mc.Add(time.Second)
+                       return handler.calls.Load() > int32(i+1) || done != nil 
&& isClosed(done)
+               }, 3*time.Second, 10*time.Millisecond, "Sleep must release 
after clock advance")
+       }
+
+       // Wait for the loop to finish (third call returns ok=true).
+       select {
+       case <-done:
+       case <-time.After(3 * time.Second):
+               t.Fatal("initHandlerWithRetry did not return after handler 
stopped panicking")
+       }
+       assert.GreaterOrEqual(t, handler.calls.Load(), int32(3), "handler must 
be retried at least twice before succeeding")
+}
+
+// TestInitHandlerWithRetry_DeadlineExceededPanics asserts that a non-permanent
+// failure that never stops eventually makes the retry loop panic once the
+// deadline is crossed.
+func TestInitHandlerWithRetry_DeadlineExceededPanics(t *testing.T) {
+       require.NoError(t, logger.Init(logger.Logging{Env: "dev", Level: 
"warn"}))
+       mc := timestamp.NewMockClock()
+       r := newTestRegistryWithClock(t, mc)
+       handler := &retryTestHandler{
+               startedSignal: make(chan struct{}, 8),
+               panicFn: func(_ int) {
+                       panic("never resolves")
+               },
+       }
+
+       done := make(chan struct{})
+       var recovered interface{}
+       go func() {
+               defer close(done)
+               defer func() { recovered = recover() }()
+               r.initHandlerWithRetry("deadline-handler", handler, 
[]schema.Kind{schema.KindGroup})
+       }()
+
+       deadline := time.Now().Add(5 * time.Second)
+       for {
+               select {
+               case <-done:
+                       require.NotNil(t, recovered, "deadline exceeded must 
propagate as a panic")
+                       return
+               case <-handler.startedSignal:
+               case <-time.After(50 * time.Millisecond):
+               }
+               if time.Now().After(deadline) {
+                       t.Fatal("initHandlerWithRetry did not exit after 
repeated clock advances")
+               }
+               mc.Add(2 * time.Minute)
+       }
+}
+
+func isClosed(ch <-chan struct{}) bool {
+       select {
+       case <-ch:
+               return true
+       default:
+               return false
+       }
+}
diff --git a/pkg/initerror/initerror.go b/pkg/initerror/initerror.go
new file mode 100644
index 000000000..6bff59384
--- /dev/null
+++ b/pkg/initerror/initerror.go
@@ -0,0 +1,61 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package initerror exposes a Permanent error contract for initialization
+// failures that must not be retried. The package is a leaf: it imports nothing
+// project-internal so it can be referenced by both the storage layer (which
+// produces permanent errors) and the metadata-schema retry classifier (which
+// reads them) without introducing a layering edge between those packages.
+package initerror
+
+import "errors"
+
+// Permanent marks an error as a permanent (non-retriable) initialization 
failure.
+type Permanent interface {
+       error
+       Permanent() bool
+}
+
+type permanentError struct {
+       err error
+}
+
+// Error implements error.
+func (p *permanentError) Error() string { return p.err.Error() }
+
+// Unwrap exposes the wrapped error so errors.Is/As can traverse the chain.
+func (p *permanentError) Unwrap() error { return p.err }
+
+// Permanent reports that this error must not be retried.
+func (p *permanentError) Permanent() bool { return true }
+
+// AsPermanent wraps err as a Permanent error. Returns nil if err is nil.
+func AsPermanent(err error) error {
+       if err == nil {
+               return nil
+       }
+       return &permanentError{err: err}
+}
+
+// IsPermanent reports whether the first Permanent in err's chain returns true.
+func IsPermanent(err error) bool {
+       if err == nil {
+               return false
+       }
+       var p Permanent
+       return errors.As(err, &p) && p.Permanent()
+}
diff --git a/pkg/initerror/initerror_test.go b/pkg/initerror/initerror_test.go
new file mode 100644
index 000000000..2c16db32f
--- /dev/null
+++ b/pkg/initerror/initerror_test.go
@@ -0,0 +1,71 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package initerror_test
+
+import (
+       "errors"
+       "fmt"
+       "testing"
+
+       "github.com/apache/skywalking-banyandb/pkg/initerror"
+)
+
+var errSentinel = errors.New("sentinel")
+
+func TestAsPermanent_NilReturnsNil(t *testing.T) {
+       if got := initerror.AsPermanent(nil); got != nil {
+               t.Fatalf("AsPermanent(nil) = %v, want nil", got)
+       }
+}
+
+func TestIsPermanent_Nil(t *testing.T) {
+       if initerror.IsPermanent(nil) {
+               t.Fatalf("IsPermanent(nil) = true, want false")
+       }
+}
+
+func TestIsPermanent_PlainError(t *testing.T) {
+       if initerror.IsPermanent(errSentinel) {
+               t.Fatalf("IsPermanent(plain error) = true, want false")
+       }
+}
+
+func TestIsPermanent_Direct(t *testing.T) {
+       wrapped := initerror.AsPermanent(errSentinel)
+       if !initerror.IsPermanent(wrapped) {
+               t.Fatalf("IsPermanent(AsPermanent(...)) = false, want true")
+       }
+}
+
+func TestIsPermanent_FmtErrorfChain(t *testing.T) {
+       wrapped := fmt.Errorf("init failed: %w", 
initerror.AsPermanent(errSentinel))
+       if !initerror.IsPermanent(wrapped) {
+               t.Fatalf("IsPermanent through fmt.Errorf chain = false, want 
true")
+       }
+       if !errors.Is(wrapped, errSentinel) {
+               t.Fatalf("errors.Is should still see the inner sentinel")
+       }
+}
+
+func TestIsPermanent_DoubleWrap(t *testing.T) {
+       inner := initerror.AsPermanent(errSentinel)
+       outer := fmt.Errorf("outer: %w", fmt.Errorf("middle: %w", inner))
+       if !initerror.IsPermanent(outer) {
+               t.Fatalf("IsPermanent through double wrap = false, want true")
+       }
+}
diff --git a/pkg/schema/cache.go b/pkg/schema/cache.go
index 5d2c28400..95b8f88b1 100644
--- a/pkg/schema/cache.go
+++ b/pkg/schema/cache.go
@@ -35,11 +35,15 @@ import (
        "github.com/apache/skywalking-banyandb/banyand/metadata"
        "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
        "github.com/apache/skywalking-banyandb/pkg/cgroups"
+       "github.com/apache/skywalking-banyandb/pkg/initerror"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/run"
 )
 
-var _ Resource = (*resourceSpec)(nil)
+var (
+       _ Resource           = (*resourceSpec)(nil)
+       _ RevisionRepository = (*schemaRepo)(nil)
+)
 
 type resourceSpec struct {
        schema    ResourceSchema
@@ -87,20 +91,127 @@ type schemaRepo struct {
        indexRuleMap           sync.Map
        bindingForwardMap      sync.Map
        bindingBackwardMap     sync.Map
+       latestModRevision      atomic.Int64
        workerNum              int
        resourceMutex          sync.Mutex
        groupMux               sync.Mutex
 }
 
+// SendMetadataEvent applies the given event synchronously. When the downstream
+// caches (e.g. pkg/schema.schemaRepo.groupMap and resourceMap) must be coherent
+// before the caller returns — for example, when SchemaBarrierService delegates
+// through notifyHandlers → this handler → SendMetadataEvent — the caller relies
+// on the event being fully applied here. Only transient errors queue the event
+// for retry via the background worker; the success path does not touch the
+// channel.
 func (sr *schemaRepo) SendMetadataEvent(event MetadataEvent) {
        if !sr.closer.AddSender() {
                return
        }
        defer sr.closer.SenderDone()
-       select {
-       case sr.eventCh <- event:
-       case <-sr.closer.CloseNotify():
+       // SendMetadataEvent runs synchronously on the caller goroutine (e.g. an
+       // etcd-watch handler such as (*schemaRepo).OnAddOrUpdate). A panic from
+       // processEvent — string-typed today, error-typed once Step 1b lands —
+       // would otherwise propagate to a caller that has no recover() of its own.
+       // The classifier branch escalates permanent errors to .Fatal() so
+       // runtime registration of an incompatible group exits the process the
+       // same way the boot path does; non-permanent panics are logged and
+       // requeued onto eventCh so the Watcher worker can retry, mirroring the
+       // transient error-return path below.
+       defer func() {
+               if recovered := recover(); recovered != nil {
+                       if recErr, isErr := recovered.(error); isErr && 
initerror.IsPermanent(recErr) {
+                               sr.l.Fatal().Err(recErr).Interface("event", 
event).Str("stack", string(debug.Stack())).
+                                       Msg("SendMetadataEvent hit a permanent 
error, refusing to continue")
+                       }
+                       sr.l.Warn().Interface("err", 
recovered).Interface("event", event).Str("stack", string(debug.Stack())).
+                               Msg("recovered from SendMetadataEvent panic; 
requeueing")
+                       sr.metrics.totalErrs.Inc(1)
+                       select {
+                       case sr.eventCh <- event:
+                       case <-sr.closer.CloseNotify():
+                       }
+               }
+       }()
+       if err := sr.processEvent(event); err != nil && !errors.Is(err, 
schema.ErrClosed) {
+               if initerror.IsPermanent(err) {
+                       sr.l.Fatal().Err(err).Interface("event", event).
+                               Msg("SendMetadataEvent hit a permanent error, 
refusing to continue")
+               }
+               select {
+               case <-sr.closer.CloseNotify():
+                       return
+               default:
+               }
+               sr.l.Err(err).Interface("event", event).Msg("fail to handle the 
metadata event. retry...")
+               sr.metrics.totalErrs.Inc(1)
+               select {
+               case sr.eventCh <- event:
+               case <-sr.closer.CloseNotify():
+               }
+       }
+}
+
+// processEvent applies the event in-place, bumping latestModRevision on success
+// so the barrier's downstream-handler watermark catches up before notifyHandlers
+// returns to its caller.
+func (sr *schemaRepo) processEvent(evt MetadataEvent) error {
+       if e := sr.l.Debug(); e.Enabled() {
+               e.Interface("event", evt).Msg("received an event")
+       }
+       var err error
+       switch evt.Typ {
+       case EventAddOrUpdate:
+               switch evt.Kind {
+               case EventKindGroup:
+                       _, err = sr.storeGroup(evt.Metadata.GetMetadata())
+                       if errors.Is(err, schema.ErrGRPCResourceNotFound) {
+                               err = nil
+                       }
+               case EventKindResource:
+                       err = sr.storeResource(evt.Metadata)
+               case EventKindIndexRule:
+                       indexRule := evt.Metadata.(*databasev1.IndexRule)
+                       sr.storeIndexRule(indexRule)
+               case EventKindIndexRuleBinding:
+                       indexRuleBinding := 
evt.Metadata.(*databasev1.IndexRuleBinding)
+                       sr.storeIndexRuleBinding(indexRuleBinding)
+               }
+       case EventDelete:
+               switch evt.Kind {
+               case EventKindGroup:
+                       err = sr.deleteGroup(evt.Metadata.GetMetadata())
+               case EventKindResource:
+                       sr.deleteResource(evt)
+               case EventKindIndexRule:
+                       key := getKey(evt.Metadata.GetMetadata())
+                       sr.indexRuleMap.Delete(key)
+               case EventKindIndexRuleBinding:
+                       indexRuleBinding := 
evt.Metadata.(*databasev1.IndexRuleBinding)
+                       col, _ := 
sr.bindingForwardMap.Load(getKey(&commonv1.Metadata{
+                               Name:  indexRuleBinding.Subject.GetName(),
+                               Group: 
indexRuleBinding.GetMetadata().GetGroup(),
+                       }))
+                       if col == nil {
+                               break
+                       }
+                       tMap := col.(*sync.Map)
+                       key := getKey(indexRuleBinding.GetMetadata())
+                       tMap.Delete(key)
+                       for i := range indexRuleBinding.Rules {
+                               col, _ := 
sr.bindingBackwardMap.Load(getKey(&commonv1.Metadata{
+                                       Name:  indexRuleBinding.Rules[i],
+                                       Group: 
indexRuleBinding.GetMetadata().GetGroup(),
+                               }))
+                               if col == nil {
+                                       continue
+                               }
+                               tMap := col.(*sync.Map)
+                               tMap.Delete(key)
+                       }
+               }
        }
+       return err
 }
 
 // StopCh implements Repository.
@@ -155,89 +266,49 @@ func (sr *schemaRepo) Watcher() {
                        }
                        defer func() {
                                sr.closer.ReceiverDone()
+                               // Bump the panic metric BEFORE the classifier branch: .Fatal()
+                               // calls os.Exit(1) and the deferred Inc would otherwise be
+                               // skipped on the permanent-error escalation path. Do NOT move
+                               // this line under the if-branch.
+                               sr.metrics.totalPanics.Inc(1)
                                if err := recover(); err != nil {
+                                       // Classifier is dormant on string-typed panics (today's
+                                       // Panicf sites outside this PR's scope still panic with
+                                       // strings); tsdb.go:176/242 are converted in Step 1b so
+                                       // permanent errors reaching this recover are error-typed.
+                                       // We use .Fatal() not panic() because this is a worker
+                                       // goroutine: panic() would only kill this goroutine and
+                                       // leave the Watcher partially alive.
+                                       if recErr, isErr := err.(error); isErr 
&& initerror.IsPermanent(recErr) {
+                                               
sr.l.Fatal().Err(recErr).Str("stack", string(debug.Stack())).
+                                                       Msg("Watcher hit a 
permanent error, refusing to continue")
+                                       }
                                        sr.l.Warn().Interface("err", 
err).Str("stack", string(debug.Stack())).Msg("watching the events")
                                }
-                               sr.metrics.totalPanics.Inc(1)
                        }()
+                       // The Watcher drains events retried via the channel after transient
+                       // processing errors. The fast path runs synchronously in SendMetadataEvent
+                       // so the barrier's downstream-handler watermark advances correctly.
                        for {
                                select {
                                case evt, more := <-sr.eventCh:
                                        if !more {
                                                return
                                        }
-                                       if e := sr.l.Debug(); e.Enabled() {
-                                               e.Interface("event", 
evt).Msg("received an event")
-                                       }
-                                       var err error
-                                       switch evt.Typ {
-                                       case EventAddOrUpdate:
-                                               switch evt.Kind {
-                                               case EventKindGroup:
-                                                       _, err = 
sr.storeGroup(evt.Metadata.GetMetadata())
-                                                       if errors.As(err, 
schema.ErrGRPCResourceNotFound) {
-                                                               err = nil
-                                                       }
-                                               case EventKindResource:
-                                                       err = 
sr.storeResource(evt.Metadata)
-                                               case EventKindIndexRule:
-                                                       indexRule := 
evt.Metadata.(*databasev1.IndexRule)
-                                                       if 
indexRule.GetMetadata().GetGroup() == "test-trace-group" {
-                                                               
sr.l.Info().Str("group", indexRule.GetMetadata().GetGroup()).Msg("index rule")
-                                                       }
-                                                       
sr.storeIndexRule(indexRule)
-                                               case EventKindIndexRuleBinding:
-                                                       indexRuleBinding := 
evt.Metadata.(*databasev1.IndexRuleBinding)
-                                                       if 
indexRuleBinding.GetMetadata().GetGroup() == "test-trace-group" {
-                                                               
sr.l.Info().Str("group", indexRuleBinding.GetMetadata().GetGroup()).Msg("index 
rule binding")
-                                                       }
-                                                       
sr.storeIndexRuleBinding(indexRuleBinding)
-                                               }
-                                       case EventDelete:
-                                               switch evt.Kind {
-                                               case EventKindGroup:
-                                                       err = 
sr.deleteGroup(evt.Metadata.GetMetadata())
-                                               case EventKindResource:
-                                                       
sr.deleteResource(evt.Metadata.GetMetadata())
-                                               case EventKindIndexRule:
-                                                       key := 
getKey(evt.Metadata.GetMetadata())
-                                                       
sr.indexRuleMap.Delete(key)
-                                               case EventKindIndexRuleBinding:
-                                                       indexRuleBinding := 
evt.Metadata.(*databasev1.IndexRuleBinding)
-                                                       col, _ := 
sr.bindingForwardMap.Load(getKey(&commonv1.Metadata{
-                                                               Name:  
indexRuleBinding.Subject.GetName(),
-                                                               Group: 
indexRuleBinding.GetMetadata().GetGroup(),
-                                                       }))
-                                                       if col == nil {
-                                                               break
-                                                       }
-                                                       tMap := col.(*sync.Map)
-                                                       key := 
getKey(indexRuleBinding.GetMetadata())
-                                                       tMap.Delete(key)
-                                                       for i := range 
indexRuleBinding.Rules {
-                                                               col, _ := 
sr.bindingBackwardMap.Load(getKey(&commonv1.Metadata{
-                                                                       Name:  
indexRuleBinding.Rules[i],
-                                                                       Group: 
indexRuleBinding.GetMetadata().GetGroup(),
-                                                               }))
-                                                               if col == nil {
-                                                                       continue
-                                                               }
-                                                               tMap := 
col.(*sync.Map)
-                                                               tMap.Delete(key)
-                                                       }
+                                       if retryErr := sr.processEvent(evt); 
retryErr != nil && !errors.Is(retryErr, schema.ErrClosed) {
+                                               if 
initerror.IsPermanent(retryErr) {
+                                                       
sr.l.Fatal().Err(retryErr).Interface("event", evt).
+                                                               Msg("Watcher 
hit a permanent error, refusing to continue")
                                                }
-                                       }
-                                       if err != nil && !errors.Is(err, 
schema.ErrClosed) {
                                                select {
                                                case <-sr.closer.CloseNotify():
                                                        return
                                                default:
                                                }
-                                               
sr.l.Err(err).Interface("event", evt).Msg("fail to handle the metadata event. 
retry...")
-                                               sr.metrics.totalErrs.Inc(1)
+                                               
sr.l.Err(retryErr).Interface("event", evt).Msg("retry processing failed, 
requeueing")
+                                               sr.metrics.totalRetries.Inc(1)
                                                go func() {
                                                        
sr.SendMetadataEvent(evt)
-                                                       
sr.metrics.totalRetries.Inc(1)
                                                }()
                                        }
                                case <-sr.closer.CloseNotify():
@@ -259,12 +330,18 @@ func (sr *schemaRepo) storeGroup(groupMeta 
*commonv1.Metadata) (*group, error) {
                if err := g.init(name); err != nil {
                        return nil, err
                }
+               if gs := g.GetSchema(); gs != nil {
+                       
sr.updateLatestModRevision(gs.GetMetadata().GetModRevision())
+               }
                return g, nil
        }
        if !g.isInit() {
                if err := g.init(name); err != nil {
                        return nil, err
                }
+               if gs := g.GetSchema(); gs != nil {
+                       
sr.updateLatestModRevision(gs.GetMetadata().GetModRevision())
+               }
                return g, nil
        }
        ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
@@ -278,6 +355,7 @@ func (sr *schemaRepo) storeGroup(groupMeta 
*commonv1.Metadata) (*group, error) {
                return g, nil
        }
        g.groupSchema.Store(groupSchema)
+       sr.updateLatestModRevision(groupSchema.GetMetadata().GetModRevision())
        if proto.Equal(groupSchema, prevGroupSchema) {
                return g, nil
        }
@@ -365,6 +443,7 @@ func (sr *schemaRepo) storeResource(resourceSchema 
ResourceSchema) error {
        sm.OnIndexUpdate(sr.indexRules(resourceSchema))
        resource.delegated = sm
        sr.resourceMap.Store(key, resource)
+       
sr.updateLatestModRevision(resourceSchema.GetMetadata().GetModRevision())
        return nil
 }
 
@@ -373,6 +452,7 @@ func (sr *schemaRepo) storeIndexRule(indexRule 
*databasev1.IndexRule) {
        if prev, loaded := sr.indexRuleMap.LoadOrStore(key, indexRule); loaded {
                if prev.(*databasev1.IndexRule).GetMetadata().ModRevision <= 
indexRule.GetMetadata().ModRevision {
                        sr.indexRuleMap.Store(key, indexRule)
+                       
sr.updateLatestModRevision(indexRule.GetMetadata().GetModRevision())
                        if col, _ := sr.bindingBackwardMap.Load(key); col != 
nil {
                                col.(*sync.Map).Range(func(_, value any) bool {
                                        
sr.updateIndex(value.(*databasev1.IndexRuleBinding))
@@ -381,6 +461,7 @@ func (sr *schemaRepo) storeIndexRule(indexRule 
*databasev1.IndexRule) {
                        }
                }
        } else {
+               
sr.updateLatestModRevision(indexRule.GetMetadata().GetModRevision())
                if col, _ := sr.bindingBackwardMap.Load(key); col != nil {
                        col.(*sync.Map).Range(func(_, value any) bool {
                                
sr.updateIndex(value.(*databasev1.IndexRuleBinding))
@@ -425,6 +506,7 @@ func (sr *schemaRepo) 
storeIndexRuleBinding(indexRuleBinding *databasev1.IndexRu
        if !changed {
                return
        }
+       
sr.updateLatestModRevision(indexRuleBinding.GetMetadata().GetModRevision())
        sr.updateIndex(indexRuleBinding)
 }
 
@@ -468,11 +550,74 @@ func getKey(metadata *commonv1.Metadata) string {
        return path.Join(metadata.GetGroup(), metadata.GetName())
 }
 
-func (sr *schemaRepo) deleteResource(metadata *commonv1.Metadata) {
-       key := getKey(metadata)
+func (sr *schemaRepo) deleteResource(evt MetadataEvent) {
+       key := getKey(evt.Metadata.GetMetadata())
+       // Hold resourceMutex across the load/check/delete to keep the revision-guard atomic.
+       // storeResource takes the same lock, so a concurrent newer store cannot interleave
+       // between the staleness check and LoadAndDelete and have its entry wiped.
+       sr.resourceMutex.Lock()
+       defer sr.resourceMutex.Unlock()
+       if evt.DeleteRevision != 0 {
+               if v, ok := sr.resourceMap.Load(key); ok {
+                       stored := v.(*resourceSpec)
+                       if evt.DeleteRevision < stored.maxRevision() {
+                               return
+                       }
+               }
+       }
        _, _ = sr.resourceMap.LoadAndDelete(key)
 }
 
+func (sr *schemaRepo) updateLatestModRevision(incoming int64) {
+       for {
+               cur := sr.latestModRevision.Load()
+               if incoming <= cur {
+                       return
+               }
+               if sr.latestModRevision.CompareAndSwap(cur, incoming) {
+                       return
+               }
+       }
+}
+
+// LatestModRevision returns the highest mod_revision seen across all stored kinds.
+func (sr *schemaRepo) LatestModRevision() int64 {
+       return sr.latestModRevision.Load()
+}
+
+// ResourceRevision returns the mod_revision for a stored resource and whether it was found.
+func (sr *schemaRepo) ResourceRevision(kind schema.Kind, groupName, name 
string) (int64, bool) {
+       key := path.Join(groupName, name)
+       switch kind {
+       case schema.KindStream, schema.KindMeasure, schema.KindTrace:
+               if v, ok := sr.resourceMap.Load(key); ok {
+                       return v.(*resourceSpec).maxRevision(), true
+               }
+       case schema.KindIndexRule:
+               if v, ok := sr.indexRuleMap.Load(key); ok {
+                       return 
v.(*databasev1.IndexRule).GetMetadata().GetModRevision(), true
+               }
+       case schema.KindGroup:
+               if v, ok := sr.groupMap.Load(name); ok {
+                       grp := v.(*group)
+                       if gs := grp.GetSchema(); gs != nil {
+                               return gs.GetMetadata().GetModRevision(), true
+                       }
+               }
+       case schema.KindIndexRuleBinding, schema.KindTopNAggregation,
+               schema.KindNode, schema.KindProperty, schema.KindMask:
+               // schemaRepo only caches resources, index rules, and groups; other kinds
+               // (bindings, top-n aggregations, nodes, properties, masks) have no entry here.
+       }
+       return 0, false
+}
+
+// IsAbsent returns true when the given resource is not present in the local cache.
+func (sr *schemaRepo) IsAbsent(kind schema.Kind, groupName, name string) bool {
+       _, ok := sr.ResourceRevision(kind, groupName, name)
+       return !ok
+}
+
 func (sr *schemaRepo) Close() {
        defer func() {
                if err := recover(); err != nil {
@@ -539,8 +684,8 @@ func (g *group) init(name string) error {
 }
 
 func (g *group) initBySchema(groupSchema *commonv1.Group) error {
-       g.groupSchema.Store(groupSchema)
        if g.isPortable() {
+               g.groupSchema.Store(groupSchema)
                return nil
        }
        db, err := g.resourceSupplier.OpenDB(groupSchema)
@@ -548,7 +693,8 @@ func (g *group) initBySchema(groupSchema *commonv1.Group) 
error {
                return err
        }
        g.db.Store(db)
-       return err
+       g.groupSchema.Store(groupSchema)
+       return nil
 }
 
 func (g *group) isInit() bool {
@@ -574,7 +720,11 @@ func (g *group) close() (err error) {
        if !g.isInit() || g.isPortable() {
                return nil
        }
-       return multierr.Append(err, g.SupplyTSDB().Close())
+       tsdb := g.SupplyTSDB()
+       if tsdb == nil {
+               return nil
+       }
+       return multierr.Append(err, tsdb.Close())
 }
 
 func (g *group) drop() error {
diff --git a/pkg/schema/cache_test.go b/pkg/schema/cache_test.go
new file mode 100644
index 000000000..ebafce414
--- /dev/null
+++ b/pkg/schema/cache_test.go
@@ -0,0 +1,297 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package schema
+
+import (
+       "errors"
+       "testing"
+
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+
+       commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+       pkglogger "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+// testLogger initializes the package logger with the "dev" environment at
+// "warn" level and hands back the logger the unit tests in this package use.
+// The result of Init is deliberately discarded.
+func testLogger() *pkglogger.Logger {
+       cfg := pkglogger.Logging{Env: "dev", Level: "warn"}
+       _ = pkglogger.Init(cfg)
+       return pkglogger.GetLogger("test", "schema-cache")
+}
+
+// noopIndexListener satisfies IndexListener and discards all index updates.
+// The test suppliers in this package return it from OpenResource.
+type noopIndexListener struct{}
+
+// OnIndexUpdate ignores the supplied index rules.
+func (noopIndexListener) OnIndexUpdate(_ []*databasev1.IndexRule) {}
+
+// stubSupplier satisfies ResourceSchemaSupplier for tests that only need
+// storeResource to succeed without real DB or metadata interactions.
+type stubSupplier struct{}
+
+// ResourceSchema always returns a nil schema and a nil error.
+func (stubSupplier) ResourceSchema(_ *commonv1.Metadata) (ResourceSchema, error) {
+       return nil, nil
+}
+
+// OpenResource always succeeds and hands back a no-op index listener.
+func (stubSupplier) OpenResource(_ Resource) (IndexListener, error) {
+       return noopIndexListener{}, nil
+}
+
+// newTestSchemaRepo builds the smallest schemaRepo the unit tests need: only
+// the resource schema supplier is set (to a stubSupplier). Every other field
+// keeps its zero value, which is ready to use for sync.Map and atomic fields.
+func newTestSchemaRepo() *schemaRepo {
+       repo := &schemaRepo{}
+       repo.resourceSchemaSupplier = stubSupplier{}
+       return repo
+}
+
+// buildMeasure returns a *databasev1.Measure with the specified name and mod_revision.
+// Group is fixed to "g" because every test in this file uses the same group; varying
+// it would only add noise without changing what's being asserted.
+func buildMeasure(name string, modRevision int64) *databasev1.Measure {
+       return &databasev1.Measure{
+               Metadata: &commonv1.Metadata{Group: "g", Name: name, ModRevision: modRevision},
+       }
+}
+
+// TestDeleteResource_RevisionGuard_RejectsStaleDelete verifies that an 
EventDelete
+// carrying a DeleteRevision lower than the stored resource's mod_revision is 
ignored,
+// leaving the cache entry intact (A7).
+func TestDeleteResource_RevisionGuard_RejectsStaleDelete(t *testing.T) {
+       sr := newTestSchemaRepo()
+       require.NoError(t, sr.storeResource(buildMeasure("m", 100)))
+
+       sr.deleteResource(MetadataEvent{
+               Metadata:       buildMeasure("m", 100),
+               DeleteRevision: 50,
+               Typ:            EventDelete,
+               Kind:           EventKindResource,
+       })
+
+       _, present := sr.resourceMap.Load("g/m")
+       assert.True(t, present, "stale delete (DeleteRevision=50 < stored 
rev=100) must not remove the entry")
+}
+
+// TestDeleteResource_RevisionGuard_AcceptsFreshDelete verifies that an 
EventDelete
+// carrying a DeleteRevision greater than the stored resource's mod_revision 
removes
+// the cache entry (A7).
+func TestDeleteResource_RevisionGuard_AcceptsFreshDelete(t *testing.T) {
+       sr := newTestSchemaRepo()
+       require.NoError(t, sr.storeResource(buildMeasure("m", 100)))
+
+       sr.deleteResource(MetadataEvent{
+               Metadata:       buildMeasure("m", 100),
+               DeleteRevision: 150,
+               Typ:            EventDelete,
+               Kind:           EventKindResource,
+       })
+
+       _, present := sr.resourceMap.Load("g/m")
+       assert.False(t, present, "fresh delete (DeleteRevision=150 >= stored 
rev=100) must remove the entry")
+}
+
+// TestLatestModRevision_Monotonic verifies that LatestModRevision advances 
monotonically
+// with ascending stores and does not regress when a lower revision is applied 
(A8).
+func TestLatestModRevision_Monotonic(t *testing.T) {
+       sr := newTestSchemaRepo()
+       assert.Equal(t, int64(0), sr.LatestModRevision(), "watermark must start 
at zero")
+
+       require.NoError(t, sr.storeResource(buildMeasure("m1", 10)))
+       assert.Equal(t, int64(10), sr.LatestModRevision())
+
+       require.NoError(t, sr.storeResource(buildMeasure("m2", 30)))
+       assert.Equal(t, int64(30), sr.LatestModRevision())
+
+       // A new key at a lower revision must not regress the watermark.
+       require.NoError(t, sr.storeResource(buildMeasure("m3", 20)))
+       assert.Equal(t, int64(30), sr.LatestModRevision(), "stale store 
(rev=20) must not lower the watermark from 30")
+}
+
+// TestLatestModRevision_AcrossKinds verifies that LatestModRevision tracks 
the global
+// maximum across Resource, IndexRule, and IndexRuleBinding stores (A8).
+func TestLatestModRevision_AcrossKinds(t *testing.T) {
+       sr := newTestSchemaRepo()
+
+       require.NoError(t, sr.storeResource(buildMeasure("m1", 10)))
+       assert.Equal(t, int64(10), sr.LatestModRevision())
+
+       sr.storeIndexRule(&databasev1.IndexRule{
+               Metadata: &commonv1.Metadata{Group: "g", Name: "idx1", 
ModRevision: 50},
+               Tags:     []string{"tag1"},
+               Type:     databasev1.IndexRule_TYPE_INVERTED,
+       })
+       assert.Equal(t, int64(50), sr.LatestModRevision(), "IndexRule store 
(rev=50) must advance the watermark")
+
+       sr.storeIndexRuleBinding(&databasev1.IndexRuleBinding{
+               Metadata: &commonv1.Metadata{Group: "g", Name: "irb1", 
ModRevision: 40},
+               Rules:    []string{"idx1"},
+               Subject:  &databasev1.Subject{Catalog: 
commonv1.Catalog_CATALOG_MEASURE, Name: "m1"},
+       })
+       assert.Equal(t, int64(50), sr.LatestModRevision(), "IndexRuleBinding at 
rev=40 must not lower the watermark from 50")
+}
+
+// TestResourceRevision_PresentAndAbsent verifies that ResourceRevision returns the
+// stored mod_revision and ok=true for a present resource, and (0, false) for an absent one (A8).
+func TestResourceRevision_PresentAndAbsent(t *testing.T) {
+       sr := newTestSchemaRepo()
+       require.NoError(t, sr.storeResource(buildMeasure("m", 77)))
+
+       // Present: the revision passed to storeResource is reported back.
+       rev, ok := sr.ResourceRevision(schema.KindMeasure, "g", "m")
+       assert.True(t, ok)
+       assert.Equal(t, int64(77), rev)
+
+       // Absent: a name that was never stored yields the zero revision.
+       rev, ok = sr.ResourceRevision(schema.KindMeasure, "g", "missing")
+       assert.False(t, ok)
+       assert.Equal(t, int64(0), rev)
+}
+
+// TestIsAbsent_AfterDelete verifies that IsAbsent returns false while the 
resource
+// is cached and true after a valid delete removes it from the cache (A7, A8).
+func TestIsAbsent_AfterDelete(t *testing.T) {
+       sr := newTestSchemaRepo()
+       require.NoError(t, sr.storeResource(buildMeasure("m", 100)))
+
+       assert.False(t, sr.IsAbsent(schema.KindMeasure, "g", "m"), "resource 
must not be absent immediately after store")
+
+       sr.deleteResource(MetadataEvent{
+               Metadata:       buildMeasure("m", 100),
+               DeleteRevision: 100,
+               Typ:            EventDelete,
+               Kind:           EventKindResource,
+       })
+
+       assert.True(t, sr.IsAbsent(schema.KindMeasure, "g", "m"), "resource 
must be absent after a valid delete")
+}
+
+// fakeDB satisfies pkg/schema.DB for unit tests that only exercise group caching.
+// Close and Drop report success; UpdateOptions does nothing.
+type fakeDB struct{}
+
+func (fakeDB) Close() error                           { return nil }
+func (fakeDB) UpdateOptions(_ *commonv1.ResourceOpts) {}
+func (fakeDB) Drop() error                            { return nil }
+
+// fakeResourceSupplier is a programmable ResourceSupplier whose OpenDB returns
+// a configurable error or DB. openCalls counts invocations so retry tests can
+// assert that initBySchema actually re-attempted OpenDB.
+type fakeResourceSupplier struct {
+       openErr   error
+       openCalls int
+}
+
+// ResourceSchema always returns a nil schema and a nil error.
+func (s *fakeResourceSupplier) ResourceSchema(_ *commonv1.Metadata) (ResourceSchema, error) {
+       return nil, nil
+}
+
+// OpenResource always succeeds and hands back a no-op index listener.
+func (s *fakeResourceSupplier) OpenResource(_ Resource) (IndexListener, error) {
+       return noopIndexListener{}, nil
+}
+
+// OpenDB records the call, then fails with openErr when it is set and
+// otherwise returns a fakeDB. The counter is a plain int with no locking.
+func (s *fakeResourceSupplier) OpenDB(_ *commonv1.Group) (DB, error) {
+       s.openCalls++
+       if s.openErr != nil {
+               return nil, s.openErr
+       }
+       return fakeDB{}, nil
+}
+
+// buildGroup returns a group schema whose metadata carries only the given name.
+func buildGroup(name string) *commonv1.Group {
+       meta := &commonv1.Metadata{Name: name}
+       return &commonv1.Group{Metadata: meta}
+}
+
+// TestInitBySchema_FailedOpenDB_LeavesGroupUninit asserts that when OpenDB
+// fails, the group is left in an un-initialized state so a later retry can
+// reattempt OpenDB. Previously the schema was stored before OpenDB ran, which
+// caused isInit() to return true even though db was nil.
+func TestInitBySchema_FailedOpenDB_LeavesGroupUninit(t *testing.T) {
+       openErr := errors.New("boom")
+       supplier := &fakeResourceSupplier{openErr: openErr}
+       g := newGroup(nil, nil, supplier)
+
+       err := g.initBySchema(buildGroup("g1"))
+       require.Error(t, err)
+       assert.True(t, errors.Is(err, openErr))
+       assert.False(t, g.isInit(), "group must not be initialized after a 
failed OpenDB")
+       assert.Nil(t, g.GetSchema(), "groupSchema must not be stored after a 
failed OpenDB")
+       assert.Nil(t, g.SupplyTSDB(), "SupplyTSDB must be nil after a failed 
OpenDB")
+}
+
+// TestInitBySchema_PortableGroupOK asserts that a portable group (nil
+// resourceSupplier) stores the schema and is considered initialized without
+// invoking OpenDB.
+func TestInitBySchema_PortableGroupOK(t *testing.T) {
+       g := newGroup(nil, nil, nil)
+       require.NoError(t, g.initBySchema(buildGroup("portable")))
+       assert.True(t, g.isInit(), "portable group must be initialized after 
initBySchema")
+       assert.Nil(t, g.SupplyTSDB(), "portable group must have no underlying 
tsdb")
+}
+
+// TestInitBySchema_SuccessStoresSchema asserts the success path stores schema
+// after OpenDB returns: the group reports initialized, the stored schema is the
+// one passed in, and OpenDB ran exactly once.
+func TestInitBySchema_SuccessStoresSchema(t *testing.T) {
+       supplier := &fakeResourceSupplier{}
+       g := newGroup(nil, nil, supplier)
+       require.NoError(t, g.initBySchema(buildGroup("g1")))
+       assert.True(t, g.isInit())
+       // Check the stored schema itself, not just the init flag.
+       require.NotNil(t, g.GetSchema(), "groupSchema must be stored after a successful OpenDB")
+       assert.Equal(t, "g1", g.GetSchema().GetMetadata().GetName())
+       assert.Equal(t, 1, supplier.openCalls)
+}
+
+// TestInitGroup_RetryAfterFailedOpenDB asserts that initGroup retries OpenDB 
on
+// the second pass when the first attempt left the group un-initialized — the
+// fix for the silent-fail bug where a half-broken group survived in the cache.
+func TestInitGroup_RetryAfterFailedOpenDB(t *testing.T) {
+       openErr := errors.New("transient")
+       supplier := &fakeResourceSupplier{openErr: openErr}
+       repo := &schemaRepo{
+               l:                testLogger(),
+               resourceSupplier: supplier,
+       }
+
+       gs := buildGroup("g1")
+       _, firstErr := repo.initGroup(gs)
+       require.Error(t, firstErr)
+       require.Equal(t, 1, supplier.openCalls)
+
+       cached, ok := repo.getGroup("g1")
+       require.True(t, ok)
+       require.False(t, cached.isInit())
+
+       supplier.openErr = nil
+       g, secondErr := repo.initGroup(gs)
+       require.NoError(t, secondErr)
+       require.NotNil(t, g)
+       assert.True(t, g.isInit())
+       assert.Equal(t, 2, supplier.openCalls, "OpenDB must be re-attempted 
when the cached group is not initialized")
+}
+
+// TestInitGroup_AlreadyInit_NoReopen asserts that an already-initialized group
+// is returned without reinvoking OpenDB.
+func TestInitGroup_AlreadyInit_NoReopen(t *testing.T) {
+       supplier := &fakeResourceSupplier{}
+       repo := &schemaRepo{
+               l:                testLogger(),
+               resourceSupplier: supplier,
+       }
+       gs := buildGroup("g1")
+       _, err := repo.initGroup(gs)
+       require.NoError(t, err)
+       require.Equal(t, 1, supplier.openCalls)
+       _, err = repo.initGroup(gs)
+       require.NoError(t, err)
+       assert.Equal(t, 1, supplier.openCalls, "already-initialized group must 
not re-invoke OpenDB")
+}
diff --git a/pkg/schema/cache_watcher_fatal_test.go b/pkg/schema/cache_watcher_fatal_test.go
new file mode 100644
index 000000000..3c66c5d28
--- /dev/null
+++ b/pkg/schema/cache_watcher_fatal_test.go
@@ -0,0 +1,256 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package schema
+
+import (
+       stderrors "errors"
+       "fmt"
+       "os"
+       "os/exec"
+       "strings"
+       "sync/atomic"
+       "testing"
+       "time"
+
+       "github.com/stretchr/testify/require"
+
+       commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       "github.com/apache/skywalking-banyandb/pkg/initerror"
+       pkglogger "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/meter"
+       "github.com/apache/skywalking-banyandb/pkg/run"
+)
+
+// nopCounter is a meter.Counter that discards all increments. Used by the
+// fatal-fail subprocess tests so the worker's deferred Inc(1) does not
+// nil-panic in the child process.
+type nopCounter struct{}
+
+// Inc ignores the delta and label values.
+func (nopCounter) Inc(_ float64, _ ...string) {}
+
+// Delete ignores the label values and always reports false.
+func (nopCounter) Delete(_ ...string) bool    { return false }
+
+// Compile-time check that nopCounter implements meter.Counter.
+var _ meter.Counter = nopCounter{}
+
+const (
+       // fatalChildEnv signals to the test binary that it is running as the
+       // child process of a subprocess fast-fail assertion. Parents re-exec the
+       // running test binary with this env set; only the matching child
+       // branch executes — the parent does not run the child code path.
+       fatalChildEnv          = "BANYAND_TEST_F5_FATAL_CHILD"
+       // fatalChildModeWatcher selects the child branch that drives the Watcher worker.
+       fatalChildModeWatcher  = "watcher"
+       // fatalChildModeSendSync selects the child branch that calls SendMetadataEvent.
+       fatalChildModeSendSync = "send-sync"
+)
+
+// permanentPanicSupplier is a ResourceSchemaSupplier whose OpenResource panics
+// with an error-typed permanent error. It exercises the Watcher worker's
+// recover() classifier (Step 1) — the panic propagates up through processEvent
+// and is caught in the worker's deferred recover(), where IsPermanent classifies
+// it as fatal.
+type permanentPanicSupplier struct{}
+
+// ResourceSchema always returns a nil schema and a nil error.
+func (permanentPanicSupplier) ResourceSchema(_ *commonv1.Metadata) (ResourceSchema, error) {
+       return nil, nil
+}
+
+// OpenResource never returns: it panics with an error wrapped by initerror.AsPermanent.
+func (permanentPanicSupplier) OpenResource(_ Resource) (IndexListener, error) {
+       panic(initerror.AsPermanent(stderrors.New("incompatible version 1.3.0")))
+}
+
+// permanentErrorSupplier is a ResourceSchemaSupplier whose OpenResource returns
+// a permanent error. It exercises the synchronous-return classifier in
+// SendMetadataEvent (Step 2b).
+type permanentErrorSupplier struct{}
+
+// ResourceSchema always returns a nil schema and a nil error.
+func (permanentErrorSupplier) ResourceSchema(_ *commonv1.Metadata) (ResourceSchema, error) {
+       return nil, nil
+}
+
+// OpenResource always fails with an error wrapped by initerror.AsPermanent.
+func (permanentErrorSupplier) OpenResource(_ Resource) (IndexListener, error) {
+       return nil, initerror.AsPermanent(stderrors.New("incompatible version 1.3.0"))
+}
+
+// transientThenOKSupplier returns a non-permanent error on the first
+// OpenResource call and succeeds on every call after that. It is used to
+// verify the transient-retry path stays functional after F5 (Test 5.6).
+type transientThenOKSupplier struct {
+       openCalls atomic.Int32
+}
+
+// ResourceSchema always returns a nil schema and a nil error.
+func (s *transientThenOKSupplier) ResourceSchema(_ *commonv1.Metadata) (ResourceSchema, error) {
+       return nil, nil
+}
+
+// OpenResource atomically counts the call, fails the first one with a plain
+// (non-permanent) error, and returns a no-op listener afterwards.
+func (s *transientThenOKSupplier) OpenResource(_ Resource) (IndexListener, error) {
+       calls := s.openCalls.Add(1)
+       if calls == 1 {
+               return nil, stderrors.New("etcd hiccup")
+       }
+       return noopIndexListener{}, nil
+}
+
+// fatalChildLogger initializes the package logger with the "dev" environment
+// at "info" level and returns the logger used inside the fatal-fail child
+// process. The parent test asserts on the child's exit message via
+// CombinedOutput, so .Fatal() output is presumably written to stderr —
+// TODO confirm against pkg/logger. The result of Init is deliberately discarded.
+func fatalChildLogger() *pkglogger.Logger {
+       cfg := pkglogger.Logging{Env: "dev", Level: "info"}
+       _ = pkglogger.Init(cfg)
+       return pkglogger.GetLogger("test", "schema-cache-fatal")
+}
+
+// fatalChildMetrics returns a Metrics instance backed by no-op counters so
+// the worker's deferred Inc(1) call does not nil-panic in the child process.
+func fatalChildMetrics() *Metrics {
+       return &Metrics{
+               totalErrs:    nopCounter{},
+               totalRetries: nopCounter{},
+               totalPanics:  nopCounter{},
+       }
+}
+
+// runFatalChild re-executes the running test binary as a child process,
+// restricted to the calling test via -test.run and with fatalChildEnv set to
+// mode. It requires the child to exit with a non-zero code and returns the
+// child's combined stdout/stderr so the caller can assert on the message.
+func runFatalChild(t *testing.T, mode string) string {
+       t.Helper()
+       // #nosec G204 — os.Args[0] is the test binary path, set by the Go test
+       // runner. The mode argument is one of two compile-time string constants
+       // declared above.
+       cmd := exec.Command(os.Args[0], "-test.run=^"+t.Name()+"$", "-test.v")
+       cmd.Env = append(os.Environ(), fatalChildEnv+"="+mode)
+       out, err := cmd.CombinedOutput()
+       require.Error(t, err, "child must exit non-zero; output:\n%s", string(out))
+       var exitErr *exec.ExitError
+       require.ErrorAs(t, err, &exitErr, "child must produce *exec.ExitError; output:\n%s", string(out))
+       require.NotZero(t, exitErr.ExitCode(), "child must exit with non-zero code; output:\n%s", string(out))
+       return string(out)
+}
+
+// TestWatcherHitPermanentError_FailsFast verifies that the Watcher worker's
+// recover() classifier escalates an error-typed permanent panic to .Fatal(),
+// which exits the process non-zero (Step 1).
+func TestWatcherHitPermanentError_FailsFast(t *testing.T) {
+       if os.Getenv(fatalChildEnv) == fatalChildModeWatcher {
+               runWatcherFatalChild()
+               return
+       }
+       out := runFatalChild(t, fatalChildModeWatcher)
+       if !strings.Contains(out, "Watcher hit a permanent error") {
+               t.Fatalf("expected stderr to mention 'Watcher hit a permanent 
error', got:\n%s", out)
+       }
+}
+
+// runWatcherFatalChild constructs a minimal schemaRepo whose OpenResource
+// panics with a permanent error, starts the Watcher, and pushes an event
+// through eventCh. The Watcher worker's recover() classifier should escalate
+// to .Fatal() within a few hundred milliseconds. If the process is still
+// alive after the sleep, it reports the failure on stderr and exits with 2.
+func runWatcherFatalChild() {
+       repo := &schemaRepo{
+               l:                      fatalChildLogger(),
+               resourceSchemaSupplier: permanentPanicSupplier{},
+               eventCh:                make(chan MetadataEvent, 1),
+               workerNum:              1,
+               closer:                 run.NewChannelCloser(),
+               metrics:                fatalChildMetrics(),
+       }
+       repo.Watcher()
+       repo.eventCh <- MetadataEvent{
+               Typ:  EventAddOrUpdate,
+               Kind: EventKindResource,
+               Metadata: &databasev1.Measure{
+                       Metadata: &commonv1.Metadata{Group: "g", Name: "m", ModRevision: 1},
+               },
+       }
+       // Hold the goroutine alive long enough for the worker's defer to run.
+       // .Fatal() calls os.Exit(1), so this sleep should never complete.
+       time.Sleep(5 * time.Second)
+       fmt.Fprintln(os.Stderr, "child did not exit; classifier did not fire")
+       os.Exit(2)
+}
+
+// TestSendMetadataEventHitPermanentError_FailsFast verifies that the
+// synchronous SendMetadataEvent path escalates a permanent error returned
+// from processEvent to .Fatal() (Step 2b).
+func TestSendMetadataEventHitPermanentError_FailsFast(t *testing.T) {
+       if os.Getenv(fatalChildEnv) == fatalChildModeSendSync {
+               runSendSyncFatalChild()
+               return
+       }
+       out := runFatalChild(t, fatalChildModeSendSync)
+       if !strings.Contains(out, "SendMetadataEvent hit a permanent error") {
+               t.Fatalf("expected stderr to mention 'SendMetadataEvent hit a 
permanent error', got:\n%s", out)
+       }
+}
+
+// runSendSyncFatalChild calls SendMetadataEvent on a schemaRepo whose
+// OpenResource returns a permanent error. The synchronous classifier branch
+// at cache.go (Step 2b) should escalate to .Fatal() before SendMetadataEvent
+// returns. If the call does return, the failure is reported on stderr and
+// the process exits with 2.
+func runSendSyncFatalChild() {
+       repo := &schemaRepo{
+               l:                      fatalChildLogger(),
+               resourceSchemaSupplier: permanentErrorSupplier{},
+               eventCh:                make(chan MetadataEvent, 1),
+               workerNum:              1,
+               closer:                 run.NewChannelCloser(),
+               metrics:                fatalChildMetrics(),
+       }
+       repo.SendMetadataEvent(MetadataEvent{
+               Typ:  EventAddOrUpdate,
+               Kind: EventKindResource,
+               Metadata: &databasev1.Measure{
+                       Metadata: &commonv1.Metadata{Group: "g", Name: "m", ModRevision: 1},
+               },
+       })
+       fmt.Fprintln(os.Stderr, "child did not exit; classifier did not fire")
+       os.Exit(2)
+}
+
+// TestSendMetadataEvent_TransientErrorRequeues verifies that a non-permanent
+// error from processEvent is requeued via eventCh (the existing transient
+// path) and not escalated to .Fatal() (Test 5.6 — preserves transient retry).
+func TestSendMetadataEvent_TransientErrorRequeues(t *testing.T) {
+       supplier := &transientThenOKSupplier{}
+       repo := &schemaRepo{
+               l:                      testLogger(),
+               resourceSchemaSupplier: supplier,
+               eventCh:                make(chan MetadataEvent, 1),
+               workerNum:              1,
+               closer:                 run.NewChannelCloser(),
+               metrics:                fatalChildMetrics(),
+       }
+
+       evt := MetadataEvent{
+               Typ:  EventAddOrUpdate,
+               Kind: EventKindResource,
+               Metadata: &databasev1.Measure{
+                       Metadata: &commonv1.Metadata{Group: "g", Name: "m", 
ModRevision: 1},
+               },
+       }
+       repo.SendMetadataEvent(evt)
+
+       // First processEvent failed with a transient error; SendMetadataEvent 
should
+       // have pushed onto eventCh for the worker (or self) to retry.
+       select {
+       case requeued := <-repo.eventCh:
+               require.Equal(t, EventKindResource, requeued.Kind, "requeued 
event must be the original")
+       case <-time.After(2 * time.Second):
+               t.Fatalf("transient error must requeue the event onto eventCh; 
openCalls=%d", supplier.openCalls.Load())
+       }
+       require.EqualValues(t, 1, supplier.openCalls.Load(), "first 
SendMetadataEvent must call OpenResource exactly once")
+
+       // Second pass: drive the requeued event through processEvent directly 
to
+       // confirm OpenResource is reattempted and succeeds.
+       require.NoError(t, repo.processEvent(evt))
+       require.EqualValues(t, 2, supplier.openCalls.Load(), "second pass must 
reattempt OpenResource")
+}
diff --git a/pkg/schema/init.go b/pkg/schema/init.go
index 3f96d750b..e7b5a09fa 100644
--- a/pkg/schema/init.go
+++ b/pkg/schema/init.go
@@ -94,9 +94,9 @@ func (sr *schemaRepo) getCatalog(kind schema.Kind) commonv1.Catalog {
 }
 
 func (sr *schemaRepo) processGroup(ctx context.Context, g *commonv1.Group, 
catalog commonv1.Catalog) {
-       _, err := sr.initGroup(g)
-       if err != nil {
-               logger.Panicf("fails to init the group: %v", err)
+       if _, initErr := sr.initGroup(g); initErr != nil {
+               sr.l.Error().Err(initErr).Str("group", 
g.Metadata.GetName()).Msg("fails to init the group")
+               panic(fmt.Errorf("fails to init the group %s: %w", 
g.Metadata.GetName(), initErr))
        }
        sr.processRules(ctx, g.Metadata.GetName())
        sr.processBindings(ctx, g.Metadata.GetName())
@@ -216,6 +216,12 @@ func (sr *schemaRepo) initGroup(groupSchema *commonv1.Group) (*group, error) {
        defer sr.groupMux.Unlock()
        g, ok := sr.getGroup(groupSchema.Metadata.Name)
        if ok {
+               if g.isInit() {
+                       return g, nil
+               }
+               if reinitErr := g.initBySchema(groupSchema); reinitErr != nil {
+                       return nil, reinitErr
+               }
                return g, nil
        }
        sr.l.Info().Str("group", groupSchema.Metadata.Name).Msg("creating a 
tsdb")

Reply via email to