This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch v0.10.x in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit a887dfc2680e4b7c118a836eb2b9eb0ab0386ed7 Author: Gao Hongtao <[email protected]> AuthorDate: Mon May 11 09:22:04 2026 +0800 fix: fail fast on incompatible storage version (boot + runtime paths) (#1124) --- CHANGES.md | 1 + Makefile | 34 ++- banyand/internal/storage/tsdb.go | 21 +- banyand/internal/storage/tsdb_test.go | 100 +++++++ banyand/internal/storage/version.go | 5 +- banyand/internal/storage/version_test.go | 29 ++ banyand/metadata/schema/property/client.go | 34 ++- .../metadata/schema/property/init_handler_test.go | 209 +++++++++++++++ pkg/initerror/initerror.go | 61 +++++ pkg/initerror/initerror_test.go | 71 +++++ pkg/schema/cache.go | 298 ++++++++++++++++----- pkg/schema/cache_test.go | 297 ++++++++++++++++++++ pkg/schema/cache_watcher_fatal_test.go | 256 ++++++++++++++++++ pkg/schema/init.go | 12 +- 14 files changed, 1340 insertions(+), 88 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index c893e5740..fe42fa088 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -39,6 +39,7 @@ Release Notes. - Add `GroupLifecycleInfo.errors` to surface per-group collection failures from FODC `InspectAll` instead of silently dropping the affected node entry. - Fix `CollectDataInfo` and `CollectLiaisonInfo` not handling `CATALOG_PROPERTY` groups. - Fix lifecycle migration where the receiving node could create segments shorter than the configured `SegmentInterval`. +- Fail fast on incompatible storage version, both at boot and when a group is registered at runtime. Previously the server would start in a degraded `SERVING` state with the affected groups unloaded because the property schema-registry retry loop swallowed the version-incompatibility panic. Compatible versions are listed in `banyand/internal/storage/versions.yml`. 
### Chores diff --git a/Makefile b/Makefile index 210087df9..950d73f82 100644 --- a/Makefile +++ b/Makefile @@ -47,6 +47,9 @@ generate: TARGET=generate generate: PROJECTS:=api $(PROJECTS) pkg generate: default ## Generate API codes +generate-test-cases: ## Regenerate measure query test cases (input/*.ql, input/*.yaml) + go run ./test/cases/measure/cmd/generate generate test/cases/measure/data + build: TARGET=all build: default ## Build all projects @@ -93,12 +96,40 @@ test-docker: ## Run tests in Docker with constrained resources (2 CPU cores, 4GB -ldflags "-X github.com/apache/skywalking-banyandb/pkg/test/flags.eventuallyTimeout=30s -X github.com/apache/skywalking-banyandb/pkg/test/flags.consistentlyTimeout=10s -X github.com/apache/skywalking-banyandb/pkg/test/flags.LogLevel=error" \ $(PKG) +load-test-barrier: ## Run the schema-barrier CP-6 SLO load harness (3 data nodes + 1 liaison, 100 callers, 5m measurement). Override with LOAD_FLAGS="-loadtest.measure=30s ..." for smoke runs. + @echo "Running schema-barrier load harness (CP-6 SLO profile)" + @echo "Override profile via LOAD_FLAGS, e.g. LOAD_FLAGS='-loadtest.measure=30s -loadtest.callers=20'" + go test -tags=loadtest -timeout 30m -count=1 -v ./test/load/schema_barrier/... $(LOAD_FLAGS) + ##@ Code quality targets lint: TARGET=lint lint: PROJECTS:=api $(PROJECTS) pkg scripts/ci/check test +lint: check-import-boundaries lint: default ## Run the linters on all projects +# check-import-boundaries enforces the layering invariants documented in +# pkg/initerror/initerror.go: the leaf permanent-error contract must not gain +# project-internal dependencies, and the property schema-registry classifier +# must not import the storage internals it classifies via the leaf interface. 
+.PHONY: check-import-boundaries +check-import-boundaries: ## Enforce import-boundary invariants for the version-incompat fix + @bad=0; \ + if grep -rln 'banyand/internal/storage' banyand/metadata/schema/property/ 2>/dev/null; then \ + echo "FAIL: banyand/metadata/schema/property/ must not import banyand/internal/storage"; \ + bad=1; \ + fi; \ + if grep -rln 'banyand/internal/storage' pkg/schema/ 2>/dev/null; then \ + echo "FAIL: pkg/schema/ must not import banyand/internal/storage"; \ + bad=1; \ + fi; \ + if grep -rln 'github.com/apache/skywalking-banyandb/' pkg/initerror/*.go 2>/dev/null | grep -v _test.go; then \ + echo "FAIL: pkg/initerror/ must remain a leaf with zero project-internal imports (test files excepted)"; \ + bad=1; \ + fi; \ + if [ $$bad -ne 0 ]; then exit 1; fi; \ + echo "import boundaries OK" + ##@ Vendor update vendor-update: TARGET=vendor-update @@ -141,6 +172,7 @@ check: ## Check that the status is consistent with CI pre-push: ## Check source files before pushing to the remote repo $(MAKE) check-req $(MAKE) generate + $(MAKE) generate-test-cases $(MAKE) lint $(MAKE) check $(MAKE) vuln-check @@ -222,7 +254,7 @@ release-push-candidate: ## Push release candidate ${PUSH_RELEASE_SCRIPTS} .PHONY: all $(PROJECTS) clean build default nuke -.PHONY: lint check tidy format pre-push +.PHONY: lint check tidy format pre-push generate-test-cases check-import-boundaries .PHONY: test test-race test-coverage test-ci test-docker .PHONY: license-check license-fix license-dep .PHONY: release release-binary release-source release-sign release-assembly diff --git a/banyand/internal/storage/tsdb.go b/banyand/internal/storage/tsdb.go index fa20507e4..22f26569e 100644 --- a/banyand/internal/storage/tsdb.go +++ b/banyand/internal/storage/tsdb.go @@ -19,6 +19,7 @@ package storage import ( "context" + "fmt" "path/filepath" "strings" "sync" @@ -172,7 +173,7 @@ func (d *database[T, O]) Close() error { d.segmentController.close() d.lock.Close() if err := 
d.lfs.DeleteFile(d.lock.Path()); err != nil { - logger.Panicf("cannot delete lock file %s: %s", d.lock.Path(), err) + panic(fmt.Errorf("cannot delete lock file %s: %w", d.lock.Path(), err)) } return nil } @@ -233,14 +234,30 @@ func OpenTSDB[T TSTable, O any](ctx context.Context, opts TSDBOpts[T, O], cache lockPath := filepath.Join(opts.Location, lockFilename) lock, err := tsdbLfs.CreateLockFile(lockPath, FilePerm) if err != nil { - logger.Panicf("cannot create lock file %s: %s", lockPath, err) + panic(fmt.Errorf("cannot create lock file %s: %w", lockPath, err)) } db.lock = lock + // Release the lock fd if any subsequent step in OpenTSDB returns an error. + // Otherwise the file descriptor leaks until process exit and a retry on the + // same data dir hits "resource temporarily unavailable" from the next + // CreateLockFile call. Ownership transfers to (*database).Close() only on + // success — released = true is set on the success return below. + released := false + defer func() { + if !released { + _ = lock.Close() + _ = tsdbLfs.DeleteFile(lockPath) + } + }() if err := db.segmentController.open(); err != nil { return nil, err } obsservice.MetricsCollector.Register(location, db.collect) db.disableRotation = opts.DisableRotation + // db (and its lock fd) is now owned by the caller; on rotation-task error + // the partially-initialized db is still returned so the caller can Close + // it and unregister the metrics collector. 
+ released = true return db, db.startRotationTask() } diff --git a/banyand/internal/storage/tsdb_test.go b/banyand/internal/storage/tsdb_test.go index 6b786a9c5..4852b9bd6 100644 --- a/banyand/internal/storage/tsdb_test.go +++ b/banyand/internal/storage/tsdb_test.go @@ -19,6 +19,8 @@ package storage import ( "context" + "errors" + "os" "path/filepath" "testing" "time" @@ -28,6 +30,7 @@ import ( commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" "github.com/apache/skywalking-banyandb/pkg/fs" + "github.com/apache/skywalking-banyandb/pkg/initerror" "github.com/apache/skywalking-banyandb/pkg/logger" "github.com/apache/skywalking-banyandb/pkg/meter" "github.com/apache/skywalking-banyandb/pkg/test" @@ -670,3 +673,100 @@ func TestCollectWithPartialClosedSegments(t *testing.T) { // Clean up require.NoError(t, tsdb.Close()) } + +// newTestSegmentSkeleton seeds a minimal on-disk segment layout under dir with +// the supplied storage version stamp. The layout matches readSegmentMeta's +// "old format" (version-only file body), which is sufficient to exercise +// checkVersion at TSDB open time. +func newTestSegmentSkeleton(t *testing.T, dir, version string) { + t.Helper() + segDir := filepath.Join(dir, "seg-20240501") + require.NoError(t, os.MkdirAll(segDir, 0o755)) + metaPath := filepath.Join(segDir, metadataFilename) + require.NoError(t, os.WriteFile(metaPath, []byte(version), 0o600)) +} + +// TestTSDBOpen_LockReleasedAfterFailedOpen reproduces the lock-file leak that +// OpenTSDB's deferred lock release fixes. Prior to the fix, OpenTSDB acquired the lock +// file but never released it on the segmentController.open() error path, so +// a second OpenTSDB call against the same directory failed with +// "resource temporarily unavailable" from CreateLockFile. The fix adds a +// defer-release with a `released` boolean flipped to true only on the +// success path. 
This test seeds an incompatible segment to force the first +// OpenTSDB to fail, then removes the segment and asserts the second +// OpenTSDB succeeds — which is only possible if the lock fd was released. +func TestTSDBOpen_LockReleasedAfterFailedOpen(t *testing.T) { + logger.Init(logger.Logging{Env: "dev", Level: flags.LogLevel}) + + dir, defFn := test.Space(require.New(t)) + defer defFn() + + newTestSegmentSkeleton(t, dir, "1.3.0") + + opts := TSDBOpts[*MockTSTable, any]{ + Location: dir, + SegmentInterval: IntervalRule{Unit: DAY, Num: 1}, + TTL: IntervalRule{Unit: DAY, Num: 3}, + ShardNum: 1, + TSTableCreator: MockTSTableCreator, + } + + ctx := context.Background() + mc := timestamp.NewMockClock() + ts, parseErr := time.ParseInLocation("2006-01-02 15:04:05", "2024-05-01 00:00:00", time.Local) + require.NoError(t, parseErr) + mc.Set(ts) + ctx = timestamp.SetClock(ctx, mc) + + // First open fails on the incompatible segment. With the leak fix the + // defer in OpenTSDB releases the lock fd before returning the error. + serviceCache := NewServiceCache() + _, openErr := OpenTSDB(ctx, opts, serviceCache, group) + require.Error(t, openErr, "first OpenTSDB must fail on the incompatible segment") + require.True(t, initerror.IsPermanent(openErr), "first OpenTSDB error must be permanent") + + // Remove the bad segment so the second open's segmentController scan succeeds. + require.NoError(t, os.RemoveAll(filepath.Join(dir, "seg-20240501"))) + + // Second open against the same dir must succeed — only possible if the + // lock fd from the first attempt was released. Without the fix this + // returns "resource temporarily unavailable" from CreateLockFile. 
+ tsdb, reopenErr := OpenTSDB(ctx, opts, serviceCache, group) + require.NoError(t, reopenErr, "second OpenTSDB must succeed after the failed first attempt") + require.NotNil(t, tsdb) + require.NoError(t, tsdb.Close()) +} + +// TestTSDBOpen_RejectsIncompatibleSegment verifies that opening a TSDB whose +// on-disk segment is stamped with an incompatible version surfaces a permanent +// initialization error so the caller fails fast instead of silently dropping +// the affected group. +func TestTSDBOpen_RejectsIncompatibleSegment(t *testing.T) { + logger.Init(logger.Logging{Env: "dev", Level: flags.LogLevel}) + + dir, defFn := test.Space(require.New(t)) + defer defFn() + + newTestSegmentSkeleton(t, dir, "1.3.0") + + opts := TSDBOpts[*MockTSTable, any]{ + Location: dir, + SegmentInterval: IntervalRule{Unit: DAY, Num: 1}, + TTL: IntervalRule{Unit: DAY, Num: 3}, + ShardNum: 1, + TSTableCreator: MockTSTableCreator, + } + + ctx := context.Background() + mc := timestamp.NewMockClock() + ts, parseErr := time.ParseInLocation("2006-01-02 15:04:05", "2024-05-01 00:00:00", time.Local) + require.NoError(t, parseErr) + mc.Set(ts) + ctx = timestamp.SetClock(ctx, mc) + + serviceCache := NewServiceCache() + _, openErr := OpenTSDB(ctx, opts, serviceCache, group) + require.Error(t, openErr, "OpenTSDB must reject an incompatible-version segment") + require.True(t, initerror.IsPermanent(openErr), "incompatible-version error must surface as permanent") + require.True(t, errors.Is(openErr, errVersionIncompatible), "error must still match the version-incompatible sentinel") +} diff --git a/banyand/internal/storage/version.go b/banyand/internal/storage/version.go index ddea79a33..82d1adaaf 100644 --- a/banyand/internal/storage/version.go +++ b/banyand/internal/storage/version.go @@ -24,6 +24,8 @@ import ( "github.com/pkg/errors" "sigs.k8s.io/yaml" + + "github.com/apache/skywalking-banyandb/pkg/initerror" ) const ( @@ -46,7 +48,8 @@ func checkVersion(version string) error { return nil } } - 
return errors.WithMessagef(errVersionIncompatible, "incompatible version %s, supported versions: %s", version, strings.Join(compatibleVersions, ", ")) + return initerror.AsPermanent(errors.WithMessagef(errVersionIncompatible, + "incompatible version %s, supported versions: %s", version, strings.Join(compatibleVersions, ", "))) } type segmentMeta struct { diff --git a/banyand/internal/storage/version_test.go b/banyand/internal/storage/version_test.go index 726e6d8a2..31350d56d 100644 --- a/banyand/internal/storage/version_test.go +++ b/banyand/internal/storage/version_test.go @@ -18,10 +18,14 @@ package storage import ( + "errors" + "fmt" "testing" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + "github.com/apache/skywalking-banyandb/pkg/initerror" ) func TestReadSegmentMeta_NewFormat(t *testing.T) { @@ -52,6 +56,31 @@ func TestReadSegmentMeta_IncompatibleVersion(t *testing.T) { data := []byte(`{"version":"0.1.0","endTime":"2026-04-07T00:00:00+08:00"}`) _, err := readSegmentMeta(data) assert.Error(t, err) + assert.True(t, initerror.IsPermanent(err), "incompatible version must surface as permanent") + assert.True(t, errors.Is(err, errVersionIncompatible), "incompatible version must still match the sentinel") +} + +func TestCheckVersion_WrappedAsPermanent(t *testing.T) { + err := checkVersion("1.3.0") + require.Error(t, err) + assert.True(t, initerror.IsPermanent(err)) + assert.True(t, errors.Is(err, errVersionIncompatible)) + + wrapped := fmt.Errorf("init: %w", err) + assert.True(t, initerror.IsPermanent(wrapped), "permanence must survive fmt.Errorf wrapping") + assert.True(t, errors.Is(wrapped, errVersionIncompatible)) +} + +func TestCheckVersion_CompatibleReturnsNil(t *testing.T) { + require.NoError(t, checkVersion(currentVersion)) + for _, v := range compatibleVersions { + require.NoError(t, checkVersion(v)) + } +} + +func TestIsPermanent_UnrelatedErrorIsNotPermanent(t *testing.T) { + assert.False(t, 
initerror.IsPermanent(errors.New("plain"))) + assert.False(t, initerror.IsPermanent(nil)) } func TestReadSegmentMeta_NewFormatNoEndTime(t *testing.T) { diff --git a/banyand/metadata/schema/property/client.go b/banyand/metadata/schema/property/client.go index 1a52c4130..926a34914 100644 --- a/banyand/metadata/schema/property/client.go +++ b/banyand/metadata/schema/property/client.go @@ -41,6 +41,7 @@ import ( "github.com/apache/skywalking-banyandb/banyand/metadata/schema" "github.com/apache/skywalking-banyandb/banyand/observability" "github.com/apache/skywalking-banyandb/pkg/grpchelper" + "github.com/apache/skywalking-banyandb/pkg/initerror" "github.com/apache/skywalking-banyandb/pkg/logger" "github.com/apache/skywalking-banyandb/pkg/run" "github.com/apache/skywalking-banyandb/pkg/timestamp" @@ -115,6 +116,7 @@ type SchemaRegistry struct { l *logger.Logger cache *schemaCache caCertReloader *pkgtls.Reloader + clock timestamp.Clock handlers map[schema.Kind][]schema.EventHandler watchSessions map[string]*watchSession syncInterval time.Duration @@ -180,6 +182,7 @@ func NewSchemaRegistryClient(cfg *ClientConfig) (*SchemaRegistry, error) { l: l, cache: newSchemaCache(), caCertReloader: caCertReloader, + clock: timestamp.NewClock(), handlers: make(map[schema.Kind][]schema.EventHandler), watchSessions: make(map[string]*watchSession), syncInterval: syncInterval, @@ -976,31 +979,48 @@ func (r *SchemaRegistry) RegisterHandler(name string, kind schema.Kind, handler } func (r *SchemaRegistry) initHandlerWithRetry(name string, handler schema.EventHandler, kinds []schema.Kind) { - deadline := time.Now().Add(time.Minute) + deadline := r.clock.Now().Add(time.Minute) var lastErr interface{} for { - if tryCallOnInit(handler, kinds, &lastErr) { + ok, permanent := tryCallOnInit(handler, kinds, &lastErr) + if ok { return } - if time.Now().After(deadline) { - r.l.Panic().Str("name", name).Interface("error", lastErr). 
+ if permanent { + r.l.Error().Str("name", name).Str("error", fmt.Sprintf("%+v", lastErr)). + Msg("OnInit hit a permanent error, refusing to start") + if recErr, isErr := lastErr.(error); isErr { + panic(fmt.Errorf("OnInit %s: %w", name, recErr)) + } + panic(fmt.Errorf("OnInit %s: %v", name, lastErr)) + } + if r.clock.Now().After(deadline) { + r.l.Error().Str("name", name).Interface("error", lastErr). Msg("handler OnInit failed after 1m, giving up") + panic(fmt.Errorf("OnInit %s exceeded deadline: %v", name, lastErr)) } r.l.Warn().Str("name", name).Interface("error", lastErr). Msg("handler OnInit panicked due to transient error, retrying in 1s") - time.Sleep(time.Second) + r.clock.Sleep(time.Second) } } -func tryCallOnInit(handler schema.EventHandler, kinds []schema.Kind, lastErr *interface{}) (ok bool) { +// tryCallOnInit invokes handler.OnInit and recovers any panic. Do not wrap the +// recovered value with multierr before classification; multierr.Append's +// Unwrap() []error is traversed by errors.As, but extra wrapping changes which +// concrete type is returned first and breaks IsPermanent's interface lookup. +func tryCallOnInit(handler schema.EventHandler, kinds []schema.Kind, lastErr *interface{}) (ok, permanent bool) { defer func() { if rec := recover(); rec != nil { *lastErr = rec ok = false + if recErr, isErr := rec.(error); isErr { + permanent = initerror.IsPermanent(recErr) + } } }() handler.OnInit(kinds) - return true + return true, false } // Start starts a single goroutine that periodically syncs schemas. diff --git a/banyand/metadata/schema/property/init_handler_test.go b/banyand/metadata/schema/property/init_handler_test.go new file mode 100644 index 000000000..4e6625892 --- /dev/null +++ b/banyand/metadata/schema/property/init_handler_test.go @@ -0,0 +1,209 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. 
See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package property + +import ( + "errors" + "fmt" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/apache/skywalking-banyandb/banyand/metadata/schema" + "github.com/apache/skywalking-banyandb/pkg/initerror" + "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/timestamp" +) + +// retryTestHandler implements schema.EventHandler for retry-classification tests. +// Each OnInit call invokes the configured panicFn (which may panic). When +// noPanicAfter calls have been made, OnInit returns normally so the retry loop +// can succeed. 
+type retryTestHandler struct { + panicFn func(callIdx int) + startedSignal chan struct{} + schema.UnimplementedOnInitHandler + calls atomic.Int32 + noPanicAfter int32 +} + +func (h *retryTestHandler) OnAddOrUpdate(_ schema.Metadata) {} +func (h *retryTestHandler) OnDelete(_ schema.Metadata) {} + +func (h *retryTestHandler) OnInit(_ []schema.Kind) (bool, []int64) { + idx := h.calls.Add(1) + if h.startedSignal != nil { + select { + case h.startedSignal <- struct{}{}: + default: + } + } + if h.noPanicAfter > 0 && idx > h.noPanicAfter { + return true, nil + } + if h.panicFn != nil { + h.panicFn(int(idx)) + } + return true, nil +} + +// newTestRegistryWithClock builds a SchemaRegistry instance that bypasses +// connection setup so we can drive its retry classifier in-process. It uses a +// mock clock so the test controls the deadline and Sleep windows. +func newTestRegistryWithClock(_ *testing.T, mc timestamp.MockClock) *SchemaRegistry { + l := logger.GetLogger("init-handler-retry-test") + return &SchemaRegistry{ + l: l, + clock: mc, + } +} + +// TestInitHandlerWithRetry_PermanentVersionError_FailsFast asserts that a +// handler panicking with a Permanent-wrapped error causes the retry loop to +// re-panic on the first attempt with an IsPermanent error, never advancing the clock. 
+func TestInitHandlerWithRetry_PermanentVersionError_FailsFast(t *testing.T) { + require.NoError(t, logger.Init(logger.Logging{Env: "dev", Level: "warn"})) + mc := timestamp.NewMockClock() + mcStart := mc.Now() + r := newTestRegistryWithClock(t, mc) + permErr := initerror.AsPermanent(errors.New("incompatible version 1.3.0")) + handler := &retryTestHandler{ + panicFn: func(_ int) { + panic(fmt.Errorf("OpenDB failed: %w", permErr)) + }, + } + + done := make(chan struct{}) + var recovered interface{} + go func() { + defer close(done) + defer func() { recovered = recover() }() + r.initHandlerWithRetry("perm-handler", handler, []schema.Kind{schema.KindGroup}) + }() + + select { + case <-done: + case <-time.After(2 * time.Second): + t.Fatal("initHandlerWithRetry did not return within 2s; permanent error path should fail fast") + } + + require.NotNil(t, recovered, "permanent error must propagate as a panic") + recErr, isErr := recovered.(error) + require.True(t, isErr, "panic value should be an error wrapping the permanent error") + assert.True(t, initerror.IsPermanent(recErr), "recovered error must satisfy IsPermanent") + assert.Equal(t, int32(1), handler.calls.Load(), "permanent error must not be retried") + assert.Equal(t, mcStart, mc.Now(), "mock clock must not advance when the first attempt is permanent") +} + +// TestInitHandlerWithRetry_TransientStillRetries asserts that a handler +// panicking with a non-permanent error keeps retrying until either the deadline +// passes (panic) or it stops panicking (success). Here we let the handler stop +// panicking after two attempts to exercise the success path. 
+func TestInitHandlerWithRetry_TransientStillRetries(t *testing.T) { + require.NoError(t, logger.Init(logger.Logging{Env: "dev", Level: "warn"})) + mc := timestamp.NewMockClock() + r := newTestRegistryWithClock(t, mc) + handler := &retryTestHandler{ + noPanicAfter: 2, + startedSignal: make(chan struct{}, 4), + panicFn: func(_ int) { + panic("etcd unreachable") + }, + } + + done := make(chan struct{}) + go func() { + defer close(done) + r.initHandlerWithRetry("transient-handler", handler, []schema.Kind{schema.KindGroup}) + }() + + // Drive the two failing attempts: wait for each OnInit call to start. + for i := 0; i < 2; i++ { + select { + case <-handler.startedSignal: + case <-time.After(2 * time.Second): + t.Fatalf("handler.OnInit was not invoked for attempt %d within 2s", i+1) + } + // Advance the mock clock so the worker's Sleep returns and it can + // re-enter the retry loop. The mock clock fires a sleeper only when an + // Add moves time past that sleeper's deadline, so a Sleep registered + // after an Add needs a later Add; advancing on every poll covers that race. + require.Eventually(t, func() bool { + mc.Add(time.Second) + return handler.calls.Load() > int32(i+1) || done != nil && isClosed(done) + }, 3*time.Second, 10*time.Millisecond, "Sleep must release after clock advance") + } + + // Wait for the loop to finish (third call returns ok=true). + select { + case <-done: + case <-time.After(3 * time.Second): + t.Fatal("initHandlerWithRetry did not return after handler stopped panicking") + } + assert.GreaterOrEqual(t, handler.calls.Load(), int32(3), "handler must be retried at least twice before succeeding") +} + +// TestInitHandlerWithRetry_DeadlineExceededPanics asserts that a non-permanent +// failure that never stops eventually makes the retry loop panic once the +// deadline is crossed. 
+func TestInitHandlerWithRetry_DeadlineExceededPanics(t *testing.T) { + require.NoError(t, logger.Init(logger.Logging{Env: "dev", Level: "warn"})) + mc := timestamp.NewMockClock() + r := newTestRegistryWithClock(t, mc) + handler := &retryTestHandler{ + startedSignal: make(chan struct{}, 8), + panicFn: func(_ int) { + panic("never resolves") + }, + } + + done := make(chan struct{}) + var recovered interface{} + go func() { + defer close(done) + defer func() { recovered = recover() }() + r.initHandlerWithRetry("deadline-handler", handler, []schema.Kind{schema.KindGroup}) + }() + + deadline := time.Now().Add(5 * time.Second) + for { + select { + case <-done: + require.NotNil(t, recovered, "deadline exceeded must propagate as a panic") + return + case <-handler.startedSignal: + case <-time.After(50 * time.Millisecond): + } + if time.Now().After(deadline) { + t.Fatal("initHandlerWithRetry did not exit after repeated clock advances") + } + mc.Add(2 * time.Minute) + } +} + +func isClosed(ch <-chan struct{}) bool { + select { + case <-ch: + return true + default: + return false + } +} diff --git a/pkg/initerror/initerror.go b/pkg/initerror/initerror.go new file mode 100644 index 000000000..6bff59384 --- /dev/null +++ b/pkg/initerror/initerror.go @@ -0,0 +1,61 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +// Package initerror exposes a Permanent error contract for initialization +// failures that must not be retried. The package is a leaf: it imports nothing +// project-internal so it can be referenced by both the storage layer (which +// produces permanent errors) and the metadata-schema retry classifier (which +// reads them) without introducing a layering edge between those packages. +package initerror + +import "errors" + +// Permanent marks an error as a permanent (non-retriable) initialization failure. +type Permanent interface { + error + Permanent() bool +} + +type permanentError struct { + err error +} + +// Error implements error. +func (p *permanentError) Error() string { return p.err.Error() } + +// Unwrap exposes the wrapped error so errors.Is/As can traverse the chain. +func (p *permanentError) Unwrap() error { return p.err } + +// Permanent reports that this error must not be retried. +func (p *permanentError) Permanent() bool { return true } + +// AsPermanent wraps err as a Permanent error. Returns nil if err is nil. +func AsPermanent(err error) error { + if err == nil { + return nil + } + return &permanentError{err: err} +} + +// IsPermanent reports whether err's chain contains a Permanent error. +func IsPermanent(err error) bool { + if err == nil { + return false + } + var p Permanent + return errors.As(err, &p) && p.Permanent() +} diff --git a/pkg/initerror/initerror_test.go b/pkg/initerror/initerror_test.go new file mode 100644 index 000000000..2c16db32f --- /dev/null +++ b/pkg/initerror/initerror_test.go @@ -0,0 +1,71 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. 
Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package initerror_test + +import ( + "errors" + "fmt" + "testing" + + "github.com/apache/skywalking-banyandb/pkg/initerror" +) + +var errSentinel = errors.New("sentinel") + +func TestAsPermanent_NilReturnsNil(t *testing.T) { + if got := initerror.AsPermanent(nil); got != nil { + t.Fatalf("AsPermanent(nil) = %v, want nil", got) + } +} + +func TestIsPermanent_Nil(t *testing.T) { + if initerror.IsPermanent(nil) { + t.Fatalf("IsPermanent(nil) = true, want false") + } +} + +func TestIsPermanent_PlainError(t *testing.T) { + if initerror.IsPermanent(errSentinel) { + t.Fatalf("IsPermanent(plain error) = true, want false") + } +} + +func TestIsPermanent_Direct(t *testing.T) { + wrapped := initerror.AsPermanent(errSentinel) + if !initerror.IsPermanent(wrapped) { + t.Fatalf("IsPermanent(AsPermanent(...)) = false, want true") + } +} + +func TestIsPermanent_FmtErrorfChain(t *testing.T) { + wrapped := fmt.Errorf("init failed: %w", initerror.AsPermanent(errSentinel)) + if !initerror.IsPermanent(wrapped) { + t.Fatalf("IsPermanent through fmt.Errorf chain = false, want true") + } + if !errors.Is(wrapped, errSentinel) { + t.Fatalf("errors.Is should still see the inner sentinel") + } +} + +func TestIsPermanent_DoubleWrap(t *testing.T) { + inner := initerror.AsPermanent(errSentinel) + outer := fmt.Errorf("outer: %w", fmt.Errorf("middle: %w", inner)) + if 
!initerror.IsPermanent(outer) { + t.Fatalf("IsPermanent through double wrap = false, want true") + } +} diff --git a/pkg/schema/cache.go b/pkg/schema/cache.go index 5d2c28400..95b8f88b1 100644 --- a/pkg/schema/cache.go +++ b/pkg/schema/cache.go @@ -35,11 +35,15 @@ import ( "github.com/apache/skywalking-banyandb/banyand/metadata" "github.com/apache/skywalking-banyandb/banyand/metadata/schema" "github.com/apache/skywalking-banyandb/pkg/cgroups" + "github.com/apache/skywalking-banyandb/pkg/initerror" "github.com/apache/skywalking-banyandb/pkg/logger" "github.com/apache/skywalking-banyandb/pkg/run" ) -var _ Resource = (*resourceSpec)(nil) +var ( + _ Resource = (*resourceSpec)(nil) + _ RevisionRepository = (*schemaRepo)(nil) +) type resourceSpec struct { schema ResourceSchema @@ -87,20 +91,127 @@ type schemaRepo struct { indexRuleMap sync.Map bindingForwardMap sync.Map bindingBackwardMap sync.Map + latestModRevision atomic.Int64 workerNum int resourceMutex sync.Mutex groupMux sync.Mutex } +// SendMetadataEvent applies the given event synchronously. When the downstream +// caches (e.g. pkg/schema.schemaRepo.groupMap and resourceMap) must be coherent +// before the caller returns — for example, when SchemaBarrierService delegates +// through notifyHandlers → this handler → SendMetadataEvent — the caller relies +// on the event being fully applied here. Only transient errors queue the event +// for retry via the background worker; the success path does not touch the +// channel. func (sr *schemaRepo) SendMetadataEvent(event MetadataEvent) { if !sr.closer.AddSender() { return } defer sr.closer.SenderDone() - select { - case sr.eventCh <- event: - case <-sr.closer.CloseNotify(): + // SendMetadataEvent runs synchronously on the caller goroutine (e.g. an + // etcd-watch handler such as (*schemaRepo).OnAddOrUpdate). A panic from + // processEvent — whose value may be a string or an error — + // would otherwise propagate to a caller that has no recover() of its own. 
+ // The classifier branch escalates permanent errors to .Fatal() so + // runtime registration of an incompatible group exits the process the + // same way the boot path does; non-permanent panics are logged and + // requeued onto eventCh so the Watcher worker can retry, mirroring the + // transient error-return path below. + defer func() { + if recovered := recover(); recovered != nil { + if recErr, isErr := recovered.(error); isErr && initerror.IsPermanent(recErr) { + sr.l.Fatal().Err(recErr).Interface("event", event).Str("stack", string(debug.Stack())). + Msg("SendMetadataEvent hit a permanent error, refusing to continue") + } + sr.l.Warn().Interface("err", recovered).Interface("event", event).Str("stack", string(debug.Stack())). + Msg("recovered from SendMetadataEvent panic; requeueing") + sr.metrics.totalErrs.Inc(1) + select { + case sr.eventCh <- event: + case <-sr.closer.CloseNotify(): + } + } + }() + if err := sr.processEvent(event); err != nil && !errors.Is(err, schema.ErrClosed) { + if initerror.IsPermanent(err) { + sr.l.Fatal().Err(err).Interface("event", event). + Msg("SendMetadataEvent hit a permanent error, refusing to continue") + } + select { + case <-sr.closer.CloseNotify(): + return + default: + } + sr.l.Err(err).Interface("event", event).Msg("fail to handle the metadata event. retry...") + sr.metrics.totalErrs.Inc(1) + select { + case sr.eventCh <- event: + case <-sr.closer.CloseNotify(): + } + } +} + +// processEvent applies the event in-place, bumping latestModRevision on success +// so the barrier's downstream-handler watermark catches up before notifyHandlers +// returns to its caller. 
+func (sr *schemaRepo) processEvent(evt MetadataEvent) error { + if e := sr.l.Debug(); e.Enabled() { + e.Interface("event", evt).Msg("received an event") + } + var err error + switch evt.Typ { + case EventAddOrUpdate: + switch evt.Kind { + case EventKindGroup: + _, err = sr.storeGroup(evt.Metadata.GetMetadata()) + if errors.Is(err, schema.ErrGRPCResourceNotFound) { + err = nil + } + case EventKindResource: + err = sr.storeResource(evt.Metadata) + case EventKindIndexRule: + indexRule := evt.Metadata.(*databasev1.IndexRule) + sr.storeIndexRule(indexRule) + case EventKindIndexRuleBinding: + indexRuleBinding := evt.Metadata.(*databasev1.IndexRuleBinding) + sr.storeIndexRuleBinding(indexRuleBinding) + } + case EventDelete: + switch evt.Kind { + case EventKindGroup: + err = sr.deleteGroup(evt.Metadata.GetMetadata()) + case EventKindResource: + sr.deleteResource(evt) + case EventKindIndexRule: + key := getKey(evt.Metadata.GetMetadata()) + sr.indexRuleMap.Delete(key) + case EventKindIndexRuleBinding: + indexRuleBinding := evt.Metadata.(*databasev1.IndexRuleBinding) + col, _ := sr.bindingForwardMap.Load(getKey(&commonv1.Metadata{ + Name: indexRuleBinding.Subject.GetName(), + Group: indexRuleBinding.GetMetadata().GetGroup(), + })) + if col == nil { + break + } + tMap := col.(*sync.Map) + key := getKey(indexRuleBinding.GetMetadata()) + tMap.Delete(key) + for i := range indexRuleBinding.Rules { + col, _ := sr.bindingBackwardMap.Load(getKey(&commonv1.Metadata{ + Name: indexRuleBinding.Rules[i], + Group: indexRuleBinding.GetMetadata().GetGroup(), + })) + if col == nil { + continue + } + tMap := col.(*sync.Map) + tMap.Delete(key) + } + } } + return err } // StopCh implements Repository. @@ -155,89 +266,49 @@ func (sr *schemaRepo) Watcher() { } defer func() { sr.closer.ReceiverDone() + // Bump the panic metric BEFORE the classifier branch: .Fatal() + // calls os.Exit(1) and the deferred Inc would otherwise be + // skipped on the permanent-error escalation path. 
Do NOT move + // this line under the if-branch. + sr.metrics.totalPanics.Inc(1) if err := recover(); err != nil { + // Classifier is dormant on string-typed panics (today's + // Panicf sites outside this PR's scope still panic with + // strings); tsdb.go:176/242 are converted in Step 1b so + // permanent errors reaching this recover are error-typed. + // We use .Fatal() not panic() because this is a worker + // goroutine: panic() would only kill this goroutine and + // leave the Watcher partially alive. + if recErr, isErr := err.(error); isErr && initerror.IsPermanent(recErr) { + sr.l.Fatal().Err(recErr).Str("stack", string(debug.Stack())). + Msg("Watcher hit a permanent error, refusing to continue") + } sr.l.Warn().Interface("err", err).Str("stack", string(debug.Stack())).Msg("watching the events") } - sr.metrics.totalPanics.Inc(1) }() + // The Watcher drains events retried via the channel after transient + // processing errors. The fast path runs synchronously in SendMetadataEvent + // so the barrier's downstream-handler watermark advances correctly. 
for { select { case evt, more := <-sr.eventCh: if !more { return } - if e := sr.l.Debug(); e.Enabled() { - e.Interface("event", evt).Msg("received an event") - } - var err error - switch evt.Typ { - case EventAddOrUpdate: - switch evt.Kind { - case EventKindGroup: - _, err = sr.storeGroup(evt.Metadata.GetMetadata()) - if errors.As(err, schema.ErrGRPCResourceNotFound) { - err = nil - } - case EventKindResource: - err = sr.storeResource(evt.Metadata) - case EventKindIndexRule: - indexRule := evt.Metadata.(*databasev1.IndexRule) - if indexRule.GetMetadata().GetGroup() == "test-trace-group" { - sr.l.Info().Str("group", indexRule.GetMetadata().GetGroup()).Msg("index rule") - } - sr.storeIndexRule(indexRule) - case EventKindIndexRuleBinding: - indexRuleBinding := evt.Metadata.(*databasev1.IndexRuleBinding) - if indexRuleBinding.GetMetadata().GetGroup() == "test-trace-group" { - sr.l.Info().Str("group", indexRuleBinding.GetMetadata().GetGroup()).Msg("index rule binding") - } - sr.storeIndexRuleBinding(indexRuleBinding) - } - case EventDelete: - switch evt.Kind { - case EventKindGroup: - err = sr.deleteGroup(evt.Metadata.GetMetadata()) - case EventKindResource: - sr.deleteResource(evt.Metadata.GetMetadata()) - case EventKindIndexRule: - key := getKey(evt.Metadata.GetMetadata()) - sr.indexRuleMap.Delete(key) - case EventKindIndexRuleBinding: - indexRuleBinding := evt.Metadata.(*databasev1.IndexRuleBinding) - col, _ := sr.bindingForwardMap.Load(getKey(&commonv1.Metadata{ - Name: indexRuleBinding.Subject.GetName(), - Group: indexRuleBinding.GetMetadata().GetGroup(), - })) - if col == nil { - break - } - tMap := col.(*sync.Map) - key := getKey(indexRuleBinding.GetMetadata()) - tMap.Delete(key) - for i := range indexRuleBinding.Rules { - col, _ := sr.bindingBackwardMap.Load(getKey(&commonv1.Metadata{ - Name: indexRuleBinding.Rules[i], - Group: indexRuleBinding.GetMetadata().GetGroup(), - })) - if col == nil { - continue - } - tMap := col.(*sync.Map) - tMap.Delete(key) - } + if 
retryErr := sr.processEvent(evt); retryErr != nil && !errors.Is(retryErr, schema.ErrClosed) { + if initerror.IsPermanent(retryErr) { + sr.l.Fatal().Err(retryErr).Interface("event", evt). + Msg("Watcher hit a permanent error, refusing to continue") } - } - if err != nil && !errors.Is(err, schema.ErrClosed) { select { case <-sr.closer.CloseNotify(): return default: } - sr.l.Err(err).Interface("event", evt).Msg("fail to handle the metadata event. retry...") - sr.metrics.totalErrs.Inc(1) + sr.l.Err(retryErr).Interface("event", evt).Msg("retry processing failed, requeueing") + sr.metrics.totalRetries.Inc(1) go func() { sr.SendMetadataEvent(evt) - sr.metrics.totalRetries.Inc(1) }() } case <-sr.closer.CloseNotify(): @@ -259,12 +330,18 @@ func (sr *schemaRepo) storeGroup(groupMeta *commonv1.Metadata) (*group, error) { if err := g.init(name); err != nil { return nil, err } + if gs := g.GetSchema(); gs != nil { + sr.updateLatestModRevision(gs.GetMetadata().GetModRevision()) + } return g, nil } if !g.isInit() { if err := g.init(name); err != nil { return nil, err } + if gs := g.GetSchema(); gs != nil { + sr.updateLatestModRevision(gs.GetMetadata().GetModRevision()) + } return g, nil } ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) @@ -278,6 +355,7 @@ func (sr *schemaRepo) storeGroup(groupMeta *commonv1.Metadata) (*group, error) { return g, nil } g.groupSchema.Store(groupSchema) + sr.updateLatestModRevision(groupSchema.GetMetadata().GetModRevision()) if proto.Equal(groupSchema, prevGroupSchema) { return g, nil } @@ -365,6 +443,7 @@ func (sr *schemaRepo) storeResource(resourceSchema ResourceSchema) error { sm.OnIndexUpdate(sr.indexRules(resourceSchema)) resource.delegated = sm sr.resourceMap.Store(key, resource) + sr.updateLatestModRevision(resourceSchema.GetMetadata().GetModRevision()) return nil } @@ -373,6 +452,7 @@ func (sr *schemaRepo) storeIndexRule(indexRule *databasev1.IndexRule) { if prev, loaded := sr.indexRuleMap.LoadOrStore(key, indexRule); 
loaded { if prev.(*databasev1.IndexRule).GetMetadata().ModRevision <= indexRule.GetMetadata().ModRevision { sr.indexRuleMap.Store(key, indexRule) + sr.updateLatestModRevision(indexRule.GetMetadata().GetModRevision()) if col, _ := sr.bindingBackwardMap.Load(key); col != nil { col.(*sync.Map).Range(func(_, value any) bool { sr.updateIndex(value.(*databasev1.IndexRuleBinding)) @@ -381,6 +461,7 @@ func (sr *schemaRepo) storeIndexRule(indexRule *databasev1.IndexRule) { } } } else { + sr.updateLatestModRevision(indexRule.GetMetadata().GetModRevision()) if col, _ := sr.bindingBackwardMap.Load(key); col != nil { col.(*sync.Map).Range(func(_, value any) bool { sr.updateIndex(value.(*databasev1.IndexRuleBinding)) @@ -425,6 +506,7 @@ func (sr *schemaRepo) storeIndexRuleBinding(indexRuleBinding *databasev1.IndexRu if !changed { return } + sr.updateLatestModRevision(indexRuleBinding.GetMetadata().GetModRevision()) sr.updateIndex(indexRuleBinding) } @@ -468,11 +550,74 @@ func getKey(metadata *commonv1.Metadata) string { return path.Join(metadata.GetGroup(), metadata.GetName()) } -func (sr *schemaRepo) deleteResource(metadata *commonv1.Metadata) { - key := getKey(metadata) +func (sr *schemaRepo) deleteResource(evt MetadataEvent) { + key := getKey(evt.Metadata.GetMetadata()) + // Hold resourceMutex across the load/check/delete to keep the revision-guard atomic. + // storeResource takes the same lock, so a concurrent newer store cannot interleave + // between the staleness check and LoadAndDelete and have its entry wiped. 
+ sr.resourceMutex.Lock() + defer sr.resourceMutex.Unlock() + if evt.DeleteRevision != 0 { + if v, ok := sr.resourceMap.Load(key); ok { + stored := v.(*resourceSpec) + if evt.DeleteRevision < stored.maxRevision() { + return + } + } + } _, _ = sr.resourceMap.LoadAndDelete(key) } +func (sr *schemaRepo) updateLatestModRevision(incoming int64) { + for { + cur := sr.latestModRevision.Load() + if incoming <= cur { + return + } + if sr.latestModRevision.CompareAndSwap(cur, incoming) { + return + } + } +} + +// LatestModRevision returns the highest mod_revision seen across all stored kinds. +func (sr *schemaRepo) LatestModRevision() int64 { + return sr.latestModRevision.Load() +} + +// ResourceRevision returns the mod_revision for a stored resource and whether it was found. +func (sr *schemaRepo) ResourceRevision(kind schema.Kind, groupName, name string) (int64, bool) { + key := path.Join(groupName, name) + switch kind { + case schema.KindStream, schema.KindMeasure, schema.KindTrace: + if v, ok := sr.resourceMap.Load(key); ok { + return v.(*resourceSpec).maxRevision(), true + } + case schema.KindIndexRule: + if v, ok := sr.indexRuleMap.Load(key); ok { + return v.(*databasev1.IndexRule).GetMetadata().GetModRevision(), true + } + case schema.KindGroup: + if v, ok := sr.groupMap.Load(name); ok { + grp := v.(*group) + if gs := grp.GetSchema(); gs != nil { + return gs.GetMetadata().GetModRevision(), true + } + } + case schema.KindIndexRuleBinding, schema.KindTopNAggregation, + schema.KindNode, schema.KindProperty, schema.KindMask: + // schemaRepo only caches resources, index rules, and groups; other kinds + // (bindings, top-n aggregations, nodes, properties, masks) have no entry here. + } + return 0, false +} + +// IsAbsent returns true when the given resource is not present in the local cache. 
+func (sr *schemaRepo) IsAbsent(kind schema.Kind, groupName, name string) bool { + _, ok := sr.ResourceRevision(kind, groupName, name) + return !ok +} + func (sr *schemaRepo) Close() { defer func() { if err := recover(); err != nil { @@ -539,8 +684,8 @@ func (g *group) init(name string) error { } func (g *group) initBySchema(groupSchema *commonv1.Group) error { - g.groupSchema.Store(groupSchema) if g.isPortable() { + g.groupSchema.Store(groupSchema) return nil } db, err := g.resourceSupplier.OpenDB(groupSchema) @@ -548,7 +693,8 @@ func (g *group) initBySchema(groupSchema *commonv1.Group) error { return err } g.db.Store(db) - return err + g.groupSchema.Store(groupSchema) + return nil } func (g *group) isInit() bool { @@ -574,7 +720,11 @@ func (g *group) close() (err error) { if !g.isInit() || g.isPortable() { return nil } - return multierr.Append(err, g.SupplyTSDB().Close()) + tsdb := g.SupplyTSDB() + if tsdb == nil { + return nil + } + return multierr.Append(err, tsdb.Close()) } func (g *group) drop() error { diff --git a/pkg/schema/cache_test.go b/pkg/schema/cache_test.go new file mode 100644 index 000000000..ebafce414 --- /dev/null +++ b/pkg/schema/cache_test.go @@ -0,0 +1,297 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +package schema + +import ( + "errors" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" + databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" + "github.com/apache/skywalking-banyandb/banyand/metadata/schema" + pkglogger "github.com/apache/skywalking-banyandb/pkg/logger" +) + +// testLogger returns a logger suitable for unit tests in this package. +func testLogger() *pkglogger.Logger { + _ = pkglogger.Init(pkglogger.Logging{Env: "dev", Level: "warn"}) + return pkglogger.GetLogger("test", "schema-cache") +} + +// noopIndexListener satisfies IndexListener and discards all index updates. +type noopIndexListener struct{} + +func (noopIndexListener) OnIndexUpdate(_ []*databasev1.IndexRule) {} + +// stubSupplier satisfies ResourceSchemaSupplier for tests that only need +// storeResource to succeed without real DB or metadata interactions. +type stubSupplier struct{} + +func (stubSupplier) ResourceSchema(_ *commonv1.Metadata) (ResourceSchema, error) { + return nil, nil +} + +func (stubSupplier) OpenResource(_ Resource) (IndexListener, error) { + return noopIndexListener{}, nil +} + +// newTestSchemaRepo creates a minimal schemaRepo suitable for unit tests. +// The zero values of all sync.Map and atomic fields are ready to use. +func newTestSchemaRepo() *schemaRepo { + return &schemaRepo{resourceSchemaSupplier: stubSupplier{}} +} + +// buildMeasure returns a *databasev1.Measure with the specified name and mod_revision. +// Group is fixed to "g" because every test in this file uses the same group; varying +// it would only add noise without changing what's being asserted. 
+func buildMeasure(name string, modRevision int64) *databasev1.Measure { + return &databasev1.Measure{ + Metadata: &commonv1.Metadata{Group: "g", Name: name, ModRevision: modRevision}, + } +} + +// TestDeleteResource_RevisionGuard_RejectsStaleDelete verifies that an EventDelete +// carrying a DeleteRevision lower than the stored resource's mod_revision is ignored, +// leaving the cache entry intact (A7). +func TestDeleteResource_RevisionGuard_RejectsStaleDelete(t *testing.T) { + sr := newTestSchemaRepo() + require.NoError(t, sr.storeResource(buildMeasure("m", 100))) + + sr.deleteResource(MetadataEvent{ + Metadata: buildMeasure("m", 100), + DeleteRevision: 50, + Typ: EventDelete, + Kind: EventKindResource, + }) + + _, present := sr.resourceMap.Load("g/m") + assert.True(t, present, "stale delete (DeleteRevision=50 < stored rev=100) must not remove the entry") +} + +// TestDeleteResource_RevisionGuard_AcceptsFreshDelete verifies that an EventDelete +// carrying a DeleteRevision greater than the stored resource's mod_revision removes +// the cache entry (A7). +func TestDeleteResource_RevisionGuard_AcceptsFreshDelete(t *testing.T) { + sr := newTestSchemaRepo() + require.NoError(t, sr.storeResource(buildMeasure("m", 100))) + + sr.deleteResource(MetadataEvent{ + Metadata: buildMeasure("m", 100), + DeleteRevision: 150, + Typ: EventDelete, + Kind: EventKindResource, + }) + + _, present := sr.resourceMap.Load("g/m") + assert.False(t, present, "fresh delete (DeleteRevision=150 >= stored rev=100) must remove the entry") +} + +// TestLatestModRevision_Monotonic verifies that LatestModRevision advances monotonically +// with ascending stores and does not regress when a lower revision is applied (A8). 
+func TestLatestModRevision_Monotonic(t *testing.T) { + sr := newTestSchemaRepo() + assert.Equal(t, int64(0), sr.LatestModRevision(), "watermark must start at zero") + + require.NoError(t, sr.storeResource(buildMeasure("m1", 10))) + assert.Equal(t, int64(10), sr.LatestModRevision()) + + require.NoError(t, sr.storeResource(buildMeasure("m2", 30))) + assert.Equal(t, int64(30), sr.LatestModRevision()) + + // A new key at a lower revision must not regress the watermark. + require.NoError(t, sr.storeResource(buildMeasure("m3", 20))) + assert.Equal(t, int64(30), sr.LatestModRevision(), "stale store (rev=20) must not lower the watermark from 30") +} + +// TestLatestModRevision_AcrossKinds verifies that LatestModRevision tracks the global +// maximum across Resource, IndexRule, and IndexRuleBinding stores (A8). +func TestLatestModRevision_AcrossKinds(t *testing.T) { + sr := newTestSchemaRepo() + + require.NoError(t, sr.storeResource(buildMeasure("m1", 10))) + assert.Equal(t, int64(10), sr.LatestModRevision()) + + sr.storeIndexRule(&databasev1.IndexRule{ + Metadata: &commonv1.Metadata{Group: "g", Name: "idx1", ModRevision: 50}, + Tags: []string{"tag1"}, + Type: databasev1.IndexRule_TYPE_INVERTED, + }) + assert.Equal(t, int64(50), sr.LatestModRevision(), "IndexRule store (rev=50) must advance the watermark") + + sr.storeIndexRuleBinding(&databasev1.IndexRuleBinding{ + Metadata: &commonv1.Metadata{Group: "g", Name: "irb1", ModRevision: 40}, + Rules: []string{"idx1"}, + Subject: &databasev1.Subject{Catalog: commonv1.Catalog_CATALOG_MEASURE, Name: "m1"}, + }) + assert.Equal(t, int64(50), sr.LatestModRevision(), "IndexRuleBinding at rev=40 must not lower the watermark from 50") +} + +// TestResourceRevision_PresentAndAbsent verifies that ResourceRevision returns the +// stored mod_revision and ok=true for a present resource, and (0, false) for an absent one (A8). 
+func TestResourceRevision_PresentAndAbsent(t *testing.T) { + sr := newTestSchemaRepo() + require.NoError(t, sr.storeResource(buildMeasure("m", 77))) + + rev, ok := sr.ResourceRevision(schema.KindMeasure, "g", "m") + assert.True(t, ok) + assert.Equal(t, int64(77), rev) + + rev, ok = sr.ResourceRevision(schema.KindMeasure, "g", "missing") + assert.False(t, ok) + assert.Equal(t, int64(0), rev) +} + +// TestIsAbsent_AfterDelete verifies that IsAbsent returns false while the resource +// is cached and true after a valid delete removes it from the cache (A7, A8). +func TestIsAbsent_AfterDelete(t *testing.T) { + sr := newTestSchemaRepo() + require.NoError(t, sr.storeResource(buildMeasure("m", 100))) + + assert.False(t, sr.IsAbsent(schema.KindMeasure, "g", "m"), "resource must not be absent immediately after store") + + sr.deleteResource(MetadataEvent{ + Metadata: buildMeasure("m", 100), + DeleteRevision: 100, + Typ: EventDelete, + Kind: EventKindResource, + }) + + assert.True(t, sr.IsAbsent(schema.KindMeasure, "g", "m"), "resource must be absent after a valid delete") +} + +// fakeDB satisfies pkg/schema.DB for unit tests that only exercise group caching. +type fakeDB struct{} + +func (fakeDB) Close() error { return nil } +func (fakeDB) UpdateOptions(_ *commonv1.ResourceOpts) {} +func (fakeDB) Drop() error { return nil } + +// fakeResourceSupplier is a programmable ResourceSupplier whose OpenDB returns +// a configurable error or DB. openCalls counts invocations so retry tests can +// assert that initBySchema actually re-attempted OpenDB. 
+type fakeResourceSupplier struct { + openErr error + openCalls int +} + +func (s *fakeResourceSupplier) ResourceSchema(_ *commonv1.Metadata) (ResourceSchema, error) { + return nil, nil +} + +func (s *fakeResourceSupplier) OpenResource(_ Resource) (IndexListener, error) { + return noopIndexListener{}, nil +} + +func (s *fakeResourceSupplier) OpenDB(_ *commonv1.Group) (DB, error) { + s.openCalls++ + if s.openErr != nil { + return nil, s.openErr + } + return fakeDB{}, nil +} + +func buildGroup(name string) *commonv1.Group { + return &commonv1.Group{ + Metadata: &commonv1.Metadata{Name: name}, + } +} + +// TestInitBySchema_FailedOpenDB_LeavesGroupUninit asserts that when OpenDB +// fails, the group is left in an un-initialized state so a later retry can +// reattempt OpenDB. Previously the schema was stored before OpenDB ran, which +// caused isInit() to return true even though db was nil. +func TestInitBySchema_FailedOpenDB_LeavesGroupUninit(t *testing.T) { + openErr := errors.New("boom") + supplier := &fakeResourceSupplier{openErr: openErr} + g := newGroup(nil, nil, supplier) + + err := g.initBySchema(buildGroup("g1")) + require.Error(t, err) + assert.True(t, errors.Is(err, openErr)) + assert.False(t, g.isInit(), "group must not be initialized after a failed OpenDB") + assert.Nil(t, g.GetSchema(), "groupSchema must not be stored after a failed OpenDB") + assert.Nil(t, g.SupplyTSDB(), "SupplyTSDB must be nil after a failed OpenDB") +} + +// TestInitBySchema_PortableGroupOK asserts that a portable group (nil +// resourceSupplier) stores the schema and is considered initialized without +// invoking OpenDB. 
+func TestInitBySchema_PortableGroupOK(t *testing.T) { + g := newGroup(nil, nil, nil) + require.NoError(t, g.initBySchema(buildGroup("portable"))) + assert.True(t, g.isInit(), "portable group must be initialized after initBySchema") + assert.Nil(t, g.SupplyTSDB(), "portable group must have no underlying tsdb") +} + +// TestInitBySchema_SuccessStoresSchema asserts the success path stores schema +// after OpenDB returns. +func TestInitBySchema_SuccessStoresSchema(t *testing.T) { + supplier := &fakeResourceSupplier{} + g := newGroup(nil, nil, supplier) + require.NoError(t, g.initBySchema(buildGroup("g1"))) + assert.True(t, g.isInit()) + assert.Equal(t, 1, supplier.openCalls) +} + +// TestInitGroup_RetryAfterFailedOpenDB asserts that initGroup retries OpenDB on +// the second pass when the first attempt left the group un-initialized — the +// fix for the silent-fail bug where a half-broken group survived in the cache. +func TestInitGroup_RetryAfterFailedOpenDB(t *testing.T) { + openErr := errors.New("transient") + supplier := &fakeResourceSupplier{openErr: openErr} + repo := &schemaRepo{ + l: testLogger(), + resourceSupplier: supplier, + } + + gs := buildGroup("g1") + _, firstErr := repo.initGroup(gs) + require.Error(t, firstErr) + require.Equal(t, 1, supplier.openCalls) + + cached, ok := repo.getGroup("g1") + require.True(t, ok) + require.False(t, cached.isInit()) + + supplier.openErr = nil + g, secondErr := repo.initGroup(gs) + require.NoError(t, secondErr) + require.NotNil(t, g) + assert.True(t, g.isInit()) + assert.Equal(t, 2, supplier.openCalls, "OpenDB must be re-attempted when the cached group is not initialized") +} + +// TestInitGroup_AlreadyInit_NoReopen asserts that an already-initialized group +// is returned without reinvoking OpenDB. 
+func TestInitGroup_AlreadyInit_NoReopen(t *testing.T) { + supplier := &fakeResourceSupplier{} + repo := &schemaRepo{ + l: testLogger(), + resourceSupplier: supplier, + } + gs := buildGroup("g1") + _, err := repo.initGroup(gs) + require.NoError(t, err) + require.Equal(t, 1, supplier.openCalls) + _, err = repo.initGroup(gs) + require.NoError(t, err) + assert.Equal(t, 1, supplier.openCalls, "already-initialized group must not re-invoke OpenDB") +} diff --git a/pkg/schema/cache_watcher_fatal_test.go b/pkg/schema/cache_watcher_fatal_test.go new file mode 100644 index 000000000..3c66c5d28 --- /dev/null +++ b/pkg/schema/cache_watcher_fatal_test.go @@ -0,0 +1,256 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package schema + +import ( + stderrors "errors" + "fmt" + "os" + "os/exec" + "strings" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/require" + + commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" + databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" + "github.com/apache/skywalking-banyandb/pkg/initerror" + pkglogger "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/meter" + "github.com/apache/skywalking-banyandb/pkg/run" +) + +// nopCounter is a meter.Counter that discards all increments. Used by the +// fatal-fail subprocess tests so the worker's deferred Inc(1) does not +// nil-panic in the child process. +type nopCounter struct{} + +func (nopCounter) Inc(_ float64, _ ...string) {} +func (nopCounter) Delete(_ ...string) bool { return false } + +var _ meter.Counter = nopCounter{} + +const ( + // fatalChildEnv signals to the test binary that it is the in-process + // child for a subprocess fast-fail assertion. Parents re-exec the + // running test binary with this env set; only the matching child + // branch executes — the parent does not run the child code path. + fatalChildEnv = "BANYAND_TEST_F5_FATAL_CHILD" + fatalChildModeWatcher = "watcher" + fatalChildModeSendSync = "send-sync" +) + +// permanentPanicSupplier is a ResourceSchemaSupplier whose OpenResource panics +// with an error-typed permanent error. It exercises the Watcher worker's +// recover() classifier (Step 1) — the panic propagates up through processEvent +// and is caught in the worker's deferred recover(), where IsPermanent classifies +// it as fatal. 
+type permanentPanicSupplier struct{} + +func (permanentPanicSupplier) ResourceSchema(_ *commonv1.Metadata) (ResourceSchema, error) { + return nil, nil +} + +func (permanentPanicSupplier) OpenResource(_ Resource) (IndexListener, error) { + panic(initerror.AsPermanent(stderrors.New("incompatible version 1.3.0"))) +} + +// permanentErrorSupplier is a ResourceSchemaSupplier whose OpenResource returns +// a permanent error. It exercises the synchronous-return classifier in +// SendMetadataEvent (Step 2b). +type permanentErrorSupplier struct{} + +func (permanentErrorSupplier) ResourceSchema(_ *commonv1.Metadata) (ResourceSchema, error) { + return nil, nil +} + +func (permanentErrorSupplier) OpenResource(_ Resource) (IndexListener, error) { + return nil, initerror.AsPermanent(stderrors.New("incompatible version 1.3.0")) +} + +// transientThenOKSupplier returns a non-permanent error on the first call and +// nil on the second. It is used to verify the transient-retry path stays +// functional after F5 (Test 5.6). +type transientThenOKSupplier struct { + openCalls atomic.Int32 +} + +func (s *transientThenOKSupplier) ResourceSchema(_ *commonv1.Metadata) (ResourceSchema, error) { + return nil, nil +} + +func (s *transientThenOKSupplier) OpenResource(_ Resource) (IndexListener, error) { + calls := s.openCalls.Add(1) + if calls == 1 { + return nil, stderrors.New("etcd hiccup") + } + return noopIndexListener{}, nil +} + +// fatalChildLogger returns a logger whose .Fatal() output goes to stderr so +// the parent test can assert on the exit message via CombinedOutput. +func fatalChildLogger() *pkglogger.Logger { + _ = pkglogger.Init(pkglogger.Logging{Env: "dev", Level: "info"}) + return pkglogger.GetLogger("test", "schema-cache-fatal") +} + +// fatalChildMetrics returns a Metrics instance backed by no-op counters so +// the worker's deferred Inc(1) call does not nil-panic in the child process. 
+func fatalChildMetrics() *Metrics { + return &Metrics{ + totalErrs: nopCounter{}, + totalRetries: nopCounter{}, + totalPanics: nopCounter{}, + } +} + +func runFatalChild(t *testing.T, mode string) string { + t.Helper() + // #nosec G204 — os.Args[0] is the test binary path, set by the Go test + // runner. The mode argument is one of two compile-time string constants + // declared above. + cmd := exec.Command(os.Args[0], "-test.run=^"+t.Name()+"$", "-test.v") + cmd.Env = append(os.Environ(), fatalChildEnv+"="+mode) + out, err := cmd.CombinedOutput() + require.Error(t, err, "child must exit non-zero; output:\n%s", string(out)) + var exitErr *exec.ExitError + require.ErrorAs(t, err, &exitErr, "child must produce *exec.ExitError; output:\n%s", string(out)) + require.NotZero(t, exitErr.ExitCode(), "child must exit with non-zero code; output:\n%s", string(out)) + return string(out) +} + +// TestWatcherHitPermanentError_FailsFast verifies that the Watcher worker's +// recover() classifier escalates an error-typed permanent panic to .Fatal(), +// which exits the process non-zero (Step 1). +func TestWatcherHitPermanentError_FailsFast(t *testing.T) { + if os.Getenv(fatalChildEnv) == fatalChildModeWatcher { + runWatcherFatalChild() + return + } + out := runFatalChild(t, fatalChildModeWatcher) + if !strings.Contains(out, "Watcher hit a permanent error") { + t.Fatalf("expected stderr to mention 'Watcher hit a permanent error', got:\n%s", out) + } +} + +// runWatcherFatalChild constructs a minimal schemaRepo whose OpenResource +// panics with a permanent error, starts the Watcher, and pushes an event +// through eventCh. The Watcher worker's recover() classifier should escalate +// to .Fatal() within a few hundred milliseconds. 
+func runWatcherFatalChild() { + repo := &schemaRepo{ + l: fatalChildLogger(), + resourceSchemaSupplier: permanentPanicSupplier{}, + eventCh: make(chan MetadataEvent, 1), + workerNum: 1, + closer: run.NewChannelCloser(), + metrics: fatalChildMetrics(), + } + repo.Watcher() + repo.eventCh <- MetadataEvent{ + Typ: EventAddOrUpdate, + Kind: EventKindResource, + Metadata: &databasev1.Measure{ + Metadata: &commonv1.Metadata{Group: "g", Name: "m", ModRevision: 1}, + }, + } + // Hold the goroutine alive long enough for the worker's defer to run. + // .Fatal() calls os.Exit(1), so this sleep should never complete. + time.Sleep(5 * time.Second) + fmt.Fprintln(os.Stderr, "child did not exit; classifier did not fire") + os.Exit(2) +} + +// TestSendMetadataEventHitPermanentError_FailsFast verifies that the +// synchronous SendMetadataEvent path escalates a permanent error returned +// from processEvent to .Fatal() (Step 2b). +func TestSendMetadataEventHitPermanentError_FailsFast(t *testing.T) { + if os.Getenv(fatalChildEnv) == fatalChildModeSendSync { + runSendSyncFatalChild() + return + } + out := runFatalChild(t, fatalChildModeSendSync) + if !strings.Contains(out, "SendMetadataEvent hit a permanent error") { + t.Fatalf("expected stderr to mention 'SendMetadataEvent hit a permanent error', got:\n%s", out) + } +} + +// runSendSyncFatalChild calls SendMetadataEvent on a schemaRepo whose +// OpenResource returns a permanent error. The synchronous classifier branch +// at cache.go (Step 2b) should escalate to .Fatal() before SendMetadataEvent +// returns. 
+func runSendSyncFatalChild() {
+	repo := &schemaRepo{
+		l:                      fatalChildLogger(),
+		resourceSchemaSupplier: permanentErrorSupplier{},
+		eventCh:                make(chan MetadataEvent, 1),
+		workerNum:              1,
+		closer:                 run.NewChannelCloser(),
+		metrics:                fatalChildMetrics(),
+	}
+	// No Watcher is started here: the permanent error has to be escalated on
+	// the synchronous path inside SendMetadataEvent itself.
+	repo.SendMetadataEvent(MetadataEvent{
+		Typ:  EventAddOrUpdate,
+		Kind: EventKindResource,
+		Metadata: &databasev1.Measure{
+			Metadata: &commonv1.Metadata{Group: "g", Name: "m", ModRevision: 1},
+		},
+	})
+	// Reaching this point means .Fatal() was never called.
+	fmt.Fprintln(os.Stderr, "child did not exit; classifier did not fire")
+	os.Exit(2)
+}
+
+// TestSendMetadataEvent_TransientErrorRequeues verifies that a non-permanent
+// error from processEvent is requeued via eventCh (the existing transient
+// path) and not escalated to .Fatal() (Test 5.6 — preserves transient retry).
+// The event taken off eventCh is then replayed through processEvent to confirm
+// that OpenResource is reattempted and succeeds.
+func TestSendMetadataEvent_TransientErrorRequeues(t *testing.T) {
+	supplier := &transientThenOKSupplier{}
+	repo := &schemaRepo{
+		l:                      testLogger(),
+		resourceSchemaSupplier: supplier,
+		eventCh:                make(chan MetadataEvent, 1),
+		workerNum:              1,
+		closer:                 run.NewChannelCloser(),
+		metrics:                fatalChildMetrics(),
+	}
+
+	evt := MetadataEvent{
+		Typ:  EventAddOrUpdate,
+		Kind: EventKindResource,
+		Metadata: &databasev1.Measure{
+			Metadata: &commonv1.Metadata{Group: "g", Name: "m", ModRevision: 1},
+		},
+	}
+	repo.SendMetadataEvent(evt)
+
+	// First processEvent failed with a transient error; SendMetadataEvent should
+	// have pushed onto eventCh for the worker (or self) to retry. This test
+	// starts no Watcher, so the requeued event should still be sitting in the
+	// buffered channel.
+	var requeued MetadataEvent
+	select {
+	case requeued = <-repo.eventCh:
+		require.Equal(t, EventKindResource, requeued.Kind, "requeued event must be the original")
+		require.Equal(t, EventAddOrUpdate, requeued.Typ, "requeued event must be the original")
+	case <-time.After(2 * time.Second):
+		t.Fatalf("transient error must requeue the event onto eventCh; openCalls=%d", supplier.openCalls.Load())
+	}
+	require.EqualValues(t, 1, supplier.openCalls.Load(), "first SendMetadataEvent must call OpenResource exactly once")
+
+	// Second pass: drive the event actually taken off eventCh (not the local
+	// evt) through processEvent, so the retry is exercised with exactly what a
+	// worker would have received, and confirm OpenResource is reattempted.
+	require.NoError(t, repo.processEvent(requeued))
+	require.EqualValues(t, 2, supplier.openCalls.Load(), "second pass must reattempt OpenResource")
+}
diff --git a/pkg/schema/init.go b/pkg/schema/init.go
index 3f96d750b..e7b5a09fa 100644
--- a/pkg/schema/init.go
+++ b/pkg/schema/init.go
@@ -94,9 +94,9 @@ func (sr *schemaRepo) getCatalog(kind schema.Kind) commonv1.Catalog {
 }
 
 func (sr *schemaRepo) processGroup(ctx context.Context, g *commonv1.Group, catalog commonv1.Catalog) {
-	_, err := sr.initGroup(g)
-	if err != nil {
-		logger.Panicf("fails to init the group: %v", err)
+	if _, initErr := sr.initGroup(g); initErr != nil {
+		sr.l.Error().Err(initErr).Str("group", g.Metadata.GetName()).Msg("fails to init the group")
+		panic(fmt.Errorf("fails to init the group %s: %w", g.Metadata.GetName(), initErr))
 	}
 	sr.processRules(ctx, g.Metadata.GetName())
 	sr.processBindings(ctx, g.Metadata.GetName())
@@ -216,6 +216,12 @@ func (sr *schemaRepo) initGroup(groupSchema *commonv1.Group) (*group, error) {
 	defer sr.groupMux.Unlock()
 	g, ok := sr.getGroup(groupSchema.Metadata.Name)
 	if ok {
+		if g.isInit() {
+			return g, nil
+		}
+		if reinitErr := g.initBySchema(groupSchema); reinitErr != nil {
+			return nil, reinitErr
+		}
 		return g, nil
 	}
 	sr.l.Info().Str("group", groupSchema.Metadata.Name).Msg("creating a tsdb")
