This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new 5c401217f fix: fail fast on incompatible storage version (boot +
runtime paths) (#1124)
5c401217f is described below
commit 5c401217fdb65facbe2ce86ff8a79318c1aa075e
Author: Gao Hongtao <[email protected]>
AuthorDate: Mon May 11 09:22:04 2026 +0800
fix: fail fast on incompatible storage version (boot + runtime paths)
(#1124)
---
CHANGES.md | 1 +
Makefile | 25 +-
banyand/internal/storage/tsdb.go | 21 +-
banyand/internal/storage/tsdb_test.go | 100 ++++++++
banyand/internal/storage/version.go | 5 +-
banyand/internal/storage/version_test.go | 29 +++
banyand/metadata/schema/property/client.go | 34 ++-
.../metadata/schema/property/init_handler_test.go | 209 +++++++++++++++++
pkg/initerror/initerror.go | 61 +++++
pkg/initerror/initerror_test.go | 71 ++++++
pkg/schema/cache.go | 61 ++++-
pkg/schema/cache_test.go | 127 ++++++++++
pkg/schema/cache_watcher_fatal_test.go | 256 +++++++++++++++++++++
pkg/schema/init.go | 12 +-
14 files changed, 994 insertions(+), 18 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 546e8e1fb..c7dad3188 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -88,6 +88,7 @@ Release Notes.
- Add `GroupLifecycleInfo.errors` to surface per-group collection failures
from FODC `InspectAll` instead of silently dropping the affected node entry.
- Fix `CollectDataInfo` and `CollectLiaisonInfo` not handling
`CATALOG_PROPERTY` groups.
- Fix lifecycle migration where the receiving node could create segments
shorter than the configured `SegmentInterval`.
+- Fail fast on incompatible storage version, both at boot and when a group is
registered at runtime. Previously the server would start in a degraded
`SERVING` state with the affected groups unloaded, because the property
schema-registry retry loop swallowed the version-incompatibility panic.
Compatible versions are listed in `banyand/internal/storage/versions.yml`.
### Chores
diff --git a/Makefile b/Makefile
index aadd5a98c..950d73f82 100644
--- a/Makefile
+++ b/Makefile
@@ -105,8 +105,31 @@ load-test-barrier: ## Run the schema-barrier CP-6 SLO load
harness (3 data nodes
lint: TARGET=lint
lint: PROJECTS:=api $(PROJECTS) pkg scripts/ci/check test
+lint: check-import-boundaries
lint: default ## Run the linters on all projects
+# check-import-boundaries enforces the layering invariants documented in
+# pkg/initerror/initerror.go: the leaf permanent-error contract must not gain
+# project-internal dependencies, and the property schema-registry classifier
+# must not import the storage internals it classifies via the leaf interface.
+.PHONY: check-import-boundaries
+check-import-boundaries: ## Enforce import-boundary invariants for the
version-incompat fix
+ @bad=0; \
+ if grep -rln 'banyand/internal/storage'
banyand/metadata/schema/property/ 2>/dev/null; then \
+ echo "FAIL: banyand/metadata/schema/property/ must not import
banyand/internal/storage"; \
+ bad=1; \
+ fi; \
+ if grep -rln 'banyand/internal/storage' pkg/schema/ 2>/dev/null; then \
+ echo "FAIL: pkg/schema/ must not import
banyand/internal/storage"; \
+ bad=1; \
+ fi; \
+ if grep -rln 'github.com/apache/skywalking-banyandb/'
pkg/initerror/*.go 2>/dev/null | grep -v _test.go; then \
+ echo "FAIL: pkg/initerror/ must remain a leaf with zero
project-internal imports (test files excepted)"; \
+ bad=1; \
+ fi; \
+ if [ $$bad -ne 0 ]; then exit 1; fi; \
+ echo "import boundaries OK"
+
##@ Vendor update
vendor-update: TARGET=vendor-update
@@ -231,7 +254,7 @@ release-push-candidate: ## Push release candidate
${PUSH_RELEASE_SCRIPTS}
.PHONY: all $(PROJECTS) clean build default nuke
-.PHONY: lint check tidy format pre-push generate-test-cases
+.PHONY: lint check tidy format pre-push generate-test-cases
check-import-boundaries
.PHONY: test test-race test-coverage test-ci test-docker
.PHONY: license-check license-fix license-dep
.PHONY: release release-binary release-source release-sign release-assembly
diff --git a/banyand/internal/storage/tsdb.go b/banyand/internal/storage/tsdb.go
index 7e7993085..d60d7cf00 100644
--- a/banyand/internal/storage/tsdb.go
+++ b/banyand/internal/storage/tsdb.go
@@ -19,6 +19,7 @@ package storage
import (
"context"
+ "fmt"
"path/filepath"
"strings"
"sync"
@@ -173,7 +174,7 @@ func (d *database[T, O]) Close() error {
d.segmentController.close()
d.lock.Close()
if err := d.lfs.DeleteFile(d.lock.Path()); err != nil {
- logger.Panicf("cannot delete lock file %s: %s", d.lock.Path(),
err)
+ panic(fmt.Errorf("cannot delete lock file %s: %w",
d.lock.Path(), err))
}
if d.metricsFactory != nil {
d.metricsFactory.Close()
@@ -239,14 +240,30 @@ func OpenTSDB[T TSTable, O any](ctx context.Context, opts
TSDBOpts[T, O], cache
lockPath := filepath.Join(opts.Location, lockFilename)
lock, err := tsdbLfs.CreateLockFile(lockPath, FilePerm)
if err != nil {
- logger.Panicf("cannot create lock file %s: %s", lockPath, err)
+ panic(fmt.Errorf("cannot create lock file %s: %w", lockPath,
err))
}
db.lock = lock
+ // Release the lock fd if any subsequent step in OpenTSDB returns an
error.
+ // Otherwise the file descriptor leaks until process exit and a retry
on the
+ // same data dir hits "resource temporarily unavailable" from the next
+ // CreateLockFile call. Ownership transfers to (*database).Close() only
on
+ // success — released = true is set on the success return below.
+ released := false
+ defer func() {
+ if !released {
+ _ = lock.Close()
+ _ = tsdbLfs.DeleteFile(lockPath)
+ }
+ }()
if err := db.segmentController.open(); err != nil {
return nil, err
}
obsservice.MetricsCollector.Register(location, db.collect)
db.disableRotation = opts.DisableRotation
+ // db (and its lock fd) is now owned by the caller; on rotation-task
error
+ // the partially-initialized db is still returned so the caller can
Close
+ // it and unregister the metrics collector.
+ released = true
return db, db.startRotationTask()
}
diff --git a/banyand/internal/storage/tsdb_test.go
b/banyand/internal/storage/tsdb_test.go
index 6b786a9c5..4852b9bd6 100644
--- a/banyand/internal/storage/tsdb_test.go
+++ b/banyand/internal/storage/tsdb_test.go
@@ -19,6 +19,8 @@ package storage
import (
"context"
+ "errors"
+ "os"
"path/filepath"
"testing"
"time"
@@ -28,6 +30,7 @@ import (
commonv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
"github.com/apache/skywalking-banyandb/pkg/fs"
+ "github.com/apache/skywalking-banyandb/pkg/initerror"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/meter"
"github.com/apache/skywalking-banyandb/pkg/test"
@@ -670,3 +673,100 @@ func TestCollectWithPartialClosedSegments(t *testing.T) {
// Clean up
require.NoError(t, tsdb.Close())
}
+
+// newTestSegmentSkeleton seeds a minimal on-disk segment layout under dir with
+// the supplied storage version stamp. The layout matches readSegmentMeta's
+// "old format" (version-only file body), which is sufficient to exercise
+// checkVersion at TSDB open time.
+func newTestSegmentSkeleton(t *testing.T, dir, version string) {
+ t.Helper()
+ segDir := filepath.Join(dir, "seg-20240501")
+ require.NoError(t, os.MkdirAll(segDir, 0o755))
+ metaPath := filepath.Join(segDir, metadataFilename)
+ require.NoError(t, os.WriteFile(metaPath, []byte(version), 0o600))
+}
+
+// TestTSDBOpen_LockReleasedAfterFailedOpen reproduces the lock-file leak on
+// the OpenTSDB error path. Before the fix, OpenTSDB acquired the lock file but
+// never released it when segmentController.open() failed, so a second
+// OpenTSDB call against the same directory failed with
+// "resource temporarily unavailable" from CreateLockFile. The fix adds a
+// deferred release guarded by a `released` flag that is set only on the
+// success path. This test seeds an incompatible segment to force the first
+// OpenTSDB to fail, then removes the segment and asserts the second
+// OpenTSDB succeeds, which is only possible if the lock fd was released.
+func TestTSDBOpen_LockReleasedAfterFailedOpen(t *testing.T) {
+ logger.Init(logger.Logging{Env: "dev", Level: flags.LogLevel})
+
+ dir, defFn := test.Space(require.New(t))
+ defer defFn()
+
+ newTestSegmentSkeleton(t, dir, "1.3.0")
+
+ opts := TSDBOpts[*MockTSTable, any]{
+ Location: dir,
+ SegmentInterval: IntervalRule{Unit: DAY, Num: 1},
+ TTL: IntervalRule{Unit: DAY, Num: 3},
+ ShardNum: 1,
+ TSTableCreator: MockTSTableCreator,
+ }
+
+ ctx := context.Background()
+ mc := timestamp.NewMockClock()
+ ts, parseErr := time.ParseInLocation("2006-01-02 15:04:05", "2024-05-01
00:00:00", time.Local)
+ require.NoError(t, parseErr)
+ mc.Set(ts)
+ ctx = timestamp.SetClock(ctx, mc)
+
+ // First open fails on the incompatible segment. With the leak fix the
+ // defer in OpenTSDB releases the lock fd before returning the error.
+ serviceCache := NewServiceCache()
+ _, openErr := OpenTSDB(ctx, opts, serviceCache, group)
+ require.Error(t, openErr, "first OpenTSDB must fail on the incompatible
segment")
+ require.True(t, initerror.IsPermanent(openErr), "first OpenTSDB error
must be permanent")
+
+ // Remove the bad segment so the second open's segmentController scan
succeeds.
+ require.NoError(t, os.RemoveAll(filepath.Join(dir, "seg-20240501")))
+
+ // Second open against the same dir must succeed — only possible if the
+ // lock fd from the first attempt was released. Without the fix this
+ // returns "resource temporarily unavailable" from CreateLockFile.
+ tsdb, reopenErr := OpenTSDB(ctx, opts, serviceCache, group)
+ require.NoError(t, reopenErr, "second OpenTSDB must succeed after the
failed first attempt")
+ require.NotNil(t, tsdb)
+ require.NoError(t, tsdb.Close())
+}
+
+// TestTSDBOpen_RejectsIncompatibleSegment verifies that opening a TSDB whose
+// on-disk segment is stamped with an incompatible version surfaces a permanent
+// initialization error so the caller fails fast instead of silently dropping
+// the affected group.
+func TestTSDBOpen_RejectsIncompatibleSegment(t *testing.T) {
+ logger.Init(logger.Logging{Env: "dev", Level: flags.LogLevel})
+
+ dir, defFn := test.Space(require.New(t))
+ defer defFn()
+
+ newTestSegmentSkeleton(t, dir, "1.3.0")
+
+ opts := TSDBOpts[*MockTSTable, any]{
+ Location: dir,
+ SegmentInterval: IntervalRule{Unit: DAY, Num: 1},
+ TTL: IntervalRule{Unit: DAY, Num: 3},
+ ShardNum: 1,
+ TSTableCreator: MockTSTableCreator,
+ }
+
+ ctx := context.Background()
+ mc := timestamp.NewMockClock()
+ ts, parseErr := time.ParseInLocation("2006-01-02 15:04:05", "2024-05-01
00:00:00", time.Local)
+ require.NoError(t, parseErr)
+ mc.Set(ts)
+ ctx = timestamp.SetClock(ctx, mc)
+
+ serviceCache := NewServiceCache()
+ _, openErr := OpenTSDB(ctx, opts, serviceCache, group)
+ require.Error(t, openErr, "OpenTSDB must reject an incompatible-version
segment")
+ require.True(t, initerror.IsPermanent(openErr), "incompatible-version
error must surface as permanent")
+ require.True(t, errors.Is(openErr, errVersionIncompatible), "error must
still match the version-incompatible sentinel")
+}
diff --git a/banyand/internal/storage/version.go
b/banyand/internal/storage/version.go
index ddea79a33..82d1adaaf 100644
--- a/banyand/internal/storage/version.go
+++ b/banyand/internal/storage/version.go
@@ -24,6 +24,8 @@ import (
"github.com/pkg/errors"
"sigs.k8s.io/yaml"
+
+ "github.com/apache/skywalking-banyandb/pkg/initerror"
)
const (
@@ -46,7 +48,8 @@ func checkVersion(version string) error {
return nil
}
}
- return errors.WithMessagef(errVersionIncompatible, "incompatible
version %s, supported versions: %s", version, strings.Join(compatibleVersions,
", "))
+ return initerror.AsPermanent(errors.WithMessagef(errVersionIncompatible,
+ "incompatible version %s, supported versions: %s", version,
strings.Join(compatibleVersions, ", ")))
}
type segmentMeta struct {
diff --git a/banyand/internal/storage/version_test.go
b/banyand/internal/storage/version_test.go
index 726e6d8a2..31350d56d 100644
--- a/banyand/internal/storage/version_test.go
+++ b/banyand/internal/storage/version_test.go
@@ -18,10 +18,14 @@
package storage
import (
+ "errors"
+ "fmt"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
+
+ "github.com/apache/skywalking-banyandb/pkg/initerror"
)
func TestReadSegmentMeta_NewFormat(t *testing.T) {
@@ -52,6 +56,31 @@ func TestReadSegmentMeta_IncompatibleVersion(t *testing.T) {
data :=
[]byte(`{"version":"0.1.0","endTime":"2026-04-07T00:00:00+08:00"}`)
_, err := readSegmentMeta(data)
assert.Error(t, err)
+ assert.True(t, initerror.IsPermanent(err), "incompatible version must
surface as permanent")
+ assert.True(t, errors.Is(err, errVersionIncompatible), "incompatible
version must still match the sentinel")
+}
+
+func TestCheckVersion_WrappedAsPermanent(t *testing.T) {
+ err := checkVersion("1.3.0")
+ require.Error(t, err)
+ assert.True(t, initerror.IsPermanent(err))
+ assert.True(t, errors.Is(err, errVersionIncompatible))
+
+ wrapped := fmt.Errorf("init: %w", err)
+ assert.True(t, initerror.IsPermanent(wrapped), "permanence must survive
fmt.Errorf wrapping")
+ assert.True(t, errors.Is(wrapped, errVersionIncompatible))
+}
+
+func TestCheckVersion_CompatibleReturnsNil(t *testing.T) {
+ require.NoError(t, checkVersion(currentVersion))
+ for _, v := range compatibleVersions {
+ require.NoError(t, checkVersion(v))
+ }
+}
+
+func TestIsPermanent_UnrelatedErrorIsNotPermanent(t *testing.T) {
+ assert.False(t, initerror.IsPermanent(errors.New("plain")))
+ assert.False(t, initerror.IsPermanent(nil))
}
func TestReadSegmentMeta_NewFormatNoEndTime(t *testing.T) {
diff --git a/banyand/metadata/schema/property/client.go
b/banyand/metadata/schema/property/client.go
index 88ec7360d..69c2d8fe1 100644
--- a/banyand/metadata/schema/property/client.go
+++ b/banyand/metadata/schema/property/client.go
@@ -41,6 +41,7 @@ import (
"github.com/apache/skywalking-banyandb/banyand/metadata/schema"
"github.com/apache/skywalking-banyandb/banyand/observability"
"github.com/apache/skywalking-banyandb/pkg/grpchelper"
+ "github.com/apache/skywalking-banyandb/pkg/initerror"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/run"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
@@ -128,6 +129,7 @@ type SchemaRegistry struct {
l *logger.Logger
cache *schemaCache
caCertReloader *pkgtls.Reloader
+ clock timestamp.Clock
handlers map[schema.Kind][]schema.EventHandler
watchSessions map[string]*watchSession
pauseQueue []func()
@@ -205,6 +207,7 @@ func NewSchemaRegistryClient(cfg *ClientConfig)
(*SchemaRegistry, error) {
l: l,
cache: newSchemaCacheWithLimit(maxTombstones),
caCertReloader: caCertReloader,
+ clock: timestamp.NewClock(),
handlers: make(map[schema.Kind][]schema.EventHandler),
watchSessions: make(map[string]*watchSession),
syncInterval: syncInterval,
@@ -1116,31 +1119,48 @@ func (r *SchemaRegistry) RegisterHandler(name string,
kind schema.Kind, handler
}
func (r *SchemaRegistry) initHandlerWithRetry(name string, handler
schema.EventHandler, kinds []schema.Kind) {
- deadline := time.Now().Add(time.Minute)
+ deadline := r.clock.Now().Add(time.Minute)
var lastErr interface{}
for {
- if tryCallOnInit(handler, kinds, &lastErr) {
+ ok, permanent := tryCallOnInit(handler, kinds, &lastErr)
+ if ok {
return
}
- if time.Now().After(deadline) {
- r.l.Panic().Str("name", name).Interface("error",
lastErr).
+ if permanent {
+ r.l.Error().Str("name", name).Str("error",
fmt.Sprintf("%+v", lastErr)).
+ Msg("OnInit hit a permanent error, refusing to
start")
+ if recErr, isErr := lastErr.(error); isErr {
+ panic(fmt.Errorf("OnInit %s: %w", name, recErr))
+ }
+ panic(fmt.Errorf("OnInit %s: %v", name, lastErr))
+ }
+ if r.clock.Now().After(deadline) {
+ r.l.Error().Str("name", name).Interface("error",
lastErr).
Msg("handler OnInit failed after 1m, giving up")
+ panic(fmt.Errorf("OnInit %s exceeded deadline: %v",
name, lastErr))
}
r.l.Warn().Str("name", name).Interface("error", lastErr).
Msg("handler OnInit panicked due to transient error,
retrying in 1s")
- time.Sleep(time.Second)
+ r.clock.Sleep(time.Second)
}
}
-func tryCallOnInit(handler schema.EventHandler, kinds []schema.Kind, lastErr
*interface{}) (ok bool) {
+// tryCallOnInit invokes handler.OnInit and recovers any panic. Do not wrap the
+// recovered value with multierr before classification; multierr.Append's
+// Unwrap() []error is traversed by errors.As, but extra wrapping changes which
+// concrete type is returned first and breaks IsPermanent's interface lookup.
+func tryCallOnInit(handler schema.EventHandler, kinds []schema.Kind, lastErr
*interface{}) (ok, permanent bool) {
defer func() {
if rec := recover(); rec != nil {
*lastErr = rec
ok = false
+ if recErr, isErr := rec.(error); isErr {
+ permanent = initerror.IsPermanent(recErr)
+ }
}
}()
handler.OnInit(kinds)
- return true
+ return true, false
}
// Start starts a single goroutine that periodically syncs schemas.
diff --git a/banyand/metadata/schema/property/init_handler_test.go
b/banyand/metadata/schema/property/init_handler_test.go
new file mode 100644
index 000000000..4e6625892
--- /dev/null
+++ b/banyand/metadata/schema/property/init_handler_test.go
@@ -0,0 +1,209 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package property
+
+import (
+ "errors"
+ "fmt"
+ "sync/atomic"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+
+ "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+ "github.com/apache/skywalking-banyandb/pkg/initerror"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/timestamp"
+)
+
+// retryTestHandler implements schema.EventHandler for retry-classification
tests.
+// Each OnInit call invokes the configured panicFn (which may panic). When
+// noPanicAfter calls have been made, OnInit returns normally so the retry loop
+// can succeed.
+type retryTestHandler struct {
+ panicFn func(callIdx int)
+ startedSignal chan struct{}
+ schema.UnimplementedOnInitHandler
+ calls atomic.Int32
+ noPanicAfter int32
+}
+
+func (h *retryTestHandler) OnAddOrUpdate(_ schema.Metadata) {}
+func (h *retryTestHandler) OnDelete(_ schema.Metadata) {}
+
+func (h *retryTestHandler) OnInit(_ []schema.Kind) (bool, []int64) {
+ idx := h.calls.Add(1)
+ if h.startedSignal != nil {
+ select {
+ case h.startedSignal <- struct{}{}:
+ default:
+ }
+ }
+ if h.noPanicAfter > 0 && idx > h.noPanicAfter {
+ return true, nil
+ }
+ if h.panicFn != nil {
+ h.panicFn(int(idx))
+ }
+ return true, nil
+}
+
+// newTestRegistryWithClock builds a SchemaRegistry instance that bypasses
+// connection setup so we can drive its retry classifier in-process. It uses a
+// mock clock so the test controls the deadline and Sleep windows.
+func newTestRegistryWithClock(_ *testing.T, mc timestamp.MockClock)
*SchemaRegistry {
+ l := logger.GetLogger("init-handler-retry-test")
+ return &SchemaRegistry{
+ l: l,
+ clock: mc,
+ }
+}
+
+// TestInitHandlerWithRetry_PermanentVersionError_FailsFast asserts that a
+// handler panicking with a Permanent-wrapped error makes the retry loop re-panic
+// on the first attempt, without retrying or ever advancing the clock.
+func TestInitHandlerWithRetry_PermanentVersionError_FailsFast(t *testing.T) {
+ require.NoError(t, logger.Init(logger.Logging{Env: "dev", Level:
"warn"}))
+ mc := timestamp.NewMockClock()
+ mcStart := mc.Now()
+ r := newTestRegistryWithClock(t, mc)
+ permErr := initerror.AsPermanent(errors.New("incompatible version
1.3.0"))
+ handler := &retryTestHandler{
+ panicFn: func(_ int) {
+ panic(fmt.Errorf("OpenDB failed: %w", permErr))
+ },
+ }
+
+ done := make(chan struct{})
+ var recovered interface{}
+ go func() {
+ defer close(done)
+ defer func() { recovered = recover() }()
+ r.initHandlerWithRetry("perm-handler", handler,
[]schema.Kind{schema.KindGroup})
+ }()
+
+ select {
+ case <-done:
+ case <-time.After(2 * time.Second):
+ t.Fatal("initHandlerWithRetry did not return within 2s;
permanent error path should fail fast")
+ }
+
+ require.NotNil(t, recovered, "permanent error must propagate as a
panic")
+ recErr, isErr := recovered.(error)
+ require.True(t, isErr, "panic value should be an error wrapping the
permanent error")
+ assert.True(t, initerror.IsPermanent(recErr), "recovered error must
satisfy IsPermanent")
+ assert.Equal(t, int32(1), handler.calls.Load(), "permanent error must
not be retried")
+ assert.Equal(t, mcStart, mc.Now(), "mock clock must not advance when
the first attempt is permanent")
+}
+
+// TestInitHandlerWithRetry_TransientStillRetries asserts that a handler
+// panicking with a non-permanent error keeps retrying until either the
deadline
+// passes (panic) or it stops panicking (success). Here we let the handler stop
+// panicking after two attempts to exercise the success path.
+func TestInitHandlerWithRetry_TransientStillRetries(t *testing.T) {
+ require.NoError(t, logger.Init(logger.Logging{Env: "dev", Level:
"warn"}))
+ mc := timestamp.NewMockClock()
+ r := newTestRegistryWithClock(t, mc)
+ handler := &retryTestHandler{
+ noPanicAfter: 2,
+ startedSignal: make(chan struct{}, 4),
+ panicFn: func(_ int) {
+ panic("etcd unreachable")
+ },
+ }
+
+ done := make(chan struct{})
+ go func() {
+ defer close(done)
+ r.initHandlerWithRetry("transient-handler", handler,
[]schema.Kind{schema.KindGroup})
+ }()
+
+ // Wait for each of the two panicking OnInit attempts, then release its Sleep.
+ for i := 0; i < 2; i++ {
+ select {
+ case <-handler.startedSignal:
+ case <-time.After(2 * time.Second):
+ t.Fatalf("handler.OnInit was not invoked for attempt %d
within 2s", i+1)
+ }
+ // Advance the mock clock on every poll, in case the worker registered its
+ // Sleep after an earlier Add. NOTE(review): each poll adds 1s, so ~60 slow
+ // polls would cross initHandlerWithRetry's 1-minute deadline and panic the
+ // worker goroutine, which has no recover in this test.
+ require.Eventually(t, func() bool {
+ mc.Add(time.Second)
+ return handler.calls.Load() > int32(i+1) || done != nil
&& isClosed(done)
+ }, 3*time.Second, 10*time.Millisecond, "Sleep must release
after clock advance")
+ }
+
+ // Wait for the loop to finish (third call returns ok=true).
+ select {
+ case <-done:
+ case <-time.After(3 * time.Second):
+ t.Fatal("initHandlerWithRetry did not return after handler
stopped panicking")
+ }
+ assert.GreaterOrEqual(t, handler.calls.Load(), int32(3), "handler must
be retried at least twice before succeeding")
+}
+
+// TestInitHandlerWithRetry_DeadlineExceededPanics asserts that a non-permanent
+// failure that never stops makes initHandlerWithRetry panic once the mock clock
+// has been advanced past its 1-minute retry deadline.
+func TestInitHandlerWithRetry_DeadlineExceededPanics(t *testing.T) {
+ require.NoError(t, logger.Init(logger.Logging{Env: "dev", Level:
"warn"}))
+ mc := timestamp.NewMockClock()
+ r := newTestRegistryWithClock(t, mc)
+ handler := &retryTestHandler{
+ startedSignal: make(chan struct{}, 8),
+ panicFn: func(_ int) {
+ panic("never resolves")
+ },
+ }
+
+ done := make(chan struct{})
+ var recovered interface{}
+ go func() {
+ defer close(done)
+ defer func() { recovered = recover() }()
+ r.initHandlerWithRetry("deadline-handler", handler,
[]schema.Kind{schema.KindGroup})
+ }()
+
+ deadline := time.Now().Add(5 * time.Second)
+ for {
+ select {
+ case <-done:
+ require.NotNil(t, recovered, "deadline exceeded must
propagate as a panic")
+ return
+ case <-handler.startedSignal:
+ case <-time.After(50 * time.Millisecond):
+ }
+ if time.Now().After(deadline) {
+ t.Fatal("initHandlerWithRetry did not exit after
repeated clock advances")
+ }
+ mc.Add(2 * time.Minute)
+ }
+}
+
+func isClosed(ch <-chan struct{}) bool {
+ select {
+ case <-ch:
+ return true
+ default:
+ return false
+ }
+}
diff --git a/pkg/initerror/initerror.go b/pkg/initerror/initerror.go
new file mode 100644
index 000000000..6bff59384
--- /dev/null
+++ b/pkg/initerror/initerror.go
@@ -0,0 +1,61 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package initerror exposes a Permanent error contract for initialization
+// failures that must not be retried. The package is a leaf: it imports nothing
+// project-internal so it can be referenced by both the storage layer (which
+// produces permanent errors) and the metadata-schema retry classifier (which
+// reads them) without introducing a layering edge between those packages.
+package initerror
+
+import "errors"
+
+// Permanent marks an error as a permanent (non-retriable) initialization
failure.
+type Permanent interface {
+ error
+ Permanent() bool
+}
+
+type permanentError struct {
+ err error
+}
+
+// Error implements error.
+func (p *permanentError) Error() string { return p.err.Error() }
+
+// Unwrap exposes the wrapped error so errors.Is/As can traverse the chain.
+func (p *permanentError) Unwrap() error { return p.err }
+
+// Permanent reports that this error must not be retried.
+func (p *permanentError) Permanent() bool { return true }
+
+// AsPermanent wraps err as a Permanent error. Returns nil if err is nil.
+func AsPermanent(err error) error {
+ if err == nil {
+ return nil
+ }
+ return &permanentError{err: err}
+}
+
+// IsPermanent reports whether the first error in err's tree that implements Permanent returns true from Permanent(); nil is never permanent.
+func IsPermanent(err error) bool {
+ if err == nil {
+ return false
+ }
+ var p Permanent
+ return errors.As(err, &p) && p.Permanent()
+}
diff --git a/pkg/initerror/initerror_test.go b/pkg/initerror/initerror_test.go
new file mode 100644
index 000000000..2c16db32f
--- /dev/null
+++ b/pkg/initerror/initerror_test.go
@@ -0,0 +1,71 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package initerror_test
+
+import (
+ "errors"
+ "fmt"
+ "testing"
+
+ "github.com/apache/skywalking-banyandb/pkg/initerror"
+)
+
+var errSentinel = errors.New("sentinel")
+
+func TestAsPermanent_NilReturnsNil(t *testing.T) {
+ if got := initerror.AsPermanent(nil); got != nil {
+ t.Fatalf("AsPermanent(nil) = %v, want nil", got)
+ }
+}
+
+func TestIsPermanent_Nil(t *testing.T) {
+ if initerror.IsPermanent(nil) {
+ t.Fatalf("IsPermanent(nil) = true, want false")
+ }
+}
+
+func TestIsPermanent_PlainError(t *testing.T) {
+ if initerror.IsPermanent(errSentinel) {
+ t.Fatalf("IsPermanent(plain error) = true, want false")
+ }
+}
+
+func TestIsPermanent_Direct(t *testing.T) {
+ wrapped := initerror.AsPermanent(errSentinel)
+ if !initerror.IsPermanent(wrapped) {
+ t.Fatalf("IsPermanent(AsPermanent(...)) = false, want true")
+ }
+}
+
+func TestIsPermanent_FmtErrorfChain(t *testing.T) {
+ wrapped := fmt.Errorf("init failed: %w",
initerror.AsPermanent(errSentinel))
+ if !initerror.IsPermanent(wrapped) {
+ t.Fatalf("IsPermanent through fmt.Errorf chain = false, want
true")
+ }
+ if !errors.Is(wrapped, errSentinel) {
+ t.Fatalf("errors.Is should still see the inner sentinel")
+ }
+}
+
+func TestIsPermanent_DoubleWrap(t *testing.T) {
+ inner := initerror.AsPermanent(errSentinel)
+ outer := fmt.Errorf("outer: %w", fmt.Errorf("middle: %w", inner))
+ if !initerror.IsPermanent(outer) {
+ t.Fatalf("IsPermanent through double wrap = false, want true")
+ }
+}
diff --git a/pkg/schema/cache.go b/pkg/schema/cache.go
index ec45cda19..95b8f88b1 100644
--- a/pkg/schema/cache.go
+++ b/pkg/schema/cache.go
@@ -35,6 +35,7 @@ import (
"github.com/apache/skywalking-banyandb/banyand/metadata"
"github.com/apache/skywalking-banyandb/banyand/metadata/schema"
"github.com/apache/skywalking-banyandb/pkg/cgroups"
+ "github.com/apache/skywalking-banyandb/pkg/initerror"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/run"
)
@@ -108,7 +109,35 @@ func (sr *schemaRepo) SendMetadataEvent(event
MetadataEvent) {
return
}
defer sr.closer.SenderDone()
+ // SendMetadataEvent runs synchronously on the caller goroutine (e.g. an
+ // etcd-watch handler such as (*schemaRepo).OnAddOrUpdate). A panic from
+ // processEvent — string-typed today, error-typed once Step 1b lands —
+ // would otherwise propagate to a caller that has no recover() of its
own.
+ // The classifier branch escalates permanent errors to .Fatal() so
+ // runtime registration of an incompatible group exits the process the
+ // same way the boot path does; non-permanent panics are logged and
+ // requeued onto eventCh so the Watcher worker can retry, mirroring the
+ // transient error-return path below.
+ defer func() {
+ if recovered := recover(); recovered != nil {
+ if recErr, isErr := recovered.(error); isErr &&
initerror.IsPermanent(recErr) {
+ sr.l.Fatal().Err(recErr).Interface("event",
event).Str("stack", string(debug.Stack())).
+ Msg("SendMetadataEvent hit a permanent
error, refusing to continue")
+ }
+ sr.l.Warn().Interface("err",
recovered).Interface("event", event).Str("stack", string(debug.Stack())).
+ Msg("recovered from SendMetadataEvent panic;
requeueing")
+ sr.metrics.totalErrs.Inc(1)
+ select {
+ case sr.eventCh <- event:
+ case <-sr.closer.CloseNotify():
+ }
+ }
+ }()
if err := sr.processEvent(event); err != nil && !errors.Is(err,
schema.ErrClosed) {
+ if initerror.IsPermanent(err) {
+ sr.l.Fatal().Err(err).Interface("event", event).
+ Msg("SendMetadataEvent hit a permanent error,
refusing to continue")
+ }
select {
case <-sr.closer.CloseNotify():
return
@@ -237,10 +266,25 @@ func (sr *schemaRepo) Watcher() {
}
defer func() {
sr.closer.ReceiverDone()
+ // Bump the panic metric BEFORE the classifier
branch: .Fatal()
+ // calls os.Exit(1) and the deferred Inc would
otherwise be
+ // skipped on the permanent-error escalation
path. Do NOT move
+ // this line under the if-branch.
+ sr.metrics.totalPanics.Inc(1)
if err := recover(); err != nil {
+ // Classifier is dormant on
string-typed panics (today's
+ // Panicf sites outside this PR's scope
still panic with
+ // strings); tsdb.go:176/242 are
converted in Step 1b so
+ // permanent errors reaching this
recover are error-typed.
+ // We use .Fatal() not panic() because
this is a worker
+ // goroutine: panic() would only kill
this goroutine and
+ // leave the Watcher partially alive.
+ if recErr, isErr := err.(error); isErr
&& initerror.IsPermanent(recErr) {
+
sr.l.Fatal().Err(recErr).Str("stack", string(debug.Stack())).
+ Msg("Watcher hit a
permanent error, refusing to continue")
+ }
sr.l.Warn().Interface("err",
err).Str("stack", string(debug.Stack())).Msg("watching the events")
}
- sr.metrics.totalPanics.Inc(1)
}()
// The Watcher drains events retried via the channel
after transient
// processing errors. The fast path runs synchronously
in SendMetadataEvent
@@ -252,6 +296,10 @@ func (sr *schemaRepo) Watcher() {
return
}
if retryErr := sr.processEvent(evt);
retryErr != nil && !errors.Is(retryErr, schema.ErrClosed) {
+ if
initerror.IsPermanent(retryErr) {
+
sr.l.Fatal().Err(retryErr).Interface("event", evt).
+ Msg("Watcher
hit a permanent error, refusing to continue")
+ }
select {
case <-sr.closer.CloseNotify():
return
@@ -636,8 +684,8 @@ func (g *group) init(name string) error {
}
func (g *group) initBySchema(groupSchema *commonv1.Group) error {
- g.groupSchema.Store(groupSchema)
if g.isPortable() {
+ g.groupSchema.Store(groupSchema)
return nil
}
db, err := g.resourceSupplier.OpenDB(groupSchema)
@@ -645,7 +693,8 @@ func (g *group) initBySchema(groupSchema *commonv1.Group)
error {
return err
}
g.db.Store(db)
- return err
+ g.groupSchema.Store(groupSchema)
+ return nil
}
func (g *group) isInit() bool {
@@ -671,7 +720,11 @@ func (g *group) close() (err error) {
if !g.isInit() || g.isPortable() {
return nil
}
- return multierr.Append(err, g.SupplyTSDB().Close())
+ tsdb := g.SupplyTSDB()
+ if tsdb == nil {
+ return nil
+ }
+ return multierr.Append(err, tsdb.Close())
}
func (g *group) drop() error {
diff --git a/pkg/schema/cache_test.go b/pkg/schema/cache_test.go
index d524e238b..ebafce414 100644
--- a/pkg/schema/cache_test.go
+++ b/pkg/schema/cache_test.go
@@ -18,6 +18,7 @@
package schema
import (
+ "errors"
"testing"
"github.com/stretchr/testify/assert"
@@ -26,8 +27,15 @@ import (
commonv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
"github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+ pkglogger "github.com/apache/skywalking-banyandb/pkg/logger"
)
+// testLogger returns a logger suitable for unit tests in this package.
+func testLogger() *pkglogger.Logger {
+ _ = pkglogger.Init(pkglogger.Logging{Env: "dev", Level: "warn"})
+ return pkglogger.GetLogger("test", "schema-cache")
+}
+
// noopIndexListener satisfies IndexListener and discards all index updates.
type noopIndexListener struct{}
@@ -168,3 +176,122 @@ func TestIsAbsent_AfterDelete(t *testing.T) {
assert.True(t, sr.IsAbsent(schema.KindMeasure, "g", "m"), "resource
must be absent after a valid delete")
}
+
+// fakeDB satisfies pkg/schema.DB for unit tests that only exercise group
caching.
+type fakeDB struct{}
+
+func (fakeDB) Close() error { return nil }
+func (fakeDB) UpdateOptions(_ *commonv1.ResourceOpts) {}
+func (fakeDB) Drop() error { return nil }
+
+// fakeResourceSupplier is a programmable ResourceSupplier whose OpenDB returns
+// a configurable error or DB. openCalls counts invocations so retry tests can
+// assert that initBySchema actually re-attempted OpenDB.
+type fakeResourceSupplier struct {
+ openErr error
+ openCalls int
+}
+
+func (s *fakeResourceSupplier) ResourceSchema(_ *commonv1.Metadata)
(ResourceSchema, error) {
+ return nil, nil
+}
+
+func (s *fakeResourceSupplier) OpenResource(_ Resource) (IndexListener, error)
{
+ return noopIndexListener{}, nil
+}
+
+func (s *fakeResourceSupplier) OpenDB(_ *commonv1.Group) (DB, error) {
+ s.openCalls++
+ if s.openErr != nil {
+ return nil, s.openErr
+ }
+ return fakeDB{}, nil
+}
+
+func buildGroup(name string) *commonv1.Group {
+ return &commonv1.Group{
+ Metadata: &commonv1.Metadata{Name: name},
+ }
+}
+
+// TestInitBySchema_FailedOpenDB_LeavesGroupUninit asserts that when OpenDB
+// fails, the group is left in an un-initialized state so a later retry can
+// reattempt OpenDB. Previously the schema was stored before OpenDB ran, which
+// caused isInit() to return true even though db was nil.
+func TestInitBySchema_FailedOpenDB_LeavesGroupUninit(t *testing.T) {
+ openErr := errors.New("boom")
+ supplier := &fakeResourceSupplier{openErr: openErr}
+ g := newGroup(nil, nil, supplier)
+
+ err := g.initBySchema(buildGroup("g1"))
+ require.Error(t, err)
+ assert.True(t, errors.Is(err, openErr))
+ assert.False(t, g.isInit(), "group must not be initialized after a
failed OpenDB")
+ assert.Nil(t, g.GetSchema(), "groupSchema must not be stored after a
failed OpenDB")
+ assert.Nil(t, g.SupplyTSDB(), "SupplyTSDB must be nil after a failed
OpenDB")
+}
+
+// TestInitBySchema_PortableGroupOK asserts that a portable group (nil
+// resourceSupplier) stores the schema and is considered initialized without
+// invoking OpenDB.
+func TestInitBySchema_PortableGroupOK(t *testing.T) {
+ g := newGroup(nil, nil, nil)
+ require.NoError(t, g.initBySchema(buildGroup("portable")))
+ assert.True(t, g.isInit(), "portable group must be initialized after
initBySchema")
+ assert.Nil(t, g.SupplyTSDB(), "portable group must have no underlying
tsdb")
+}
+
+// TestInitBySchema_SuccessStoresSchema asserts the success path stores schema
+// after OpenDB returns.
+func TestInitBySchema_SuccessStoresSchema(t *testing.T) {
+ supplier := &fakeResourceSupplier{}
+ g := newGroup(nil, nil, supplier)
+ require.NoError(t, g.initBySchema(buildGroup("g1")))
+ assert.True(t, g.isInit())
+ assert.Equal(t, 1, supplier.openCalls)
+}
+
+// TestInitGroup_RetryAfterFailedOpenDB asserts that initGroup retries OpenDB
on
+// the second pass when the first attempt left the group un-initialized — the
+// fix for the silent-fail bug where a half-broken group survived in the cache.
+func TestInitGroup_RetryAfterFailedOpenDB(t *testing.T) {
+ openErr := errors.New("transient")
+ supplier := &fakeResourceSupplier{openErr: openErr}
+ repo := &schemaRepo{
+ l: testLogger(),
+ resourceSupplier: supplier,
+ }
+
+ gs := buildGroup("g1")
+ _, firstErr := repo.initGroup(gs)
+ require.Error(t, firstErr)
+ require.Equal(t, 1, supplier.openCalls)
+
+ cached, ok := repo.getGroup("g1")
+ require.True(t, ok)
+ require.False(t, cached.isInit())
+
+ supplier.openErr = nil
+ g, secondErr := repo.initGroup(gs)
+ require.NoError(t, secondErr)
+ require.NotNil(t, g)
+ assert.True(t, g.isInit())
+ assert.Equal(t, 2, supplier.openCalls, "OpenDB must be re-attempted
when the cached group is not initialized")
+}
+
+// TestInitGroup_AlreadyInit_NoReopen asserts that an already-initialized group
+// is returned without reinvoking OpenDB.
+func TestInitGroup_AlreadyInit_NoReopen(t *testing.T) {
+ supplier := &fakeResourceSupplier{}
+ repo := &schemaRepo{
+ l: testLogger(),
+ resourceSupplier: supplier,
+ }
+ gs := buildGroup("g1")
+ _, err := repo.initGroup(gs)
+ require.NoError(t, err)
+ require.Equal(t, 1, supplier.openCalls)
+ _, err = repo.initGroup(gs)
+ require.NoError(t, err)
+ assert.Equal(t, 1, supplier.openCalls, "already-initialized group must
not re-invoke OpenDB")
+}
diff --git a/pkg/schema/cache_watcher_fatal_test.go
b/pkg/schema/cache_watcher_fatal_test.go
new file mode 100644
index 000000000..3c66c5d28
--- /dev/null
+++ b/pkg/schema/cache_watcher_fatal_test.go
@@ -0,0 +1,256 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package schema
+
+import (
+ stderrors "errors"
+ "fmt"
+ "os"
+ "os/exec"
+ "strings"
+ "sync/atomic"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/require"
+
+ commonv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+ databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+ "github.com/apache/skywalking-banyandb/pkg/initerror"
+ pkglogger "github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/meter"
+ "github.com/apache/skywalking-banyandb/pkg/run"
+)
+
+// nopCounter is a meter.Counter that discards all increments. Used by the
+// fatal-fail subprocess tests so the worker's deferred Inc(1) does not
+// nil-panic in the child process.
+type nopCounter struct{}
+
+func (nopCounter) Inc(_ float64, _ ...string) {}
+func (nopCounter) Delete(_ ...string) bool { return false }
+
+var _ meter.Counter = nopCounter{}
+
+const (
+ // fatalChildEnv signals to the test binary that it is the in-process
+ // child for a subprocess fast-fail assertion. Parents re-exec the
+ // running test binary with this env set; only the matching child
+ // branch executes — the parent does not run the child code path.
+ fatalChildEnv = "BANYAND_TEST_F5_FATAL_CHILD"
+ fatalChildModeWatcher = "watcher"
+ fatalChildModeSendSync = "send-sync"
+)
+
+// permanentPanicSupplier is a ResourceSchemaSupplier whose OpenResource panics
+// with an error-typed permanent error. It exercises the Watcher worker's
+// recover() classifier (Step 1) — the panic propagates up through processEvent
+// and is caught in the worker's deferred recover(), where IsPermanent
classifies
+// it as fatal.
+type permanentPanicSupplier struct{}
+
+func (permanentPanicSupplier) ResourceSchema(_ *commonv1.Metadata)
(ResourceSchema, error) {
+ return nil, nil
+}
+
+func (permanentPanicSupplier) OpenResource(_ Resource) (IndexListener, error) {
+ panic(initerror.AsPermanent(stderrors.New("incompatible version
1.3.0")))
+}
+
+// permanentErrorSupplier is a ResourceSchemaSupplier whose OpenResource
returns
+// a permanent error. It exercises the synchronous-return classifier in
+// SendMetadataEvent (Step 2b).
+type permanentErrorSupplier struct{}
+
+func (permanentErrorSupplier) ResourceSchema(_ *commonv1.Metadata)
(ResourceSchema, error) {
+ return nil, nil
+}
+
+func (permanentErrorSupplier) OpenResource(_ Resource) (IndexListener, error) {
+ return nil, initerror.AsPermanent(stderrors.New("incompatible version
1.3.0"))
+}
+
+// transientThenOKSupplier returns a non-permanent error on the first call and
+// nil on the second. It is used to verify the transient-retry path stays
+// functional after F5 (Test 5.6).
+type transientThenOKSupplier struct {
+ openCalls atomic.Int32
+}
+
+func (s *transientThenOKSupplier) ResourceSchema(_ *commonv1.Metadata)
(ResourceSchema, error) {
+ return nil, nil
+}
+
+func (s *transientThenOKSupplier) OpenResource(_ Resource) (IndexListener,
error) {
+ calls := s.openCalls.Add(1)
+ if calls == 1 {
+ return nil, stderrors.New("etcd hiccup")
+ }
+ return noopIndexListener{}, nil
+}
+
+// fatalChildLogger returns a logger whose .Fatal() output goes to stderr so
+// the parent test can assert on the exit message via CombinedOutput.
+func fatalChildLogger() *pkglogger.Logger {
+ _ = pkglogger.Init(pkglogger.Logging{Env: "dev", Level: "info"})
+ return pkglogger.GetLogger("test", "schema-cache-fatal")
+}
+
+// fatalChildMetrics returns a Metrics instance backed by no-op counters so
+// the worker's deferred Inc(1) call does not nil-panic in the child process.
+func fatalChildMetrics() *Metrics {
+ return &Metrics{
+ totalErrs: nopCounter{},
+ totalRetries: nopCounter{},
+ totalPanics: nopCounter{},
+ }
+}
+
+func runFatalChild(t *testing.T, mode string) string {
+ t.Helper()
+ // #nosec G204 — os.Args[0] is the test binary path, set by the Go test
+ // runner. The mode argument is one of two compile-time string constants
+ // declared above.
+ cmd := exec.Command(os.Args[0], "-test.run=^"+t.Name()+"$", "-test.v")
+ cmd.Env = append(os.Environ(), fatalChildEnv+"="+mode)
+ out, err := cmd.CombinedOutput()
+ require.Error(t, err, "child must exit non-zero; output:\n%s",
string(out))
+ var exitErr *exec.ExitError
+ require.ErrorAs(t, err, &exitErr, "child must produce *exec.ExitError;
output:\n%s", string(out))
+ require.NotZero(t, exitErr.ExitCode(), "child must exit with non-zero
code; output:\n%s", string(out))
+ return string(out)
+}
+
+// TestWatcherHitPermanentError_FailsFast verifies that the Watcher worker's
+// recover() classifier escalates an error-typed permanent panic to .Fatal(),
+// which exits the process non-zero (Step 1).
+func TestWatcherHitPermanentError_FailsFast(t *testing.T) {
+ if os.Getenv(fatalChildEnv) == fatalChildModeWatcher {
+ runWatcherFatalChild()
+ return
+ }
+ out := runFatalChild(t, fatalChildModeWatcher)
+ if !strings.Contains(out, "Watcher hit a permanent error") {
+ t.Fatalf("expected stderr to mention 'Watcher hit a permanent
error', got:\n%s", out)
+ }
+}
+
+// runWatcherFatalChild constructs a minimal schemaRepo whose OpenResource
+// panics with a permanent error, starts the Watcher, and pushes an event
+// through eventCh. The Watcher worker's recover() classifier should escalate
+// to .Fatal() within a few hundred milliseconds.
+func runWatcherFatalChild() {
+ repo := &schemaRepo{
+ l: fatalChildLogger(),
+ resourceSchemaSupplier: permanentPanicSupplier{},
+ eventCh: make(chan MetadataEvent, 1),
+ workerNum: 1,
+ closer: run.NewChannelCloser(),
+ metrics: fatalChildMetrics(),
+ }
+ repo.Watcher()
+ repo.eventCh <- MetadataEvent{
+ Typ: EventAddOrUpdate,
+ Kind: EventKindResource,
+ Metadata: &databasev1.Measure{
+ Metadata: &commonv1.Metadata{Group: "g", Name: "m",
ModRevision: 1},
+ },
+ }
+ // Hold the goroutine alive long enough for the worker's defer to run.
+ // .Fatal() calls os.Exit(1), so this sleep should never complete.
+ time.Sleep(5 * time.Second)
+ fmt.Fprintln(os.Stderr, "child did not exit; classifier did not fire")
+ os.Exit(2)
+}
+
+// TestSendMetadataEventHitPermanentError_FailsFast verifies that the
+// synchronous SendMetadataEvent path escalates a permanent error returned
+// from processEvent to .Fatal() (Step 2b).
+func TestSendMetadataEventHitPermanentError_FailsFast(t *testing.T) {
+ if os.Getenv(fatalChildEnv) == fatalChildModeSendSync {
+ runSendSyncFatalChild()
+ return
+ }
+ out := runFatalChild(t, fatalChildModeSendSync)
+ if !strings.Contains(out, "SendMetadataEvent hit a permanent error") {
+ t.Fatalf("expected stderr to mention 'SendMetadataEvent hit a
permanent error', got:\n%s", out)
+ }
+}
+
+// runSendSyncFatalChild calls SendMetadataEvent on a schemaRepo whose
+// OpenResource returns a permanent error. The synchronous classifier branch
+// at cache.go (Step 2b) should escalate to .Fatal() before SendMetadataEvent
+// returns.
+func runSendSyncFatalChild() {
+ repo := &schemaRepo{
+ l: fatalChildLogger(),
+ resourceSchemaSupplier: permanentErrorSupplier{},
+ eventCh: make(chan MetadataEvent, 1),
+ workerNum: 1,
+ closer: run.NewChannelCloser(),
+ metrics: fatalChildMetrics(),
+ }
+ repo.SendMetadataEvent(MetadataEvent{
+ Typ: EventAddOrUpdate,
+ Kind: EventKindResource,
+ Metadata: &databasev1.Measure{
+ Metadata: &commonv1.Metadata{Group: "g", Name: "m",
ModRevision: 1},
+ },
+ })
+ fmt.Fprintln(os.Stderr, "child did not exit; classifier did not fire")
+ os.Exit(2)
+}
+
+// TestSendMetadataEvent_TransientErrorRequeues verifies that a non-permanent
+// error from processEvent is requeued via eventCh (the existing transient
+// path) and not escalated to .Fatal() (Test 5.6 — preserves transient retry).
+func TestSendMetadataEvent_TransientErrorRequeues(t *testing.T) {
+ supplier := &transientThenOKSupplier{}
+ repo := &schemaRepo{
+ l: testLogger(),
+ resourceSchemaSupplier: supplier,
+ eventCh: make(chan MetadataEvent, 1),
+ workerNum: 1,
+ closer: run.NewChannelCloser(),
+ metrics: fatalChildMetrics(),
+ }
+
+ evt := MetadataEvent{
+ Typ: EventAddOrUpdate,
+ Kind: EventKindResource,
+ Metadata: &databasev1.Measure{
+ Metadata: &commonv1.Metadata{Group: "g", Name: "m",
ModRevision: 1},
+ },
+ }
+ repo.SendMetadataEvent(evt)
+
+ // First processEvent failed with a transient error; SendMetadataEvent
should
+ // have pushed onto eventCh for the worker (or self) to retry.
+ select {
+ case requeued := <-repo.eventCh:
+ require.Equal(t, EventKindResource, requeued.Kind, "requeued
event must be the original")
+ case <-time.After(2 * time.Second):
+ t.Fatalf("transient error must requeue the event onto eventCh;
openCalls=%d", supplier.openCalls.Load())
+ }
+ require.EqualValues(t, 1, supplier.openCalls.Load(), "first
SendMetadataEvent must call OpenResource exactly once")
+
+ // Second pass: drive the requeued event through processEvent directly
to
+ // confirm OpenResource is reattempted and succeeds.
+ require.NoError(t, repo.processEvent(evt))
+ require.EqualValues(t, 2, supplier.openCalls.Load(), "second pass must
reattempt OpenResource")
+}
diff --git a/pkg/schema/init.go b/pkg/schema/init.go
index cf66ad0c6..8ced2c053 100644
--- a/pkg/schema/init.go
+++ b/pkg/schema/init.go
@@ -101,9 +101,9 @@ func (sr *schemaRepo) getCatalog(kind schema.Kind)
commonv1.Catalog {
}
func (sr *schemaRepo) processGroup(ctx context.Context, g *commonv1.Group,
catalog commonv1.Catalog) {
- _, err := sr.initGroup(g)
- if err != nil {
- logger.Panicf("fails to init the group: %v", err)
+ if _, initErr := sr.initGroup(g); initErr != nil {
+ sr.l.Error().Err(initErr).Str("group",
g.Metadata.GetName()).Msg("fails to init the group")
+ panic(fmt.Errorf("fails to init the group %s: %w",
g.Metadata.GetName(), initErr))
}
sr.processRules(ctx, g.Metadata.GetName())
sr.processBindings(ctx, g.Metadata.GetName())
@@ -223,6 +223,12 @@ func (sr *schemaRepo) initGroup(groupSchema
*commonv1.Group) (*group, error) {
defer sr.groupMux.Unlock()
g, ok := sr.getGroup(groupSchema.Metadata.Name)
if ok {
+ if g.isInit() {
+ return g, nil
+ }
+ if reinitErr := g.initBySchema(groupSchema); reinitErr != nil {
+ return nil, reinitErr
+ }
return g, nil
}
sr.l.Info().Str("group", groupSchema.Metadata.Name).Msg("creating a
tsdb")