This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch metadata-group in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 5896e8c72abb3414f09a557571636d779fa8116f Author: Gao Hongtao <[email protected]> AuthorDate: Wed Dec 18 14:50:53 2024 +0800 Fix schema updating bugs Signed-off-by: Gao Hongtao <[email protected]> --- CHANGES.md | 1 + banyand/internal/storage/rotation.go | 5 +- banyand/internal/storage/segment.go | 56 +++++++--- banyand/internal/storage/shard.go | 2 +- banyand/internal/storage/storage.go | 1 + banyand/internal/storage/tsdb.go | 24 ++++- banyand/internal/storage/tsdb_test.go | 164 +++++++++++++++++++++++++++++ banyand/measure/metadata.go | 2 +- banyand/measure/metrics.go | 3 + banyand/metadata/schema/group.go | 13 ++- banyand/metadata/schema/measure.go | 44 +++++++- banyand/metadata/schema/stream.go | 35 ++++-- banyand/stream/metadata.go | 2 +- banyand/stream/metrics.go | 4 + docs/interacting/bydbctl/schema/group.md | 2 + docs/interacting/bydbctl/schema/measure.md | 28 +++-- docs/interacting/bydbctl/schema/stream.md | 6 +- pkg/schema/cache.go | 27 ++--- pkg/schema/schema.go | 8 +- 19 files changed, 363 insertions(+), 64 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index a5cd0734..8e5ec472 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -27,6 +27,7 @@ Release Notes. - Metadata: Fix the bug that the cache load nil value that is the unknown index rule on the index rule binding. - Queue: Fix the bug that the client remove a registered node in the eviction list. The node is controlled by the recovery loop, doesn't need to be removed in the failover process. - UI: Add prettier to enforce a consistent style by parsing code. +- Fix the bug that fails to update `Group` Schema's ResourceOpts. 
### Documentation diff --git a/banyand/internal/storage/rotation.go b/banyand/internal/storage/rotation.go index ab18daec..2066ef90 100644 --- a/banyand/internal/storage/rotation.go +++ b/banyand/internal/storage/rotation.go @@ -43,7 +43,8 @@ func (d *database[T, O]) Tick(ts int64) { } func (d *database[T, O]) startRotationTask() error { - rt := newRetentionTask(d, d.opts.TTL) + options := d.segmentController.getOptions() + rt := newRetentionTask(d, options.TTL) go func(rt *retentionTask[T, O]) { for ts := range d.tsEventCh { func(ts int64) { @@ -70,7 +71,7 @@ func (d *database[T, O]) startRotationTask() error { } d.incTotalRotationStarted(1) defer d.incTotalRotationFinished(1) - start := d.segmentController.opts.SegmentInterval.nextTime(t) + start := options.SegmentInterval.nextTime(t) d.logger.Info().Time("segment_start", start).Time("event_time", t).Msg("create new segment") _, err := d.segmentController.create(start) if err != nil { diff --git a/banyand/internal/storage/segment.go b/banyand/internal/storage/segment.go index 02f59581..0451c649 100644 --- a/banyand/internal/storage/segment.go +++ b/banyand/internal/storage/segment.go @@ -32,6 +32,7 @@ import ( "github.com/pkg/errors" "github.com/apache/skywalking-banyandb/api/common" + commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" "github.com/apache/skywalking-banyandb/pkg/convert" "github.com/apache/skywalking-banyandb/pkg/index/inverted" "github.com/apache/skywalking-banyandb/pkg/logger" @@ -43,6 +44,8 @@ var ErrExpiredData = errors.New("expired data") type segment[T TSTable, O any] struct { metrics any + option O + creator TSTableCreator[T, O] l *logger.Logger index *seriesIndex sLst atomic.Pointer[[]*shard[T]] @@ -50,7 +53,6 @@ type segment[T TSTable, O any] struct { timestamp.TimeRange suffix string location string - opts TSDBOpts[T, O] mu sync.Mutex refCount int32 mustBeDeleted uint32 @@ -58,7 +60,6 @@ type segment[T TSTable, O any] struct { } func (sc *segmentController[T, 
O]) openSegment(ctx context.Context, startTime, endTime time.Time, path, suffix string, - opts TSDBOpts[T, O], ) (s *segment[T, O], err error) { suffixInteger, err := strconv.Atoi(suffix) if err != nil { @@ -69,8 +70,9 @@ func (sc *segmentController[T, O]) openSegment(ctx context.Context, startTime, e ctx = common.SetPosition(ctx, func(_ common.Position) common.Position { return p }) - id := generateSegID(sc.opts.SegmentInterval.Unit, suffixInteger) - sir, err := newSeriesIndex(ctx, path, sc.opts.SeriesIndexFlushTimeoutSeconds, sc.indexMetrics) + options := sc.getOptions() + id := generateSegID(options.SegmentInterval.Unit, suffixInteger) + sir, err := newSeriesIndex(ctx, path, options.SeriesIndexFlushTimeoutSeconds, sc.indexMetrics) if err != nil { return nil, errors.Wrap(errOpenDatabase, errors.WithMessage(err, "create series index controller failed").Error()) } @@ -82,20 +84,21 @@ func (sc *segmentController[T, O]) openSegment(ctx context.Context, startTime, e position: p, refCount: 1, index: sir, - opts: opts, metrics: sc.metrics, + creator: options.TSTableCreator, + option: options.Option, } s.l = logger.Fetch(ctx, s.String()) - return s, s.loadShards() + return s, s.loadShards(int(options.ShardNum)) } -func (s *segment[T, O]) loadShards() error { +func (s *segment[T, O]) loadShards(shardNum int) error { return walkDir(s.location, shardPathPrefix, func(suffix string) error { shardID, err := strconv.Atoi(suffix) if err != nil { return err } - if shardID >= int(s.opts.ShardNum) { + if shardID >= shardNum { return nil } s.l.Info().Int("shard_id", shardID).Msg("loaded a existed shard") @@ -205,11 +208,12 @@ type segmentController[T TSTable, O any] struct { metrics Metrics l *logger.Logger indexMetrics *inverted.Metrics + opts *TSDBOpts[T, O] position common.Position location string lst []*segment[T, O] - opts TSDBOpts[T, O] deadline atomic.Int64 + optsMutex sync.RWMutex sync.RWMutex } @@ -219,7 +223,7 @@ func newSegmentController[T TSTable, O any](ctx 
context.Context, location string clock, _ := timestamp.GetClock(ctx) return &segmentController[T, O]{ location: location, - opts: opts, + opts: &opts, l: l, clock: clock, position: common.GetPosition(ctx), @@ -228,6 +232,25 @@ func newSegmentController[T TSTable, O any](ctx context.Context, location string } } +func (sc *segmentController[T, O]) getOptions() *TSDBOpts[T, O] { + sc.optsMutex.RLock() + defer sc.optsMutex.RUnlock() + return sc.opts +} + +func (sc *segmentController[T, O]) updateOptions(resourceOpts *commonv1.ResourceOpts) { + sc.optsMutex.Lock() + defer sc.optsMutex.Unlock() + si := MustToIntervalRule(resourceOpts.SegmentInterval) + if sc.opts.SegmentInterval.Unit != si.Unit { + sc.l.Panic().Msg("segment interval unit cannot be changed") + return + } + sc.opts.SegmentInterval = si + sc.opts.TTL = MustToIntervalRule(resourceOpts.Ttl) + sc.opts.ShardNum = resourceOpts.ShardNum +} + func (sc *segmentController[T, O]) selectSegments(timeRange timestamp.TimeRange) (tt []Segment[T, O]) { sc.RLock() defer sc.RUnlock() @@ -270,7 +293,7 @@ func (sc *segmentController[T, O]) segments() (ss []*segment[T, O]) { } func (sc *segmentController[T, O]) format(tm time.Time) string { - switch sc.opts.SegmentInterval.Unit { + switch sc.getOptions().SegmentInterval.Unit { case HOUR: return tm.Format(hourFormat) case DAY: @@ -280,7 +303,7 @@ func (sc *segmentController[T, O]) format(tm time.Time) string { } func (sc *segmentController[T, O]) parse(value string) (time.Time, error) { - switch sc.opts.SegmentInterval.Unit { + switch sc.getOptions().SegmentInterval.Unit { case HOUR: return time.ParseInLocation(hourFormat, value, time.Local) case DAY: @@ -293,7 +316,7 @@ func (sc *segmentController[T, O]) open() error { sc.Lock() defer sc.Unlock() emptySegments := make([]string, 0) - err := loadSegments(sc.location, segPathPrefix, sc, sc.opts.SegmentInterval, func(start, end time.Time) error { + err := loadSegments(sc.location, segPathPrefix, sc, 
sc.getOptions().SegmentInterval, func(start, end time.Time) error { suffix := sc.format(start) segmentPath := path.Join(sc.location, fmt.Sprintf(segTemplate, suffix)) metadataPath := path.Join(segmentPath, metadataFilename) @@ -334,7 +357,8 @@ func (sc *segmentController[T, O]) create(start time.Time) (*segment[T, O], erro return s, nil } } - start = sc.opts.SegmentInterval.Unit.standard(start) + options := sc.getOptions() + start = options.SegmentInterval.Unit.standard(start) var next *segment[T, O] for _, s := range sc.lst { if s.Contains(start.UnixNano()) { @@ -344,7 +368,7 @@ func (sc *segmentController[T, O]) create(start time.Time) (*segment[T, O], erro next = s } } - stdEnd := sc.opts.SegmentInterval.nextTime(start) + stdEnd := options.SegmentInterval.nextTime(start) var end time.Time if next != nil && next.Start.Before(stdEnd) { end = next.Start @@ -381,7 +405,7 @@ func (sc *segmentController[T, O]) load(start, end time.Time, root string) (seg ctx := common.SetPosition(context.WithValue(context.Background(), logger.ContextKey, sc.l), func(_ common.Position) common.Position { return sc.position }) - seg, err = sc.openSegment(ctx, start, end, segPath, suffix, sc.opts) + seg, err = sc.openSegment(ctx, start, end, segPath, suffix) if err != nil { return nil, err } diff --git a/banyand/internal/storage/shard.go b/banyand/internal/storage/shard.go index 20c0807e..0c27f335 100644 --- a/banyand/internal/storage/shard.go +++ b/banyand/internal/storage/shard.go @@ -43,7 +43,7 @@ func (s *segment[T, O]) openShard(ctx context.Context, id common.ShardID) (*shar l.Info().Int("shard_id", int(id)).Str("path", location).Msg("creating a shard") p := common.GetPosition(ctx) p.Shard = strconv.Itoa(int(id)) - t, err := s.opts.TSTableCreator(lfs, location, p, l, s.TimeRange, s.opts.Option, s.metrics) + t, err := s.creator(lfs, location, p, l, s.TimeRange, s.option, s.metrics) if err != nil { return nil, err } diff --git a/banyand/internal/storage/storage.go 
b/banyand/internal/storage/storage.go index a25934b3..de058b0e 100644 --- a/banyand/internal/storage/storage.go +++ b/banyand/internal/storage/storage.go @@ -99,6 +99,7 @@ type TSDB[T TSTable, O any] interface { CreateSegmentIfNotExist(ts time.Time) (Segment[T, O], error) SelectSegments(timeRange timestamp.TimeRange) []Segment[T, O] Tick(ts int64) + UpdateOptions(opts *commonv1.ResourceOpts) } // Segment is a time range of data. diff --git a/banyand/internal/storage/tsdb.go b/banyand/internal/storage/tsdb.go index f17dc7c2..5d62bc82 100644 --- a/banyand/internal/storage/tsdb.go +++ b/banyand/internal/storage/tsdb.go @@ -28,6 +28,7 @@ import ( "github.com/pkg/errors" "github.com/apache/skywalking-banyandb/api/common" + commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" "github.com/apache/skywalking-banyandb/banyand/observability" "github.com/apache/skywalking-banyandb/pkg/fs" "github.com/apache/skywalking-banyandb/pkg/index/inverted" @@ -79,13 +80,17 @@ type database[T TSTable, O any] struct { *metrics p common.Position location string - opts TSDBOpts[T, O] latestTickTime atomic.Int64 sync.RWMutex rotationProcessOn atomic.Bool + closed atomic.Bool } func (d *database[T, O]) Close() error { + if d.closed.Load() { + return nil + } + d.closed.Store(true) d.Lock() defer d.Unlock() d.scheduler.Close() @@ -122,7 +127,6 @@ func OpenTSDB[T TSTable, O any](ctx context.Context, opts TSDBOpts[T, O]) (TSDB[ location: location, scheduler: scheduler, logger: l, - opts: opts, tsEventCh: make(chan int64), p: p, segmentController: newSegmentController[T](ctx, location, @@ -144,14 +148,30 @@ func OpenTSDB[T TSTable, O any](ctx context.Context, opts TSDBOpts[T, O]) (TSDB[ } func (d *database[T, O]) CreateSegmentIfNotExist(ts time.Time) (Segment[T, O], error) { + if d.closed.Load() { + return nil, errors.New("database is closed") + } return d.segmentController.createSegment(ts) } func (d *database[T, O]) SelectSegments(timeRange timestamp.TimeRange) 
[]Segment[T, O] { + if d.closed.Load() { + return nil + } return d.segmentController.selectSegments(timeRange) } +func (d *database[T, O]) UpdateOptions(resourceOpts *commonv1.ResourceOpts) { + if d.closed.Load() { + return + } + d.segmentController.updateOptions(resourceOpts) +} + func (d *database[T, O]) collect() { + if d.closed.Load() { + return + } if d.metrics == nil { return } diff --git a/banyand/internal/storage/tsdb_test.go b/banyand/internal/storage/tsdb_test.go new file mode 100644 index 00000000..2b33a4f9 --- /dev/null +++ b/banyand/internal/storage/tsdb_test.go @@ -0,0 +1,164 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package storage + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/require" + + commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" + "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/test" + "github.com/apache/skywalking-banyandb/pkg/test/flags" + "github.com/apache/skywalking-banyandb/pkg/timestamp" +) + +func TestOpenTSDB(t *testing.T) { + logger.Init(logger.Logging{ + Env: "dev", + Level: flags.LogLevel, + }) + + t.Run("create new TSDB", func(t *testing.T) { + dir, defFn := test.Space(require.New(t)) + defer defFn() + + opts := TSDBOpts[*MockTSTable, any]{ + Location: dir, + SegmentInterval: IntervalRule{Unit: DAY, Num: 1}, + TTL: IntervalRule{Unit: DAY, Num: 3}, + ShardNum: 1, + TSTableCreator: MockTSTableCreator, + } + + ctx := context.Background() + mc := timestamp.NewMockClock() + ts, err := time.ParseInLocation("2006-01-02 15:04:05", "2024-05-01 00:00:00", time.Local) + require.NoError(t, err) + mc.Set(ts) + ctx = timestamp.SetClock(ctx, mc) + + tsdb, err := OpenTSDB(ctx, opts) + require.NoError(t, err) + require.NotNil(t, tsdb) + + seg, err := tsdb.CreateSegmentIfNotExist(ts) + require.NoError(t, err) + defer seg.DecRef() + + db := tsdb.(*database[*MockTSTable, any]) + require.Equal(t, len(db.segmentController.segments()), 1) + tsdb.Close() + }) + + t.Run("reopen existing TSDB", func(t *testing.T) { + dir, defFn := test.Space(require.New(t)) + defer defFn() + + opts := TSDBOpts[*MockTSTable, any]{ + Location: dir, + SegmentInterval: IntervalRule{Unit: DAY, Num: 1}, + TTL: IntervalRule{Unit: DAY, Num: 3}, + ShardNum: 1, + TSTableCreator: MockTSTableCreator, + } + + ctx := context.Background() + mc := timestamp.NewMockClock() + ts, err := time.ParseInLocation("2006-01-02 15:04:05", "2024-05-01 00:00:00", time.Local) + require.NoError(t, err) + mc.Set(ts) + ctx = timestamp.SetClock(ctx, mc) + + // Create new TSDB + tsdb, err := OpenTSDB(ctx, opts) + 
require.NoError(t, err) + require.NotNil(t, tsdb) + + seg, err := tsdb.CreateSegmentIfNotExist(ts) + require.NoError(t, err) + seg.DecRef() + + db := tsdb.(*database[*MockTSTable, any]) + segs := db.segmentController.segments() + require.Equal(t, len(segs), 1) + for i := range segs { + segs[i].DecRef() + } + tsdb.Close() + + // Reopen existing TSDB + tsdb, err = OpenTSDB(ctx, opts) + require.NoError(t, err) + require.NotNil(t, tsdb) + + db = tsdb.(*database[*MockTSTable, any]) + segs = db.segmentController.segments() + require.Equal(t, len(segs), 1) + for i := range segs { + segs[i].DecRef() + } + tsdb.Close() + }) + + t.Run("Changed options", func(t *testing.T) { + dir, defFn := test.Space(require.New(t)) + defer defFn() + + opts := TSDBOpts[*MockTSTable, any]{ + Location: dir, + SegmentInterval: IntervalRule{Unit: DAY, Num: 1}, + TTL: IntervalRule{Unit: DAY, Num: 3}, + ShardNum: 1, + TSTableCreator: MockTSTableCreator, + } + + ctx := context.Background() + mc := timestamp.NewMockClock() + ts, err := time.ParseInLocation("2006-01-02 15:04:05", "2024-05-01 00:00:00", time.Local) + require.NoError(t, err) + mc.Set(ts) + ctx = timestamp.SetClock(ctx, mc) + + // Create new TSDB + tsdb, err := OpenTSDB(ctx, opts) + require.NoError(t, err) + require.NotNil(t, tsdb) + + seg, err := tsdb.CreateSegmentIfNotExist(ts) + require.NoError(t, err) + seg.DecRef() + + tsdb.UpdateOptions(&commonv1.ResourceOpts{ + ShardNum: 2, + SegmentInterval: &commonv1.IntervalRule{ + Unit: commonv1.IntervalRule_UNIT_DAY, + Num: 2, + }, + Ttl: &commonv1.IntervalRule{ + Unit: commonv1.IntervalRule_UNIT_DAY, + Num: 6, + }, + }) + + tsdb.Close() + }) +} diff --git a/banyand/measure/metadata.go b/banyand/measure/metadata.go index 83dbbc02..07de2db2 100644 --- a/banyand/measure/metadata.go +++ b/banyand/measure/metadata.go @@ -295,7 +295,7 @@ func (s *supplier) ResourceSchema(md *commonv1.Metadata) (resourceSchema.Resourc return s.metadata.MeasureRegistry().GetMeasure(ctx, md) } -func (s *supplier) 
OpenDB(groupSchema *commonv1.Group) (io.Closer, error) { +func (s *supplier) OpenDB(groupSchema *commonv1.Group) (resourceSchema.DB, error) { name := groupSchema.Metadata.Name p := common.Position{ Module: "measure", diff --git a/banyand/measure/metrics.go b/banyand/measure/metrics.go index 7bbeea42..96e8ff22 100644 --- a/banyand/measure/metrics.go +++ b/banyand/measure/metrics.go @@ -293,6 +293,9 @@ func (tst *tsTable) Collect(m storage.Metrics) { } metrics := m.(*metrics) snp := tst.currentSnapshot() + if snp == nil { + return + } defer snp.decRef() var totalMemPart, totalMemElements, totalMemBlocks, totalMemPartBytes, totalMemPartUncompressedBytes uint64 diff --git a/banyand/metadata/schema/group.go b/banyand/metadata/schema/group.go index 7040bb2b..d8b9d627 100644 --- a/banyand/metadata/schema/group.go +++ b/banyand/metadata/schema/group.go @@ -23,6 +23,7 @@ import ( "github.com/pkg/errors" clientv3 "go.etcd.io/etcd/client/v3" + "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/timestamppb" commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" @@ -102,7 +103,17 @@ func (e *etcdSchemaRegistry) UpdateGroup(ctx context.Context, group *commonv1.Gr if group.Metadata.Name == "" { return errors.New("metadata.name is required") } - _, err := e.update(ctx, Metadata{ + g, err := e.GetGroup(ctx, group.GetMetadata().GetName()) + if err != nil { + return err + } + if proto.Equal(g.ResourceOpts, group.ResourceOpts) { + return nil + } + if g.GetResourceOpts().SegmentInterval.Unit != group.GetResourceOpts().SegmentInterval.Unit { + return errors.New("segment interval unit cannot be changed") + } + _, err = e.update(ctx, Metadata{ TypeMeta: TypeMeta{ Kind: KindGroup, Name: group.GetMetadata().GetName(), diff --git a/banyand/metadata/schema/measure.go b/banyand/metadata/schema/measure.go index d4461cfb..45c20f31 100644 --- a/banyand/metadata/schema/measure.go +++ b/banyand/metadata/schema/measure.go @@ -19,6 +19,7 @@ package 
schema import ( "context" + "fmt" "github.com/pkg/errors" "google.golang.org/protobuf/types/known/timestamppb" @@ -114,10 +115,8 @@ func (e *etcdSchemaRegistry) UpdateMeasure(ctx context.Context, measure *databas if prev == nil { return 0, errors.WithMessagef(ErrGRPCResourceNotFound, "measure %s not found", measure.GetMetadata().GetName()) } - for i, e := range prev.GetEntity().GetTagNames() { - if e != measure.GetEntity().GetTagNames()[i] { - return 0, errors.WithMessagef(ErrInputInvalid, "entity is immutable. Please create a new measure if you want to change entity") - } + if err := validateEqualExceptAppendTagsAndFields(prev, measure); err != nil { + return 0, errors.WithMessagef(ErrInputInvalid, "validation failed: %s", err) } return e.update(ctx, Metadata{ TypeMeta: TypeMeta{ @@ -130,6 +129,43 @@ func (e *etcdSchemaRegistry) UpdateMeasure(ctx context.Context, measure *databas }) } +func validateEqualExceptAppendTagsAndFields(prevMeasure, newMeasure *databasev1.Measure) error { + if prevMeasure.GetInterval() != newMeasure.GetInterval() { + return fmt.Errorf("interval is different: %s != %s", prevMeasure.GetInterval(), newMeasure.GetInterval()) + } + if prevMeasure.GetEntity().String() != newMeasure.GetEntity().String() { + return fmt.Errorf("entity is different: %s != %s", prevMeasure.GetEntity().String(), newMeasure.GetEntity().String()) + } + if prevMeasure.GetIndexMode() != newMeasure.GetIndexMode() { + return fmt.Errorf("index mode is different: %v != %v", prevMeasure.GetIndexMode(), newMeasure.GetIndexMode()) + } + if len(prevMeasure.GetTagFamilies()) > len(newMeasure.GetTagFamilies()) { + return fmt.Errorf("number of tag families is less in the new measure") + } + if len(prevMeasure.GetFields()) > len(newMeasure.GetFields()) { + return fmt.Errorf("number of fields is less in the new measure") + } + for i, tagFamily := range prevMeasure.GetTagFamilies() { + if tagFamily.Name != newMeasure.GetTagFamilies()[i].Name { + return fmt.Errorf("tag family name is 
different: %s != %s", tagFamily.Name, newMeasure.GetTagFamilies()[i].Name) + } + if len(tagFamily.Tags) > len(newMeasure.GetTagFamilies()[i].Tags) { + return fmt.Errorf("number of tags in tag family %s is less in the new measure", tagFamily.Name) + } + for j, tag := range tagFamily.Tags { + if tag.String() != newMeasure.GetTagFamilies()[i].Tags[j].String() { + return fmt.Errorf("tag %s in tag family %s is different: %s != %s", tag.Name, tagFamily.Name, tag.String(), newMeasure.GetTagFamilies()[i].Tags[j].String()) + } + } + } + for i, field := range prevMeasure.GetFields() { + if field.String() != newMeasure.GetFields()[i].String() { + return fmt.Errorf("field is different: %s != %s", field.String(), newMeasure.GetFields()[i].String()) + } + } + return nil +} + func (e *etcdSchemaRegistry) DeleteMeasure(ctx context.Context, metadata *commonv1.Metadata) (bool, error) { return e.delete(ctx, Metadata{ TypeMeta: TypeMeta{ diff --git a/banyand/metadata/schema/stream.go b/banyand/metadata/schema/stream.go index 31fdef77..502cb42c 100644 --- a/banyand/metadata/schema/stream.go +++ b/banyand/metadata/schema/stream.go @@ -19,6 +19,7 @@ package schema import ( "context" + "fmt" "github.com/pkg/errors" "google.golang.org/protobuf/types/known/timestamppb" @@ -68,20 +69,15 @@ func (e *etcdSchemaRegistry) UpdateStream(ctx context.Context, stream *databasev if err = validate.GroupForStreamOrMeasure(g); err != nil { return 0, err } - if err = validate.GroupForStreamOrMeasure(g); err != nil { - return 0, err - } prev, err := e.GetStream(ctx, stream.GetMetadata()) if err != nil { return 0, err } if prev == nil { - return 0, errors.WithMessagef(ErrGRPCResourceNotFound, "measure %s not found", stream.GetMetadata().GetName()) + return 0, errors.WithMessagef(ErrGRPCResourceNotFound, "stream %s not found", stream.GetMetadata().GetName()) } - for i, e := range prev.GetEntity().GetTagNames() { - if e != stream.GetEntity().GetTagNames()[i] { - return 0, errors.WithMessagef(ErrInputInvalid, 
"entity is immutable. Please create a new stream if you want to change entity") - } + if err := validateEqualExceptAppendTags(prev, stream); err != nil { + return 0, errors.WithMessagef(ErrInputInvalid, "validation failed: %s", err) } return e.update(ctx, Metadata{ TypeMeta: TypeMeta{ @@ -94,6 +90,29 @@ func (e *etcdSchemaRegistry) UpdateStream(ctx context.Context, stream *databasev }) } +func validateEqualExceptAppendTags(prevStream, newStream *databasev1.Stream) error { + if prevStream.GetEntity().String() != newStream.GetEntity().String() { + return fmt.Errorf("entity is different: %s != %s", prevStream.GetEntity().String(), newStream.GetEntity().String()) + } + if len(prevStream.GetTagFamilies()) > len(newStream.GetTagFamilies()) { + return fmt.Errorf("number of tag families is less in the new stream") + } + for i, tagFamily := range prevStream.GetTagFamilies() { + if tagFamily.Name != newStream.GetTagFamilies()[i].Name { + return fmt.Errorf("tag family name is different: %s != %s", tagFamily.Name, newStream.GetTagFamilies()[i].Name) + } + if len(tagFamily.Tags) > len(newStream.GetTagFamilies()[i].Tags) { + return fmt.Errorf("number of tags in tag family %s is less in the new stream", tagFamily.Name) + } + for j, tag := range tagFamily.Tags { + if tag.String() != newStream.GetTagFamilies()[i].Tags[j].String() { + return fmt.Errorf("tag %s in tag family %s is different: %s != %s", tag.Name, tagFamily.Name, tag.String(), newStream.GetTagFamilies()[i].Tags[j].String()) + } + } + } + return nil +} + func (e *etcdSchemaRegistry) CreateStream(ctx context.Context, stream *databasev1.Stream) (int64, error) { if stream.UpdatedAt != nil { stream.UpdatedAt = timestamppb.Now() diff --git a/banyand/stream/metadata.go b/banyand/stream/metadata.go index 1103b3d5..0c6a1c43 100644 --- a/banyand/stream/metadata.go +++ b/banyand/stream/metadata.go @@ -279,7 +279,7 @@ func (s *supplier) ResourceSchema(md *commonv1.Metadata) (resourceSchema.Resourc return 
s.metadata.StreamRegistry().GetStream(ctx, md) } -func (s *supplier) OpenDB(groupSchema *commonv1.Group) (io.Closer, error) { +func (s *supplier) OpenDB(groupSchema *commonv1.Group) (resourceSchema.DB, error) { name := groupSchema.Metadata.Name p := common.Position{ Module: "stream", diff --git a/banyand/stream/metrics.go b/banyand/stream/metrics.go index f77bd41b..339f3a2c 100644 --- a/banyand/stream/metrics.go +++ b/banyand/stream/metrics.go @@ -293,6 +293,10 @@ func (tst *tsTable) Collect(m storage.Metrics) { } metrics := m.(*metrics) snp := tst.currentSnapshot() + if snp == nil { + return + } + defer snp.decRef() var totalMemPart, totalMemElements, totalMemBlocks, totalMemPartBytes, totalMemPartUncompressedBytes uint64 diff --git a/docs/interacting/bydbctl/schema/group.md b/docs/interacting/bydbctl/schema/group.md index e4f9c51c..c1e17e14 100644 --- a/docs/interacting/bydbctl/schema/group.md +++ b/docs/interacting/bydbctl/schema/group.md @@ -66,6 +66,8 @@ resource_opts: EOF ``` +You can't change the unit of `segment_interval`. If you want to change the unit, you should delete the group and create a new one. + ## Delete operation Delete operation deletes a group's schema. 
diff --git a/docs/interacting/bydbctl/schema/measure.md b/docs/interacting/bydbctl/schema/measure.md index d8f8b3e7..7faae432 100644 --- a/docs/interacting/bydbctl/schema/measure.md +++ b/docs/interacting/bydbctl/schema/measure.md @@ -87,18 +87,32 @@ bydbctl measure update -f - <<EOF metadata: name: service_cpm_minute group: sw_metric -tagFamilies: - - name: searchable - tags: - - name: trace_id - type: TAG_TYPE_STRING +tag_families: +- name: default + tags: + - name: id + type: TAG_TYPE_STRING + - name: entity_id + type: TAG_TYPE_STRING + - name: new_tag + type: TAG_TYPE_STRING +fields: +- name: total + field_type: FIELD_TYPE_INT + encoding_method: ENCODING_METHOD_GORILLA + compression_method: COMPRESSION_METHOD_ZSTD +- name: value + field_type: FIELD_TYPE_INT + encoding_method: ENCODING_METHOD_GORILLA + compression_method: COMPRESSION_METHOD_ZSTD entity: tag_names: - - entity_id + - entity_id +interval: 1m EOF ``` -A measure's `entity` is immutable. If you want to change the entity field, you should delete the measure and create a new one. +You can only append new tags or fields to a measure. You can't change the existing tags or fields. The order of tags and fields is immutable; you can't change the order of tags or fields. You can't insert a new tag or field in the middle or front of the existing tags or fields. ## Delete operation diff --git a/docs/interacting/bydbctl/schema/stream.md b/docs/interacting/bydbctl/schema/stream.md index 1e18195a..3e2668b5 100644 --- a/docs/interacting/bydbctl/schema/stream.md +++ b/docs/interacting/bydbctl/schema/stream.md @@ -80,14 +80,16 @@ tagFamilies: tags: - name: trace_id type: TAG_TYPE_STRING + - name: trace_name + type: TAG_TYPE_STRING entity: - tagNames: + tagNames: - stream_id EOF ``` -A stream's `entity` field is immutable. If you want to change the entity field, you should delete the stream and create a new one. +You can only append new tags to a stream. You can't change the existing tags. 
The order of tags is immutable, you can't change the order of tags. You can't insert a new tag in the middle or front of the existing tags. ## Delete operation diff --git a/pkg/schema/cache.go b/pkg/schema/cache.go index 7fcd31f4..3b7e34ac 100644 --- a/pkg/schema/cache.go +++ b/pkg/schema/cache.go @@ -28,6 +28,7 @@ import ( "github.com/pkg/errors" "go.uber.org/multierr" + "google.golang.org/protobuf/proto" commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" @@ -270,15 +271,12 @@ func (sr *schemaRepo) storeGroup(groupMeta *commonv1.Metadata) (*group, error) { if groupSchema.GetMetadata().GetModRevision() <= prevGroupSchema.Metadata.ModRevision { return g, nil } - sr.l.Info().Str("group", name).Msg("closing the previous tsdb") - db := g.SupplyTSDB() - if db != nil { - db.Close() - } - sr.l.Info().Str("group", name).Msg("creating a new tsdb") - if err := g.init(name); err != nil { - return nil, err + g.groupSchema.Store(groupSchema) + if proto.Equal(groupSchema, prevGroupSchema) { + return g, nil } + sr.l.Info().Str("group", name).Msg("updating the group resource options") + g.db.Load().(DB).UpdateOptions(groupSchema.ResourceOpts) return g, nil } @@ -500,7 +498,7 @@ func (g *group) initBySchema(groupSchema *commonv1.Group) error { return err } g.db.Store(db) - return nil + return err } func (g *group) isInit() bool { @@ -512,10 +510,7 @@ func (g *group) GetSchema() *commonv1.Group { } func (g *group) SupplyTSDB() io.Closer { - if v := g.db.Load(); v != nil { - return v.(io.Closer) - } - return nil + return g.db.Load().(io.Closer) } func (g *group) isPortable() bool { @@ -526,11 +521,7 @@ func (g *group) close() (err error) { if !g.isInit() || g.isPortable() { return nil } - db := g.SupplyTSDB() - if db != nil { - err = multierr.Append(err, db.Close()) - } - return err + return multierr.Append(err, g.SupplyTSDB().Close()) } func parseMaxModRevision[T 
ResourceSchema](indexRules []T) (maxRevisionForIdxRules int64) { diff --git a/pkg/schema/schema.go b/pkg/schema/schema.go index 6ee3c2d0..9415539d 100644 --- a/pkg/schema/schema.go +++ b/pkg/schema/schema.go @@ -90,7 +90,13 @@ type ResourceSchemaSupplier interface { // ResourceSupplier allows open a resource and its embedded tsdb. type ResourceSupplier interface { ResourceSchemaSupplier - OpenDB(groupSchema *commonv1.Group) (io.Closer, error) + OpenDB(groupSchema *commonv1.Group) (DB, error) +} + +// DB is the interface of a tsdb. +type DB interface { + io.Closer + UpdateOptions(opts *commonv1.ResourceOpts) } // Repository is the collection of several hierarchies groups by a "Group".
