This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new 5c401217f fix: fail fast on incompatible storage version (boot + 
runtime paths) (#1124)
5c401217f is described below

commit 5c401217fdb65facbe2ce86ff8a79318c1aa075e
Author: Gao Hongtao <[email protected]>
AuthorDate: Mon May 11 09:22:04 2026 +0800

    fix: fail fast on incompatible storage version (boot + runtime paths) 
(#1124)
---
 CHANGES.md                                         |   1 +
 Makefile                                           |  25 +-
 banyand/internal/storage/tsdb.go                   |  21 +-
 banyand/internal/storage/tsdb_test.go              | 100 ++++++++
 banyand/internal/storage/version.go                |   5 +-
 banyand/internal/storage/version_test.go           |  29 +++
 banyand/metadata/schema/property/client.go         |  34 ++-
 .../metadata/schema/property/init_handler_test.go  | 209 +++++++++++++++++
 pkg/initerror/initerror.go                         |  61 +++++
 pkg/initerror/initerror_test.go                    |  71 ++++++
 pkg/schema/cache.go                                |  61 ++++-
 pkg/schema/cache_test.go                           | 127 ++++++++++
 pkg/schema/cache_watcher_fatal_test.go             | 256 +++++++++++++++++++++
 pkg/schema/init.go                                 |  12 +-
 14 files changed, 994 insertions(+), 18 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 546e8e1fb..c7dad3188 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -88,6 +88,7 @@ Release Notes.
 - Add `GroupLifecycleInfo.errors` to surface per-group collection failures 
from FODC `InspectAll` instead of silently dropping the affected node entry.
 - Fix `CollectDataInfo` and `CollectLiaisonInfo` not handling 
`CATALOG_PROPERTY` groups.
 - Fix lifecycle migration where the receiving node could create segments 
shorter than the configured `SegmentInterval`.
+- Fail fast on incompatible storage version at boot. Previously the server 
would start in a degraded `SERVING` state with affected groups un-loaded 
because the property schema-registry retry loop swallowed the 
version-incompatibility panic. Compatible versions are listed in 
`banyand/internal/storage/versions.yml`.
 
 ### Chores
 
diff --git a/Makefile b/Makefile
index aadd5a98c..950d73f82 100644
--- a/Makefile
+++ b/Makefile
@@ -105,8 +105,31 @@ load-test-barrier: ## Run the schema-barrier CP-6 SLO load 
harness (3 data nodes
 
 lint: TARGET=lint
 lint: PROJECTS:=api $(PROJECTS) pkg scripts/ci/check test
+lint: check-import-boundaries
 lint: default ## Run the linters on all projects
 
+# check-import-boundaries enforces the layering invariants documented in
+# pkg/initerror/initerror.go: the leaf permanent-error contract must not gain
+# project-internal dependencies, and the property schema-registry classifier
+# must not import the storage internals it classifies via the leaf interface.
+.PHONY: check-import-boundaries
+check-import-boundaries: ## Enforce import-boundary invariants for the 
version-incompat fix
+       @bad=0; \
+       if grep -rln 'banyand/internal/storage' 
banyand/metadata/schema/property/ 2>/dev/null; then \
+               echo "FAIL: banyand/metadata/schema/property/ must not import 
banyand/internal/storage"; \
+               bad=1; \
+       fi; \
+       if grep -rln 'banyand/internal/storage' pkg/schema/ 2>/dev/null; then \
+               echo "FAIL: pkg/schema/ must not import 
banyand/internal/storage"; \
+               bad=1; \
+       fi; \
+       if grep -rln 'github.com/apache/skywalking-banyandb/' 
pkg/initerror/*.go 2>/dev/null | grep -v _test.go; then \
+               echo "FAIL: pkg/initerror/ must remain a leaf with zero 
project-internal imports (test files excepted)"; \
+               bad=1; \
+       fi; \
+       if [ $$bad -ne 0 ]; then exit 1; fi; \
+       echo "import boundaries OK"
+
 ##@ Vendor update
 
 vendor-update: TARGET=vendor-update
@@ -231,7 +254,7 @@ release-push-candidate: ## Push release candidate
        ${PUSH_RELEASE_SCRIPTS}
        
 .PHONY: all $(PROJECTS) clean build  default nuke
-.PHONY: lint check tidy format pre-push generate-test-cases
+.PHONY: lint check tidy format pre-push generate-test-cases 
check-import-boundaries
 .PHONY: test test-race test-coverage test-ci test-docker
 .PHONY: license-check license-fix license-dep
 .PHONY: release release-binary release-source release-sign release-assembly
diff --git a/banyand/internal/storage/tsdb.go b/banyand/internal/storage/tsdb.go
index 7e7993085..d60d7cf00 100644
--- a/banyand/internal/storage/tsdb.go
+++ b/banyand/internal/storage/tsdb.go
@@ -19,6 +19,7 @@ package storage
 
 import (
        "context"
+       "fmt"
        "path/filepath"
        "strings"
        "sync"
@@ -173,7 +174,7 @@ func (d *database[T, O]) Close() error {
        d.segmentController.close()
        d.lock.Close()
        if err := d.lfs.DeleteFile(d.lock.Path()); err != nil {
-               logger.Panicf("cannot delete lock file %s: %s", d.lock.Path(), 
err)
+               panic(fmt.Errorf("cannot delete lock file %s: %w", 
d.lock.Path(), err))
        }
        if d.metricsFactory != nil {
                d.metricsFactory.Close()
@@ -239,14 +240,30 @@ func OpenTSDB[T TSTable, O any](ctx context.Context, opts 
TSDBOpts[T, O], cache
        lockPath := filepath.Join(opts.Location, lockFilename)
        lock, err := tsdbLfs.CreateLockFile(lockPath, FilePerm)
        if err != nil {
-               logger.Panicf("cannot create lock file %s: %s", lockPath, err)
+               panic(fmt.Errorf("cannot create lock file %s: %w", lockPath, 
err))
        }
        db.lock = lock
+       // Release the lock fd if segmentController.open() below fails.
+       // Otherwise the file descriptor leaks until process exit and a retry
+       // on the same data dir hits "resource temporarily unavailable" from
+       // the next CreateLockFile call. Ownership passes to (*database).Close()
+       // once released = true is set below, just before startRotationTask.
+       released := false
+       defer func() {
+               if !released {
+                       _ = lock.Close()
+                       _ = tsdbLfs.DeleteFile(lockPath)
+               }
+       }()
        if err := db.segmentController.open(); err != nil {
                return nil, err
        }
        obsservice.MetricsCollector.Register(location, db.collect)
        db.disableRotation = opts.DisableRotation
+       // db (and its lock fd) is now owned by the caller; on rotation-task 
error
+       // the partially-initialized db is still returned so the caller can 
Close
+       // it and unregister the metrics collector.
+       released = true
        return db, db.startRotationTask()
 }
 
diff --git a/banyand/internal/storage/tsdb_test.go 
b/banyand/internal/storage/tsdb_test.go
index 6b786a9c5..4852b9bd6 100644
--- a/banyand/internal/storage/tsdb_test.go
+++ b/banyand/internal/storage/tsdb_test.go
@@ -19,6 +19,8 @@ package storage
 
 import (
        "context"
+       "errors"
+       "os"
        "path/filepath"
        "testing"
        "time"
@@ -28,6 +30,7 @@ import (
 
        commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
        "github.com/apache/skywalking-banyandb/pkg/fs"
+       "github.com/apache/skywalking-banyandb/pkg/initerror"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/meter"
        "github.com/apache/skywalking-banyandb/pkg/test"
@@ -670,3 +673,100 @@ func TestCollectWithPartialClosedSegments(t *testing.T) {
        // Clean up
        require.NoError(t, tsdb.Close())
 }
+
+// newTestSegmentSkeleton seeds a minimal on-disk segment layout under dir with
+// the supplied storage version stamp. The layout matches readSegmentMeta's
+// "old format" (version-only file body), which is sufficient to exercise
+// checkVersion at TSDB open time.
+func newTestSegmentSkeleton(t *testing.T, dir, version string) {
+       t.Helper()
+       segDir := filepath.Join(dir, "seg-20240501")
+       require.NoError(t, os.MkdirAll(segDir, 0o755))
+       metaPath := filepath.Join(segDir, metadataFilename)
+       require.NoError(t, os.WriteFile(metaPath, []byte(version), 0o600))
+}
+
+// TestTSDBOpen_LockReleasedAfterFailedOpen reproduces the lock-file leak that
+// the F5 v2 lock-fix targets. Prior to the fix, OpenTSDB acquired the lock
+// file but never released it on the segmentController.open() error path, so
+// a second OpenTSDB call against the same directory failed with
+// "resource temporarily unavailable" from CreateLockFile. The fix adds a
+// defer-release with a `released` boolean flipped to true only on the
+// success path. This test seeds an incompatible segment to force the first
+// OpenTSDB to fail, then removes the segment and asserts the second
+// OpenTSDB succeeds — which is only possible if the lock fd was released.
+func TestTSDBOpen_LockReleasedAfterFailedOpen(t *testing.T) {
+       logger.Init(logger.Logging{Env: "dev", Level: flags.LogLevel})
+
+       dir, defFn := test.Space(require.New(t))
+       defer defFn()
+
+       newTestSegmentSkeleton(t, dir, "1.3.0")
+
+       opts := TSDBOpts[*MockTSTable, any]{
+               Location:        dir,
+               SegmentInterval: IntervalRule{Unit: DAY, Num: 1},
+               TTL:             IntervalRule{Unit: DAY, Num: 3},
+               ShardNum:        1,
+               TSTableCreator:  MockTSTableCreator,
+       }
+
+       ctx := context.Background()
+       mc := timestamp.NewMockClock()
+       ts, parseErr := time.ParseInLocation("2006-01-02 15:04:05", "2024-05-01 
00:00:00", time.Local)
+       require.NoError(t, parseErr)
+       mc.Set(ts)
+       ctx = timestamp.SetClock(ctx, mc)
+
+       // First open fails on the incompatible segment. With the leak fix the
+       // defer in OpenTSDB releases the lock fd before returning the error.
+       serviceCache := NewServiceCache()
+       _, openErr := OpenTSDB(ctx, opts, serviceCache, group)
+       require.Error(t, openErr, "first OpenTSDB must fail on the incompatible 
segment")
+       require.True(t, initerror.IsPermanent(openErr), "first OpenTSDB error 
must be permanent")
+
+       // Remove the bad segment so the second open's segmentController scan 
succeeds.
+       require.NoError(t, os.RemoveAll(filepath.Join(dir, "seg-20240501")))
+
+       // Second open against the same dir must succeed — only possible if the
+       // lock fd from the first attempt was released. Without the fix this
+       // returns "resource temporarily unavailable" from CreateLockFile.
+       tsdb, reopenErr := OpenTSDB(ctx, opts, serviceCache, group)
+       require.NoError(t, reopenErr, "second OpenTSDB must succeed after the 
failed first attempt")
+       require.NotNil(t, tsdb)
+       require.NoError(t, tsdb.Close())
+}
+
+// TestTSDBOpen_RejectsIncompatibleSegment verifies that opening a TSDB whose
+// on-disk segment is stamped with an incompatible version surfaces a permanent
+// initialization error so the caller fails fast instead of silently dropping
+// the affected group.
+func TestTSDBOpen_RejectsIncompatibleSegment(t *testing.T) {
+       logger.Init(logger.Logging{Env: "dev", Level: flags.LogLevel})
+
+       dir, defFn := test.Space(require.New(t))
+       defer defFn()
+
+       newTestSegmentSkeleton(t, dir, "1.3.0")
+
+       opts := TSDBOpts[*MockTSTable, any]{
+               Location:        dir,
+               SegmentInterval: IntervalRule{Unit: DAY, Num: 1},
+               TTL:             IntervalRule{Unit: DAY, Num: 3},
+               ShardNum:        1,
+               TSTableCreator:  MockTSTableCreator,
+       }
+
+       ctx := context.Background()
+       mc := timestamp.NewMockClock()
+       ts, parseErr := time.ParseInLocation("2006-01-02 15:04:05", "2024-05-01 
00:00:00", time.Local)
+       require.NoError(t, parseErr)
+       mc.Set(ts)
+       ctx = timestamp.SetClock(ctx, mc)
+
+       serviceCache := NewServiceCache()
+       _, openErr := OpenTSDB(ctx, opts, serviceCache, group)
+       require.Error(t, openErr, "OpenTSDB must reject an incompatible-version 
segment")
+       require.True(t, initerror.IsPermanent(openErr), "incompatible-version 
error must surface as permanent")
+       require.True(t, errors.Is(openErr, errVersionIncompatible), "error must 
still match the version-incompatible sentinel")
+}
diff --git a/banyand/internal/storage/version.go 
b/banyand/internal/storage/version.go
index ddea79a33..82d1adaaf 100644
--- a/banyand/internal/storage/version.go
+++ b/banyand/internal/storage/version.go
@@ -24,6 +24,8 @@ import (
 
        "github.com/pkg/errors"
        "sigs.k8s.io/yaml"
+
+       "github.com/apache/skywalking-banyandb/pkg/initerror"
 )
 
 const (
@@ -46,7 +48,8 @@ func checkVersion(version string) error {
                        return nil
                }
        }
-       return errors.WithMessagef(errVersionIncompatible, "incompatible 
version %s, supported versions: %s", version, strings.Join(compatibleVersions, 
", "))
+       return initerror.AsPermanent(errors.WithMessagef(errVersionIncompatible,
+               "incompatible version %s, supported versions: %s", version, 
strings.Join(compatibleVersions, ", ")))
 }
 
 type segmentMeta struct {
diff --git a/banyand/internal/storage/version_test.go 
b/banyand/internal/storage/version_test.go
index 726e6d8a2..31350d56d 100644
--- a/banyand/internal/storage/version_test.go
+++ b/banyand/internal/storage/version_test.go
@@ -18,10 +18,14 @@
 package storage
 
 import (
+       "errors"
+       "fmt"
        "testing"
 
        "github.com/stretchr/testify/assert"
        "github.com/stretchr/testify/require"
+
+       "github.com/apache/skywalking-banyandb/pkg/initerror"
 )
 
 func TestReadSegmentMeta_NewFormat(t *testing.T) {
@@ -52,6 +56,31 @@ func TestReadSegmentMeta_IncompatibleVersion(t *testing.T) {
        data := 
[]byte(`{"version":"0.1.0","endTime":"2026-04-07T00:00:00+08:00"}`)
        _, err := readSegmentMeta(data)
        assert.Error(t, err)
+       assert.True(t, initerror.IsPermanent(err), "incompatible version must 
surface as permanent")
+       assert.True(t, errors.Is(err, errVersionIncompatible), "incompatible 
version must still match the sentinel")
+}
+
+func TestCheckVersion_WrappedAsPermanent(t *testing.T) {
+       err := checkVersion("1.3.0")
+       require.Error(t, err)
+       assert.True(t, initerror.IsPermanent(err))
+       assert.True(t, errors.Is(err, errVersionIncompatible))
+
+       wrapped := fmt.Errorf("init: %w", err)
+       assert.True(t, initerror.IsPermanent(wrapped), "permanence must survive 
fmt.Errorf wrapping")
+       assert.True(t, errors.Is(wrapped, errVersionIncompatible))
+}
+
+func TestCheckVersion_CompatibleReturnsNil(t *testing.T) {
+       require.NoError(t, checkVersion(currentVersion))
+       for _, v := range compatibleVersions {
+               require.NoError(t, checkVersion(v))
+       }
+}
+
+func TestIsPermanent_UnrelatedErrorIsNotPermanent(t *testing.T) {
+       assert.False(t, initerror.IsPermanent(errors.New("plain")))
+       assert.False(t, initerror.IsPermanent(nil))
 }
 
 func TestReadSegmentMeta_NewFormatNoEndTime(t *testing.T) {
diff --git a/banyand/metadata/schema/property/client.go 
b/banyand/metadata/schema/property/client.go
index 88ec7360d..69c2d8fe1 100644
--- a/banyand/metadata/schema/property/client.go
+++ b/banyand/metadata/schema/property/client.go
@@ -41,6 +41,7 @@ import (
        "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
        "github.com/apache/skywalking-banyandb/banyand/observability"
        "github.com/apache/skywalking-banyandb/pkg/grpchelper"
+       "github.com/apache/skywalking-banyandb/pkg/initerror"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/run"
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
@@ -128,6 +129,7 @@ type SchemaRegistry struct {
        l                  *logger.Logger
        cache              *schemaCache
        caCertReloader     *pkgtls.Reloader
+       clock              timestamp.Clock
        handlers           map[schema.Kind][]schema.EventHandler
        watchSessions      map[string]*watchSession
        pauseQueue         []func()
@@ -205,6 +207,7 @@ func NewSchemaRegistryClient(cfg *ClientConfig) 
(*SchemaRegistry, error) {
                l:                  l,
                cache:              newSchemaCacheWithLimit(maxTombstones),
                caCertReloader:     caCertReloader,
+               clock:              timestamp.NewClock(),
                handlers:           make(map[schema.Kind][]schema.EventHandler),
                watchSessions:      make(map[string]*watchSession),
                syncInterval:       syncInterval,
@@ -1116,31 +1119,48 @@ func (r *SchemaRegistry) RegisterHandler(name string, 
kind schema.Kind, handler
 }
 
 func (r *SchemaRegistry) initHandlerWithRetry(name string, handler 
schema.EventHandler, kinds []schema.Kind) {
-       deadline := time.Now().Add(time.Minute)
+       deadline := r.clock.Now().Add(time.Minute)
        var lastErr interface{}
        for {
-               if tryCallOnInit(handler, kinds, &lastErr) {
+               ok, permanent := tryCallOnInit(handler, kinds, &lastErr)
+               if ok {
                        return
                }
-               if time.Now().After(deadline) {
-                       r.l.Panic().Str("name", name).Interface("error", 
lastErr).
+               if permanent {
+                       r.l.Error().Str("name", name).Str("error", 
fmt.Sprintf("%+v", lastErr)).
+                               Msg("OnInit hit a permanent error, refusing to 
start")
+                       if recErr, isErr := lastErr.(error); isErr {
+                               panic(fmt.Errorf("OnInit %s: %w", name, recErr))
+                       }
+                       panic(fmt.Errorf("OnInit %s: %v", name, lastErr))
+               }
+               if r.clock.Now().After(deadline) {
+                       r.l.Error().Str("name", name).Interface("error", 
lastErr).
                                Msg("handler OnInit failed after 1m, giving up")
+                       panic(fmt.Errorf("OnInit %s exceeded deadline: %v", 
name, lastErr))
                }
                r.l.Warn().Str("name", name).Interface("error", lastErr).
                        Msg("handler OnInit panicked due to transient error, 
retrying in 1s")
-               time.Sleep(time.Second)
+               r.clock.Sleep(time.Second)
        }
 }
 
-func tryCallOnInit(handler schema.EventHandler, kinds []schema.Kind, lastErr 
*interface{}) (ok bool) {
+// tryCallOnInit invokes handler.OnInit and recovers any panic. Do not wrap the
+// recovered value with multierr before classification; multierr.Append's
+// Unwrap() []error is traversed by errors.As, but extra wrapping changes which
+// concrete type is returned first and breaks IsPermanent's interface lookup.
+func tryCallOnInit(handler schema.EventHandler, kinds []schema.Kind, lastErr 
*interface{}) (ok, permanent bool) {
        defer func() {
                if rec := recover(); rec != nil {
                        *lastErr = rec
                        ok = false
+                       if recErr, isErr := rec.(error); isErr {
+                               permanent = initerror.IsPermanent(recErr)
+                       }
                }
        }()
        handler.OnInit(kinds)
-       return true
+       return true, false
 }
 
 // Start starts a single goroutine that periodically syncs schemas.
diff --git a/banyand/metadata/schema/property/init_handler_test.go 
b/banyand/metadata/schema/property/init_handler_test.go
new file mode 100644
index 000000000..4e6625892
--- /dev/null
+++ b/banyand/metadata/schema/property/init_handler_test.go
@@ -0,0 +1,209 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package property
+
+import (
+       "errors"
+       "fmt"
+       "sync/atomic"
+       "testing"
+       "time"
+
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+
+       "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+       "github.com/apache/skywalking-banyandb/pkg/initerror"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/timestamp"
+)
+
+// retryTestHandler implements schema.EventHandler for retry-classification 
tests.
+// Each OnInit call invokes the configured panicFn (which may panic). When
+// noPanicAfter calls have been made, OnInit returns normally so the retry loop
+// can succeed.
+type retryTestHandler struct {
+       panicFn       func(callIdx int)
+       startedSignal chan struct{}
+       schema.UnimplementedOnInitHandler
+       calls        atomic.Int32
+       noPanicAfter int32
+}
+
+func (h *retryTestHandler) OnAddOrUpdate(_ schema.Metadata) {}
+func (h *retryTestHandler) OnDelete(_ schema.Metadata)      {}
+
+func (h *retryTestHandler) OnInit(_ []schema.Kind) (bool, []int64) {
+       idx := h.calls.Add(1)
+       if h.startedSignal != nil {
+               select {
+               case h.startedSignal <- struct{}{}:
+               default:
+               }
+       }
+       if h.noPanicAfter > 0 && idx > h.noPanicAfter {
+               return true, nil
+       }
+       if h.panicFn != nil {
+               h.panicFn(int(idx))
+       }
+       return true, nil
+}
+
+// newTestRegistryWithClock builds a SchemaRegistry instance that bypasses
+// connection setup so we can drive its retry classifier in-process. It uses a
+// mock clock so the test controls the deadline and Sleep windows.
+func newTestRegistryWithClock(_ *testing.T, mc timestamp.MockClock) 
*SchemaRegistry {
+       l := logger.GetLogger("init-handler-retry-test")
+       return &SchemaRegistry{
+               l:     l,
+               clock: mc,
+       }
+}
+
+// TestInitHandlerWithRetry_PermanentVersionError_FailsFast asserts that a
+// handler panicking with a Permanent-wrapped error causes the retry loop to
+// exit on the first attempt with a panic, without ever advancing the clock.
+func TestInitHandlerWithRetry_PermanentVersionError_FailsFast(t *testing.T) {
+       require.NoError(t, logger.Init(logger.Logging{Env: "dev", Level: 
"warn"}))
+       mc := timestamp.NewMockClock()
+       mcStart := mc.Now()
+       r := newTestRegistryWithClock(t, mc)
+       permErr := initerror.AsPermanent(errors.New("incompatible version 
1.3.0"))
+       handler := &retryTestHandler{
+               panicFn: func(_ int) {
+                       panic(fmt.Errorf("OpenDB failed: %w", permErr))
+               },
+       }
+
+       done := make(chan struct{})
+       var recovered interface{}
+       go func() {
+               defer close(done)
+               defer func() { recovered = recover() }()
+               r.initHandlerWithRetry("perm-handler", handler, 
[]schema.Kind{schema.KindGroup})
+       }()
+
+       select {
+       case <-done:
+       case <-time.After(2 * time.Second):
+               t.Fatal("initHandlerWithRetry did not return within 2s; 
permanent error path should fail fast")
+       }
+
+       require.NotNil(t, recovered, "permanent error must propagate as a 
panic")
+       recErr, isErr := recovered.(error)
+       require.True(t, isErr, "panic value should be an error wrapping the 
permanent error")
+       assert.True(t, initerror.IsPermanent(recErr), "recovered error must 
satisfy IsPermanent")
+       assert.Equal(t, int32(1), handler.calls.Load(), "permanent error must 
not be retried")
+       assert.Equal(t, mcStart, mc.Now(), "mock clock must not advance when 
the first attempt is permanent")
+}
+
+// TestInitHandlerWithRetry_TransientStillRetries asserts that a handler
+// panicking with a non-permanent error keeps retrying until either the 
deadline
+// passes (panic) or it stops panicking (success). Here we let the handler stop
+// panicking after two attempts to exercise the success path.
+func TestInitHandlerWithRetry_TransientStillRetries(t *testing.T) {
+       require.NoError(t, logger.Init(logger.Logging{Env: "dev", Level: 
"warn"}))
+       mc := timestamp.NewMockClock()
+       r := newTestRegistryWithClock(t, mc)
+       handler := &retryTestHandler{
+               noPanicAfter:  2,
+               startedSignal: make(chan struct{}, 4),
+               panicFn: func(_ int) {
+                       panic("etcd unreachable")
+               },
+       }
+
+       done := make(chan struct{})
+       go func() {
+               defer close(done)
+               r.initHandlerWithRetry("transient-handler", handler, 
[]schema.Kind{schema.KindGroup})
+       }()
+
+       // Wait for each of the first two OnInit calls to have run.
+       for i := 0; i < 2; i++ {
+               select {
+               case <-handler.startedSignal:
+               case <-time.After(2 * time.Second):
+                       t.Fatalf("handler.OnInit was not invoked for attempt %d 
within 2s", i+1)
+               }
+               // Advance the mock clock so the worker's Sleep returns and it 
can
+               // re-enter the retry loop. The benbjohnson MockClock fires 
sleepers
+               // when Add brings the time past their deadline, even if the 
sleeper
+               // registered after the Add — but we still poll briefly to 
avoid races.
+               require.Eventually(t, func() bool {
+                       mc.Add(time.Second)
+                       return handler.calls.Load() > int32(i+1) || done != nil 
&& isClosed(done)
+               }, 3*time.Second, 10*time.Millisecond, "Sleep must release 
after clock advance")
+       }
+
+       // Wait for the loop to finish (third call returns ok=true).
+       select {
+       case <-done:
+       case <-time.After(3 * time.Second):
+               t.Fatal("initHandlerWithRetry did not return after handler 
stopped panicking")
+       }
+       assert.GreaterOrEqual(t, handler.calls.Load(), int32(3), "handler must 
be retried at least twice before succeeding")
+}
+
+// TestInitHandlerWithRetry_DeadlineExceededPanics asserts that a non-permanent
+// failure that never stops eventually triggers a panic when the deadline is
+// crossed.
+func TestInitHandlerWithRetry_DeadlineExceededPanics(t *testing.T) {
+       require.NoError(t, logger.Init(logger.Logging{Env: "dev", Level: 
"warn"}))
+       mc := timestamp.NewMockClock()
+       r := newTestRegistryWithClock(t, mc)
+       handler := &retryTestHandler{
+               startedSignal: make(chan struct{}, 8),
+               panicFn: func(_ int) {
+                       panic("never resolves")
+               },
+       }
+
+       done := make(chan struct{})
+       var recovered interface{}
+       go func() {
+               defer close(done)
+               defer func() { recovered = recover() }()
+               r.initHandlerWithRetry("deadline-handler", handler, 
[]schema.Kind{schema.KindGroup})
+       }()
+
+       deadline := time.Now().Add(5 * time.Second)
+       for {
+               select {
+               case <-done:
+                       require.NotNil(t, recovered, "deadline exceeded must 
propagate as a panic")
+                       return
+               case <-handler.startedSignal:
+               case <-time.After(50 * time.Millisecond):
+               }
+               if time.Now().After(deadline) {
+                       t.Fatal("initHandlerWithRetry did not exit after 
repeated clock advances")
+               }
+               mc.Add(2 * time.Minute)
+       }
+}
+
+func isClosed(ch <-chan struct{}) bool {
+       select {
+       case <-ch:
+               return true
+       default:
+               return false
+       }
+}
diff --git a/pkg/initerror/initerror.go b/pkg/initerror/initerror.go
new file mode 100644
index 000000000..6bff59384
--- /dev/null
+++ b/pkg/initerror/initerror.go
@@ -0,0 +1,61 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package initerror exposes a Permanent error contract for initialization
+// failures that must not be retried. The package is a leaf: it imports nothing
+// project-internal so it can be referenced by both the storage layer (which
+// produces permanent errors) and the metadata-schema retry classifier (which
+// reads them) without introducing a layering edge between those packages.
+package initerror
+
+import "errors"
+
+// Permanent marks an error as a permanent (non-retriable) initialization 
failure.
+type Permanent interface {
+       error
+       Permanent() bool
+}
+
+type permanentError struct {
+       err error
+}
+
+// Error implements error.
+func (p *permanentError) Error() string { return p.err.Error() }
+
+// Unwrap exposes the wrapped error so errors.Is/As can traverse the chain.
+func (p *permanentError) Unwrap() error { return p.err }
+
+// Permanent reports that this error must not be retried.
+func (p *permanentError) Permanent() bool { return true }
+
+// AsPermanent wraps err as a Permanent error. Returns nil if err is nil.
+func AsPermanent(err error) error {
+       if err == nil {
+               return nil
+       }
+       return &permanentError{err: err}
+}
+
+// IsPermanent reports whether err's chain contains a Permanent error.
+func IsPermanent(err error) bool {
+       if err == nil {
+               return false
+       }
+       var p Permanent
+       return errors.As(err, &p) && p.Permanent()
+}
diff --git a/pkg/initerror/initerror_test.go b/pkg/initerror/initerror_test.go
new file mode 100644
index 000000000..2c16db32f
--- /dev/null
+++ b/pkg/initerror/initerror_test.go
@@ -0,0 +1,71 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package initerror_test
+
+import (
+       "errors"
+       "fmt"
+       "testing"
+
+       "github.com/apache/skywalking-banyandb/pkg/initerror"
+)
+
+var errSentinel = errors.New("sentinel")
+
+func TestAsPermanent_NilReturnsNil(t *testing.T) {
+       if wrapped := initerror.AsPermanent(nil); wrapped != nil {
+               t.Fatalf("AsPermanent(nil) = %v, want nil", wrapped)
+       }
+}
+
+func TestIsPermanent_Nil(t *testing.T) {
+       if permanent := initerror.IsPermanent(nil); permanent {
+               t.Fatalf("IsPermanent(nil) = true, want false")
+       }
+}
+
+func TestIsPermanent_PlainError(t *testing.T) {
+       if permanent := initerror.IsPermanent(errSentinel); permanent {
+               t.Fatalf("IsPermanent(plain error) = true, want false")
+       }
+}
+
+func TestIsPermanent_Direct(t *testing.T) {
+       marked := initerror.AsPermanent(errSentinel)
+       if permanent := initerror.IsPermanent(marked); !permanent {
+               t.Fatalf("IsPermanent(AsPermanent(...)) = false, want true")
+       }
+}
+
+func TestIsPermanent_FmtErrorfChain(t *testing.T) {
+       wrapped := fmt.Errorf("init failed: %w", initerror.AsPermanent(errSentinel))
+       if !initerror.IsPermanent(wrapped) {
+               t.Fatalf("IsPermanent through fmt.Errorf chain = false, want true")
+       }
+       if !errors.Is(wrapped, errSentinel) {
+               t.Fatalf("errors.Is should still see the inner sentinel")
+       }
+}
+
+func TestIsPermanent_DoubleWrap(t *testing.T) {
+       middle := fmt.Errorf("middle: %w", initerror.AsPermanent(errSentinel))
+       outer := fmt.Errorf("outer: %w", middle)
+       if !initerror.IsPermanent(outer) {
+               t.Fatalf("IsPermanent through double wrap = false, want true")
+       }
+}
diff --git a/pkg/schema/cache.go b/pkg/schema/cache.go
index ec45cda19..95b8f88b1 100644
--- a/pkg/schema/cache.go
+++ b/pkg/schema/cache.go
@@ -35,6 +35,7 @@ import (
        "github.com/apache/skywalking-banyandb/banyand/metadata"
        "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
        "github.com/apache/skywalking-banyandb/pkg/cgroups"
+       "github.com/apache/skywalking-banyandb/pkg/initerror"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/run"
 )
@@ -108,7 +109,35 @@ func (sr *schemaRepo) SendMetadataEvent(event 
MetadataEvent) {
                return
        }
        defer sr.closer.SenderDone()
+       // SendMetadataEvent runs synchronously on the caller goroutine (e.g. an
+       // etcd-watch handler such as (*schemaRepo).OnAddOrUpdate). A panic from
+       // processEvent — string-typed today, error-typed once Step 1b lands —
+       // would otherwise propagate to a caller that has no recover() of its 
own.
+       // The classifier branch escalates permanent errors to .Fatal() so
+       // runtime registration of an incompatible group exits the process the
+       // same way the boot path does; non-permanent panics are logged and
+       // requeued onto eventCh so the Watcher worker can retry, mirroring the
+       // transient error-return path below.
+       defer func() {
+               if recovered := recover(); recovered != nil {
+                       if recErr, isErr := recovered.(error); isErr && 
initerror.IsPermanent(recErr) {
+                               sr.l.Fatal().Err(recErr).Interface("event", 
event).Str("stack", string(debug.Stack())).
+                                       Msg("SendMetadataEvent hit a permanent 
error, refusing to continue")
+                       }
+                       sr.l.Warn().Interface("err", 
recovered).Interface("event", event).Str("stack", string(debug.Stack())).
+                               Msg("recovered from SendMetadataEvent panic; 
requeueing")
+                       sr.metrics.totalErrs.Inc(1)
+                       select {
+                       case sr.eventCh <- event:
+                       case <-sr.closer.CloseNotify():
+                       }
+               }
+       }()
        if err := sr.processEvent(event); err != nil && !errors.Is(err, 
schema.ErrClosed) {
+               if initerror.IsPermanent(err) {
+                       sr.l.Fatal().Err(err).Interface("event", event).
+                               Msg("SendMetadataEvent hit a permanent error, 
refusing to continue")
+               }
                select {
                case <-sr.closer.CloseNotify():
                        return
@@ -237,10 +266,25 @@ func (sr *schemaRepo) Watcher() {
                        }
                        defer func() {
                                sr.closer.ReceiverDone()
+                               // Bump the panic metric BEFORE the classifier 
branch: .Fatal()
+                               // calls os.Exit(1) and the deferred Inc would 
otherwise be
+                               // skipped on the permanent-error escalation 
path. Do NOT move
+                               // this line under the if-branch.
+                               sr.metrics.totalPanics.Inc(1)
                                if err := recover(); err != nil {
+                                       // Classifier is dormant on 
string-typed panics (today's
+                                       // Panicf sites outside this PR's scope 
still panic with
+                                       // strings); tsdb.go:176/242 are 
converted in Step 1b so
+                                       // permanent errors reaching this 
recover are error-typed.
+                                       // We use .Fatal() not panic() because 
this is a worker
+                                       // goroutine: panic() would only kill 
this goroutine and
+                                       // leave the Watcher partially alive.
+                                       if recErr, isErr := err.(error); isErr 
&& initerror.IsPermanent(recErr) {
+                                               
sr.l.Fatal().Err(recErr).Str("stack", string(debug.Stack())).
+                                                       Msg("Watcher hit a 
permanent error, refusing to continue")
+                                       }
                                        sr.l.Warn().Interface("err", 
err).Str("stack", string(debug.Stack())).Msg("watching the events")
                                }
-                               sr.metrics.totalPanics.Inc(1)
                        }()
                        // The Watcher drains events retried via the channel 
after transient
                        // processing errors. The fast path runs synchronously 
in SendMetadataEvent
@@ -252,6 +296,10 @@ func (sr *schemaRepo) Watcher() {
                                                return
                                        }
                                        if retryErr := sr.processEvent(evt); 
retryErr != nil && !errors.Is(retryErr, schema.ErrClosed) {
+                                               if 
initerror.IsPermanent(retryErr) {
+                                                       
sr.l.Fatal().Err(retryErr).Interface("event", evt).
+                                                               Msg("Watcher 
hit a permanent error, refusing to continue")
+                                               }
                                                select {
                                                case <-sr.closer.CloseNotify():
                                                        return
@@ -636,8 +684,8 @@ func (g *group) init(name string) error {
 }
 
 func (g *group) initBySchema(groupSchema *commonv1.Group) error {
-       g.groupSchema.Store(groupSchema)
        if g.isPortable() {
+               g.groupSchema.Store(groupSchema)
                return nil
        }
        db, err := g.resourceSupplier.OpenDB(groupSchema)
@@ -645,7 +693,8 @@ func (g *group) initBySchema(groupSchema *commonv1.Group) 
error {
                return err
        }
        g.db.Store(db)
-       return err
+       g.groupSchema.Store(groupSchema)
+       return nil
 }
 
 func (g *group) isInit() bool {
@@ -671,7 +720,11 @@ func (g *group) close() (err error) {
        if !g.isInit() || g.isPortable() {
                return nil
        }
-       return multierr.Append(err, g.SupplyTSDB().Close())
+       tsdb := g.SupplyTSDB()
+       if tsdb == nil {
+               return nil
+       }
+       return multierr.Append(err, tsdb.Close())
 }
 
 func (g *group) drop() error {
diff --git a/pkg/schema/cache_test.go b/pkg/schema/cache_test.go
index d524e238b..ebafce414 100644
--- a/pkg/schema/cache_test.go
+++ b/pkg/schema/cache_test.go
@@ -18,6 +18,7 @@
 package schema
 
 import (
+       "errors"
        "testing"
 
        "github.com/stretchr/testify/assert"
@@ -26,8 +27,15 @@ import (
        commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
        "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+       pkglogger "github.com/apache/skywalking-banyandb/pkg/logger"
 )
 
+// testLogger initializes logging (Env "dev", Level "warn") and returns the logger used by unit tests in this package.
+func testLogger() *pkglogger.Logger {
+       _ = pkglogger.Init(pkglogger.Logging{Env: "dev", Level: "warn"}) // Init's result is deliberately discarded
+       return pkglogger.GetLogger("test", "schema-cache")
+}
+
 // noopIndexListener satisfies IndexListener and discards all index updates.
 type noopIndexListener struct{}
 
@@ -168,3 +176,122 @@ func TestIsAbsent_AfterDelete(t *testing.T) {
 
        assert.True(t, sr.IsAbsent(schema.KindMeasure, "g", "m"), "resource 
must be absent after a valid delete")
 }
+
+// fakeDB satisfies pkg/schema.DB for unit tests that only exercise group caching.
+type fakeDB struct{}
+
+func (fakeDB) Close() error                           { return nil }
+func (fakeDB) UpdateOptions(_ *commonv1.ResourceOpts) {}
+func (fakeDB) Drop() error                            { return nil }
+
+// fakeResourceSupplier is a programmable ResourceSupplier whose OpenDB returns
+// a configurable error or DB. openCalls counts invocations so retry tests can
+// assert that initBySchema actually re-attempted OpenDB.
+type fakeResourceSupplier struct {
+       openErr   error
+       openCalls int
+}
+
+func (s *fakeResourceSupplier) ResourceSchema(_ *commonv1.Metadata) (ResourceSchema, error) {
+       return nil, nil
+}
+
+func (s *fakeResourceSupplier) OpenResource(_ Resource) (IndexListener, error) {
+       return noopIndexListener{}, nil
+}
+
+func (s *fakeResourceSupplier) OpenDB(_ *commonv1.Group) (DB, error) {
+       s.openCalls++
+       if s.openErr != nil {
+               return nil, s.openErr
+       }
+       return fakeDB{}, nil
+}
+
+func buildGroup(name string) *commonv1.Group {
+       meta := &commonv1.Metadata{Name: name}
+       grp := &commonv1.Group{Metadata: meta}
+       return grp
+}
+
+// TestInitBySchema_FailedOpenDB_LeavesGroupUninit asserts that when OpenDB
+// fails, the group is left in an un-initialized state so a later retry can
+// reattempt OpenDB. Previously the schema was stored before OpenDB ran, which
+// caused isInit() to return true even though db was nil.
+func TestInitBySchema_FailedOpenDB_LeavesGroupUninit(t *testing.T) {
+       openErr := errors.New("boom")
+       supplier := &fakeResourceSupplier{openErr: openErr}
+       g := newGroup(nil, nil, supplier)
+
+       err := g.initBySchema(buildGroup("g1"))
+       require.Error(t, err)
+       assert.True(t, errors.Is(err, openErr))
+       assert.False(t, g.isInit(), "group must not be initialized after a failed OpenDB")
+       assert.Nil(t, g.GetSchema(), "groupSchema must not be stored after a failed OpenDB")
+       assert.Nil(t, g.SupplyTSDB(), "SupplyTSDB must be nil after a failed OpenDB")
+}
+
+// TestInitBySchema_PortableGroupOK asserts that a portable group (nil
+// resourceSupplier) stores the schema and is considered initialized without
+// invoking OpenDB.
+func TestInitBySchema_PortableGroupOK(t *testing.T) {
+       g := newGroup(nil, nil, nil)
+       require.NoError(t, g.initBySchema(buildGroup("portable")))
+       assert.True(t, g.isInit(), "portable group must be initialized after initBySchema")
+       assert.Nil(t, g.SupplyTSDB(), "portable group must have no underlying tsdb")
+}
+
+// TestInitBySchema_SuccessStoresSchema checks that a successful OpenDB leaves
+// the group initialized after exactly one OpenDB call.
+func TestInitBySchema_SuccessStoresSchema(t *testing.T) {
+       opener := &fakeResourceSupplier{}
+       grp := newGroup(nil, nil, opener)
+       require.NoError(t, grp.initBySchema(buildGroup("g1")))
+       assert.True(t, grp.isInit())
+       assert.Equal(t, 1, opener.openCalls)
+}
+
+// TestInitGroup_RetryAfterFailedOpenDB asserts that initGroup retries OpenDB on
+// the second pass when the first attempt left the group un-initialized — the
+// fix for the silent-fail bug where a half-broken group survived in the cache.
+func TestInitGroup_RetryAfterFailedOpenDB(t *testing.T) {
+       openErr := errors.New("transient")
+       supplier := &fakeResourceSupplier{openErr: openErr}
+       repo := &schemaRepo{
+               l:                testLogger(),
+               resourceSupplier: supplier,
+       }
+
+       gs := buildGroup("g1")
+       _, firstErr := repo.initGroup(gs)
+       require.Error(t, firstErr)
+       require.Equal(t, 1, supplier.openCalls)
+
+       cached, ok := repo.getGroup("g1")
+       require.True(t, ok)
+       require.False(t, cached.isInit())
+
+       supplier.openErr = nil
+       g, secondErr := repo.initGroup(gs)
+       require.NoError(t, secondErr)
+       require.NotNil(t, g)
+       assert.True(t, g.isInit())
+       assert.Equal(t, 2, supplier.openCalls, "OpenDB must be re-attempted when the cached group is not initialized")
+}
+
+// TestInitGroup_AlreadyInit_NoReopen asserts that an already-initialized group
+// is returned without reinvoking OpenDB.
+func TestInitGroup_AlreadyInit_NoReopen(t *testing.T) {
+       supplier := &fakeResourceSupplier{}
+       repo := &schemaRepo{
+               l:                testLogger(),
+               resourceSupplier: supplier,
+       }
+       gs := buildGroup("g1")
+       _, err := repo.initGroup(gs)
+       require.NoError(t, err)
+       require.Equal(t, 1, supplier.openCalls)
+       _, err = repo.initGroup(gs)
+       require.NoError(t, err)
+       assert.Equal(t, 1, supplier.openCalls, "already-initialized group must not re-invoke OpenDB")
+}
diff --git a/pkg/schema/cache_watcher_fatal_test.go 
b/pkg/schema/cache_watcher_fatal_test.go
new file mode 100644
index 000000000..3c66c5d28
--- /dev/null
+++ b/pkg/schema/cache_watcher_fatal_test.go
@@ -0,0 +1,256 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package schema
+
+import (
+       stderrors "errors"
+       "fmt"
+       "os"
+       "os/exec"
+       "strings"
+       "sync/atomic"
+       "testing"
+       "time"
+
+       "github.com/stretchr/testify/require"
+
+       commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       "github.com/apache/skywalking-banyandb/pkg/initerror"
+       pkglogger "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/meter"
+       "github.com/apache/skywalking-banyandb/pkg/run"
+)
+
+// nopCounter is a meter.Counter whose Inc is a no-op and whose Delete always
+// reports false. The fatal-fail subprocess tests install it so the worker's
+// deferred Inc(1) does not nil-panic in the child process.
+type nopCounter struct{}
+
+func (nopCounter) Inc(_ float64, _ ...string) {}
+func (nopCounter) Delete(_ ...string) bool    { return false }
+
+var _ meter.Counter = nopCounter{} // compile-time check that nopCounter satisfies meter.Counter
+
+const (
+       // fatalChildEnv tells the test binary that it is running as the re-exec'd
+       // child of a subprocess fast-fail assertion. The parent re-runs the test
+       // binary with this variable set to a mode; only the test comparing against
+       // that mode takes its child branch — the parent never runs child code.
+       fatalChildEnv          = "BANYAND_TEST_F5_FATAL_CHILD"
+       fatalChildModeWatcher  = "watcher"
+       fatalChildModeSendSync = "send-sync"
+)
+
+// permanentPanicSupplier is a ResourceSchemaSupplier whose OpenResource panics
+// with an error-typed permanent error. It exercises the Watcher worker's
+// recover() classifier (Step 1) — the panic propagates up through processEvent
+// and is caught in the worker's deferred recover(), where IsPermanent classifies
+// it as fatal.
+type permanentPanicSupplier struct{}
+
+func (permanentPanicSupplier) ResourceSchema(_ *commonv1.Metadata) (ResourceSchema, error) {
+       return nil, nil
+}
+
+func (permanentPanicSupplier) OpenResource(_ Resource) (IndexListener, error) {
+       panic(initerror.AsPermanent(stderrors.New("incompatible version 1.3.0")))
+}
+
+// permanentErrorSupplier is a ResourceSchemaSupplier whose OpenResource returns
+// a permanent error. It exercises the synchronous-return classifier in
+// SendMetadataEvent (Step 2b).
+type permanentErrorSupplier struct{}
+
+func (permanentErrorSupplier) ResourceSchema(_ *commonv1.Metadata) (ResourceSchema, error) {
+       return nil, nil
+}
+
+func (permanentErrorSupplier) OpenResource(_ Resource) (IndexListener, error) {
+       return nil, initerror.AsPermanent(stderrors.New("incompatible version 1.3.0"))
+}
+
+// transientThenOKSupplier returns a non-permanent error on the first call and
+// succeeds on every later call. It is used to verify the transient-retry path
+// stays functional after F5 (Test 5.6).
+type transientThenOKSupplier struct {
+       openCalls atomic.Int32
+}
+
+func (s *transientThenOKSupplier) ResourceSchema(_ *commonv1.Metadata) (ResourceSchema, error) {
+       return nil, nil
+}
+
+func (s *transientThenOKSupplier) OpenResource(_ Resource) (IndexListener, error) {
+       calls := s.openCalls.Add(1)
+       if calls == 1 {
+               return nil, stderrors.New("etcd hiccup")
+       }
+       return noopIndexListener{}, nil
+}
+
+// fatalChildLogger initializes logging (Env "dev", Level "info") and returns the
+// logger whose .Fatal() message the parent test reads back via CombinedOutput.
+func fatalChildLogger() *pkglogger.Logger {
+       _ = pkglogger.Init(pkglogger.Logging{Env: "dev", Level: "info"}) // Init's result is deliberately discarded
+       return pkglogger.GetLogger("test", "schema-cache-fatal")
+}
+
+// fatalChildMetrics returns a Metrics instance backed by no-op counters so
+// metric increments such as the worker's deferred Inc(1) do not nil-panic.
+func fatalChildMetrics() *Metrics {
+       return &Metrics{
+               totalErrs:    nopCounter{},
+               totalRetries: nopCounter{},
+               totalPanics:  nopCounter{},
+       }
+}
+
+func runFatalChild(t *testing.T, mode string) string {
+       t.Helper()
+       // #nosec G204 — os.Args[0] is the test binary path, set by the Go test
+       // runner. The mode argument is one of two compile-time string constants
+       // declared above.
+       cmd := exec.Command(os.Args[0], "-test.run=^"+t.Name()+"$", "-test.v")
+       cmd.Env = append(os.Environ(), fatalChildEnv+"="+mode)
+       out, err := cmd.CombinedOutput()
+       require.Error(t, err, "child must exit non-zero; output:\n%s", string(out))
+       var exitErr *exec.ExitError
+       require.ErrorAs(t, err, &exitErr, "child must produce *exec.ExitError; output:\n%s", string(out))
+       require.NotZero(t, exitErr.ExitCode(), "child must exit with non-zero code; output:\n%s", string(out))
+       return string(out)
+}
+
+// TestWatcherHitPermanentError_FailsFast verifies that the Watcher worker's
+// recover() classifier escalates an error-typed permanent panic to .Fatal(),
+// which exits the process non-zero (Step 1).
+func TestWatcherHitPermanentError_FailsFast(t *testing.T) {
+       if os.Getenv(fatalChildEnv) == fatalChildModeWatcher {
+               runWatcherFatalChild()
+               return
+       }
+       out := runFatalChild(t, fatalChildModeWatcher)
+       if !strings.Contains(out, "Watcher hit a permanent error") {
+               t.Fatalf("expected stderr to mention 'Watcher hit a permanent error', got:\n%s", out)
+       }
+}
+
+// runWatcherFatalChild constructs a minimal schemaRepo whose OpenResource
+// panics with a permanent error, starts the Watcher, and pushes an event
+// through eventCh. The Watcher worker's recover() classifier should escalate
+// to .Fatal() within a few hundred milliseconds.
+func runWatcherFatalChild() {
+       repo := &schemaRepo{
+               l:                      fatalChildLogger(),
+               resourceSchemaSupplier: permanentPanicSupplier{},
+               eventCh:                make(chan MetadataEvent, 1),
+               workerNum:              1,
+               closer:                 run.NewChannelCloser(),
+               metrics:                fatalChildMetrics(),
+       }
+       repo.Watcher()
+       repo.eventCh <- MetadataEvent{
+               Typ:  EventAddOrUpdate,
+               Kind: EventKindResource,
+               Metadata: &databasev1.Measure{
+                       Metadata: &commonv1.Metadata{Group: "g", Name: "m", ModRevision: 1},
+               },
+       }
+       // Hold the goroutine alive long enough for the worker's defer to run.
+       // .Fatal() calls os.Exit(1), so this sleep should never complete.
+       time.Sleep(5 * time.Second)
+       fmt.Fprintln(os.Stderr, "child did not exit; classifier did not fire")
+       os.Exit(2)
+}
+
+// TestSendMetadataEventHitPermanentError_FailsFast verifies that the
+// synchronous SendMetadataEvent path escalates a permanent error returned
+// from processEvent to .Fatal() (Step 2b).
+func TestSendMetadataEventHitPermanentError_FailsFast(t *testing.T) {
+       if os.Getenv(fatalChildEnv) == fatalChildModeSendSync {
+               runSendSyncFatalChild()
+               return
+       }
+       out := runFatalChild(t, fatalChildModeSendSync)
+       if !strings.Contains(out, "SendMetadataEvent hit a permanent error") {
+               t.Fatalf("expected stderr to mention 'SendMetadataEvent hit a permanent error', got:\n%s", out)
+       }
+}
+
+// runSendSyncFatalChild calls SendMetadataEvent on a schemaRepo whose
+// OpenResource returns a permanent error. The synchronous classifier branch
+// at cache.go (Step 2b) should escalate to .Fatal() before SendMetadataEvent
+// returns.
+func runSendSyncFatalChild() {
+       repo := &schemaRepo{
+               l:                      fatalChildLogger(),
+               resourceSchemaSupplier: permanentErrorSupplier{},
+               eventCh:                make(chan MetadataEvent, 1),
+               workerNum:              1,
+               closer:                 run.NewChannelCloser(),
+               metrics:                fatalChildMetrics(),
+       }
+       repo.SendMetadataEvent(MetadataEvent{
+               Typ:  EventAddOrUpdate,
+               Kind: EventKindResource,
+               Metadata: &databasev1.Measure{
+                       Metadata: &commonv1.Metadata{Group: "g", Name: "m", ModRevision: 1},
+               },
+       })
+       fmt.Fprintln(os.Stderr, "child did not exit; classifier did not fire")
+       os.Exit(2)
+}
+
+// TestSendMetadataEvent_TransientErrorRequeues verifies that a non-permanent
+// error from processEvent is requeued via eventCh (the existing transient
+// path) and not escalated to .Fatal() (Test 5.6 — preserves transient retry).
+func TestSendMetadataEvent_TransientErrorRequeues(t *testing.T) {
+       supplier := &transientThenOKSupplier{}
+       repo := &schemaRepo{
+               l:                      testLogger(),
+               resourceSchemaSupplier: supplier,
+               eventCh:                make(chan MetadataEvent, 1),
+               workerNum:              1,
+               closer:                 run.NewChannelCloser(),
+               metrics:                fatalChildMetrics(),
+       }
+
+       evt := MetadataEvent{
+               Typ:  EventAddOrUpdate,
+               Kind: EventKindResource,
+               Metadata: &databasev1.Measure{
+                       Metadata: &commonv1.Metadata{Group: "g", Name: "m", ModRevision: 1},
+               },
+       }
+       repo.SendMetadataEvent(evt)
+
+       // First processEvent failed with a transient error; SendMetadataEvent should
+       // have pushed onto eventCh for the worker (or self) to retry.
+       select {
+       case requeued := <-repo.eventCh:
+               require.Equal(t, EventKindResource, requeued.Kind, "requeued event must be the original")
+       case <-time.After(2 * time.Second):
+               t.Fatalf("transient error must requeue the event onto eventCh; openCalls=%d", supplier.openCalls.Load())
+       }
+       require.EqualValues(t, 1, supplier.openCalls.Load(), "first SendMetadataEvent must call OpenResource exactly once")
+
+       // Second pass: drive the requeued event through processEvent directly to
+       // confirm OpenResource is reattempted and succeeds.
+       require.NoError(t, repo.processEvent(evt))
+       require.EqualValues(t, 2, supplier.openCalls.Load(), "second pass must reattempt OpenResource")
+}
diff --git a/pkg/schema/init.go b/pkg/schema/init.go
index cf66ad0c6..8ced2c053 100644
--- a/pkg/schema/init.go
+++ b/pkg/schema/init.go
@@ -101,9 +101,9 @@ func (sr *schemaRepo) getCatalog(kind schema.Kind) 
commonv1.Catalog {
 }
 
 func (sr *schemaRepo) processGroup(ctx context.Context, g *commonv1.Group, 
catalog commonv1.Catalog) {
-       _, err := sr.initGroup(g)
-       if err != nil {
-               logger.Panicf("fails to init the group: %v", err)
+       if _, initErr := sr.initGroup(g); initErr != nil {
+               sr.l.Error().Err(initErr).Str("group", 
g.Metadata.GetName()).Msg("fails to init the group")
+               panic(fmt.Errorf("fails to init the group %s: %w", 
g.Metadata.GetName(), initErr))
        }
        sr.processRules(ctx, g.Metadata.GetName())
        sr.processBindings(ctx, g.Metadata.GetName())
@@ -223,6 +223,12 @@ func (sr *schemaRepo) initGroup(groupSchema 
*commonv1.Group) (*group, error) {
        defer sr.groupMux.Unlock()
        g, ok := sr.getGroup(groupSchema.Metadata.Name)
        if ok {
+               if g.isInit() {
+                       return g, nil
+               }
+               if reinitErr := g.initBySchema(groupSchema); reinitErr != nil {
+                       return nil, reinitErr
+               }
                return g, nil
        }
        sr.l.Info().Str("group", groupSchema.Metadata.Name).Msg("creating a 
tsdb")

Reply via email to