This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch metadata-group
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 5896e8c72abb3414f09a557571636d779fa8116f
Author: Gao Hongtao <[email protected]>
AuthorDate: Wed Dec 18 14:50:53 2024 +0800

    Fix schema updating bugs
    
    Signed-off-by: Gao Hongtao <[email protected]>
---
 CHANGES.md                                 |   1 +
 banyand/internal/storage/rotation.go       |   5 +-
 banyand/internal/storage/segment.go        |  56 +++++++---
 banyand/internal/storage/shard.go          |   2 +-
 banyand/internal/storage/storage.go        |   1 +
 banyand/internal/storage/tsdb.go           |  24 ++++-
 banyand/internal/storage/tsdb_test.go      | 164 +++++++++++++++++++++++++++++
 banyand/measure/metadata.go                |   2 +-
 banyand/measure/metrics.go                 |   3 +
 banyand/metadata/schema/group.go           |  13 ++-
 banyand/metadata/schema/measure.go         |  44 +++++++-
 banyand/metadata/schema/stream.go          |  35 ++++--
 banyand/stream/metadata.go                 |   2 +-
 banyand/stream/metrics.go                  |   4 +
 docs/interacting/bydbctl/schema/group.md   |   2 +
 docs/interacting/bydbctl/schema/measure.md |  28 +++--
 docs/interacting/bydbctl/schema/stream.md  |   6 +-
 pkg/schema/cache.go                        |  27 ++---
 pkg/schema/schema.go                       |   8 +-
 19 files changed, 363 insertions(+), 64 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index a5cd0734..8e5ec472 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -27,6 +27,7 @@ Release Notes.
 - Metadata: Fix the bug that the cache load nil value that is the unknown 
index rule on the index rule binding.
 - Queue: Fix the bug that the client remove a registered node in the eviction 
list. The node is controlled by the recovery loop, doesn't need to be removed 
in the failover process.
 - UI: Add prettier to enforce a consistent style by parsing code.
+- Fix the bug that fails to update `Group` Schema's ResourceOpts.
 
 ### Documentation
 
diff --git a/banyand/internal/storage/rotation.go 
b/banyand/internal/storage/rotation.go
index ab18daec..2066ef90 100644
--- a/banyand/internal/storage/rotation.go
+++ b/banyand/internal/storage/rotation.go
@@ -43,7 +43,8 @@ func (d *database[T, O]) Tick(ts int64) {
 }
 
 func (d *database[T, O]) startRotationTask() error {
-       rt := newRetentionTask(d, d.opts.TTL)
+       options := d.segmentController.getOptions()
+       rt := newRetentionTask(d, options.TTL)
        go func(rt *retentionTask[T, O]) {
                for ts := range d.tsEventCh {
                        func(ts int64) {
@@ -70,7 +71,7 @@ func (d *database[T, O]) startRotationTask() error {
                                        }
                                        d.incTotalRotationStarted(1)
                                        defer d.incTotalRotationFinished(1)
-                                       start := 
d.segmentController.opts.SegmentInterval.nextTime(t)
+                                       start := 
options.SegmentInterval.nextTime(t)
                                        d.logger.Info().Time("segment_start", 
start).Time("event_time", t).Msg("create new segment")
                                        _, err := 
d.segmentController.create(start)
                                        if err != nil {
diff --git a/banyand/internal/storage/segment.go 
b/banyand/internal/storage/segment.go
index 02f59581..0451c649 100644
--- a/banyand/internal/storage/segment.go
+++ b/banyand/internal/storage/segment.go
@@ -32,6 +32,7 @@ import (
        "github.com/pkg/errors"
 
        "github.com/apache/skywalking-banyandb/api/common"
+       commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
        "github.com/apache/skywalking-banyandb/pkg/convert"
        "github.com/apache/skywalking-banyandb/pkg/index/inverted"
        "github.com/apache/skywalking-banyandb/pkg/logger"
@@ -43,6 +44,8 @@ var ErrExpiredData = errors.New("expired data")
 
 type segment[T TSTable, O any] struct {
        metrics  any
+       option   O
+       creator  TSTableCreator[T, O]
        l        *logger.Logger
        index    *seriesIndex
        sLst     atomic.Pointer[[]*shard[T]]
@@ -50,7 +53,6 @@ type segment[T TSTable, O any] struct {
        timestamp.TimeRange
        suffix        string
        location      string
-       opts          TSDBOpts[T, O]
        mu            sync.Mutex
        refCount      int32
        mustBeDeleted uint32
@@ -58,7 +60,6 @@ type segment[T TSTable, O any] struct {
 }
 
 func (sc *segmentController[T, O]) openSegment(ctx context.Context, startTime, 
endTime time.Time, path, suffix string,
-       opts TSDBOpts[T, O],
 ) (s *segment[T, O], err error) {
        suffixInteger, err := strconv.Atoi(suffix)
        if err != nil {
@@ -69,8 +70,9 @@ func (sc *segmentController[T, O]) openSegment(ctx 
context.Context, startTime, e
        ctx = common.SetPosition(ctx, func(_ common.Position) common.Position {
                return p
        })
-       id := generateSegID(sc.opts.SegmentInterval.Unit, suffixInteger)
-       sir, err := newSeriesIndex(ctx, path, 
sc.opts.SeriesIndexFlushTimeoutSeconds, sc.indexMetrics)
+       options := sc.getOptions()
+       id := generateSegID(options.SegmentInterval.Unit, suffixInteger)
+       sir, err := newSeriesIndex(ctx, path, 
options.SeriesIndexFlushTimeoutSeconds, sc.indexMetrics)
        if err != nil {
                return nil, errors.Wrap(errOpenDatabase, 
errors.WithMessage(err, "create series index controller failed").Error())
        }
@@ -82,20 +84,21 @@ func (sc *segmentController[T, O]) openSegment(ctx 
context.Context, startTime, e
                position:  p,
                refCount:  1,
                index:     sir,
-               opts:      opts,
                metrics:   sc.metrics,
+               creator:   options.TSTableCreator,
+               option:    options.Option,
        }
        s.l = logger.Fetch(ctx, s.String())
-       return s, s.loadShards()
+       return s, s.loadShards(int(options.ShardNum))
 }
 
-func (s *segment[T, O]) loadShards() error {
+func (s *segment[T, O]) loadShards(shardNum int) error {
        return walkDir(s.location, shardPathPrefix, func(suffix string) error {
                shardID, err := strconv.Atoi(suffix)
                if err != nil {
                        return err
                }
-               if shardID >= int(s.opts.ShardNum) {
+               if shardID >= shardNum {
                        return nil
                }
                s.l.Info().Int("shard_id", shardID).Msg("loaded a existed 
shard")
@@ -205,11 +208,12 @@ type segmentController[T TSTable, O any] struct {
        metrics      Metrics
        l            *logger.Logger
        indexMetrics *inverted.Metrics
+       opts         *TSDBOpts[T, O]
        position     common.Position
        location     string
        lst          []*segment[T, O]
-       opts         TSDBOpts[T, O]
        deadline     atomic.Int64
+       optsMutex    sync.RWMutex
        sync.RWMutex
 }
 
@@ -219,7 +223,7 @@ func newSegmentController[T TSTable, O any](ctx 
context.Context, location string
        clock, _ := timestamp.GetClock(ctx)
        return &segmentController[T, O]{
                location:     location,
-               opts:         opts,
+               opts:         &opts,
                l:            l,
                clock:        clock,
                position:     common.GetPosition(ctx),
@@ -228,6 +232,25 @@ func newSegmentController[T TSTable, O any](ctx 
context.Context, location string
        }
 }
 
+func (sc *segmentController[T, O]) getOptions() *TSDBOpts[T, O] {
+       sc.optsMutex.RLock()
+       defer sc.optsMutex.RUnlock()
+       return sc.opts
+}
+
+func (sc *segmentController[T, O]) updateOptions(resourceOpts 
*commonv1.ResourceOpts) {
+       sc.optsMutex.Lock()
+       defer sc.optsMutex.Unlock()
+       si := MustToIntervalRule(resourceOpts.SegmentInterval)
+       if sc.opts.SegmentInterval.Unit != si.Unit {
+               sc.l.Panic().Msg("segment interval unit cannot be changed")
+               return
+       }
+       sc.opts.SegmentInterval = si
+       sc.opts.TTL = MustToIntervalRule(resourceOpts.Ttl)
+       sc.opts.ShardNum = resourceOpts.ShardNum
+}
+
 func (sc *segmentController[T, O]) selectSegments(timeRange 
timestamp.TimeRange) (tt []Segment[T, O]) {
        sc.RLock()
        defer sc.RUnlock()
@@ -270,7 +293,7 @@ func (sc *segmentController[T, O]) segments() (ss 
[]*segment[T, O]) {
 }
 
 func (sc *segmentController[T, O]) format(tm time.Time) string {
-       switch sc.opts.SegmentInterval.Unit {
+       switch sc.getOptions().SegmentInterval.Unit {
        case HOUR:
                return tm.Format(hourFormat)
        case DAY:
@@ -280,7 +303,7 @@ func (sc *segmentController[T, O]) format(tm time.Time) 
string {
 }
 
 func (sc *segmentController[T, O]) parse(value string) (time.Time, error) {
-       switch sc.opts.SegmentInterval.Unit {
+       switch sc.getOptions().SegmentInterval.Unit {
        case HOUR:
                return time.ParseInLocation(hourFormat, value, time.Local)
        case DAY:
@@ -293,7 +316,7 @@ func (sc *segmentController[T, O]) open() error {
        sc.Lock()
        defer sc.Unlock()
        emptySegments := make([]string, 0)
-       err := loadSegments(sc.location, segPathPrefix, sc, 
sc.opts.SegmentInterval, func(start, end time.Time) error {
+       err := loadSegments(sc.location, segPathPrefix, sc, 
sc.getOptions().SegmentInterval, func(start, end time.Time) error {
                suffix := sc.format(start)
                segmentPath := path.Join(sc.location, fmt.Sprintf(segTemplate, 
suffix))
                metadataPath := path.Join(segmentPath, metadataFilename)
@@ -334,7 +357,8 @@ func (sc *segmentController[T, O]) create(start time.Time) 
(*segment[T, O], erro
                        return s, nil
                }
        }
-       start = sc.opts.SegmentInterval.Unit.standard(start)
+       options := sc.getOptions()
+       start = options.SegmentInterval.Unit.standard(start)
        var next *segment[T, O]
        for _, s := range sc.lst {
                if s.Contains(start.UnixNano()) {
@@ -344,7 +368,7 @@ func (sc *segmentController[T, O]) create(start time.Time) 
(*segment[T, O], erro
                        next = s
                }
        }
-       stdEnd := sc.opts.SegmentInterval.nextTime(start)
+       stdEnd := options.SegmentInterval.nextTime(start)
        var end time.Time
        if next != nil && next.Start.Before(stdEnd) {
                end = next.Start
@@ -381,7 +405,7 @@ func (sc *segmentController[T, O]) load(start, end 
time.Time, root string) (seg
        ctx := common.SetPosition(context.WithValue(context.Background(), 
logger.ContextKey, sc.l), func(_ common.Position) common.Position {
                return sc.position
        })
-       seg, err = sc.openSegment(ctx, start, end, segPath, suffix, sc.opts)
+       seg, err = sc.openSegment(ctx, start, end, segPath, suffix)
        if err != nil {
                return nil, err
        }
diff --git a/banyand/internal/storage/shard.go 
b/banyand/internal/storage/shard.go
index 20c0807e..0c27f335 100644
--- a/banyand/internal/storage/shard.go
+++ b/banyand/internal/storage/shard.go
@@ -43,7 +43,7 @@ func (s *segment[T, O]) openShard(ctx context.Context, id 
common.ShardID) (*shar
        l.Info().Int("shard_id", int(id)).Str("path", location).Msg("creating a 
shard")
        p := common.GetPosition(ctx)
        p.Shard = strconv.Itoa(int(id))
-       t, err := s.opts.TSTableCreator(lfs, location, p, l, s.TimeRange, 
s.opts.Option, s.metrics)
+       t, err := s.creator(lfs, location, p, l, s.TimeRange, s.option, 
s.metrics)
        if err != nil {
                return nil, err
        }
diff --git a/banyand/internal/storage/storage.go 
b/banyand/internal/storage/storage.go
index a25934b3..de058b0e 100644
--- a/banyand/internal/storage/storage.go
+++ b/banyand/internal/storage/storage.go
@@ -99,6 +99,7 @@ type TSDB[T TSTable, O any] interface {
        CreateSegmentIfNotExist(ts time.Time) (Segment[T, O], error)
        SelectSegments(timeRange timestamp.TimeRange) []Segment[T, O]
        Tick(ts int64)
+       UpdateOptions(opts *commonv1.ResourceOpts)
 }
 
 // Segment is a time range of data.
diff --git a/banyand/internal/storage/tsdb.go b/banyand/internal/storage/tsdb.go
index f17dc7c2..5d62bc82 100644
--- a/banyand/internal/storage/tsdb.go
+++ b/banyand/internal/storage/tsdb.go
@@ -28,6 +28,7 @@ import (
        "github.com/pkg/errors"
 
        "github.com/apache/skywalking-banyandb/api/common"
+       commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
        "github.com/apache/skywalking-banyandb/banyand/observability"
        "github.com/apache/skywalking-banyandb/pkg/fs"
        "github.com/apache/skywalking-banyandb/pkg/index/inverted"
@@ -79,13 +80,17 @@ type database[T TSTable, O any] struct {
        *metrics
        p              common.Position
        location       string
-       opts           TSDBOpts[T, O]
        latestTickTime atomic.Int64
        sync.RWMutex
        rotationProcessOn atomic.Bool
+       closed            atomic.Bool
 }
 
 func (d *database[T, O]) Close() error {
+       if d.closed.Load() {
+               return nil
+       }
+       d.closed.Store(true)
        d.Lock()
        defer d.Unlock()
        d.scheduler.Close()
@@ -122,7 +127,6 @@ func OpenTSDB[T TSTable, O any](ctx context.Context, opts 
TSDBOpts[T, O]) (TSDB[
                location:  location,
                scheduler: scheduler,
                logger:    l,
-               opts:      opts,
                tsEventCh: make(chan int64),
                p:         p,
                segmentController: newSegmentController[T](ctx, location,
@@ -144,14 +148,30 @@ func OpenTSDB[T TSTable, O any](ctx context.Context, opts 
TSDBOpts[T, O]) (TSDB[
 }
 
 func (d *database[T, O]) CreateSegmentIfNotExist(ts time.Time) (Segment[T, O], 
error) {
+       if d.closed.Load() {
+               return nil, errors.New("database is closed")
+       }
        return d.segmentController.createSegment(ts)
 }
 
 func (d *database[T, O]) SelectSegments(timeRange timestamp.TimeRange) 
[]Segment[T, O] {
+       if d.closed.Load() {
+               return nil
+       }
        return d.segmentController.selectSegments(timeRange)
 }
 
+func (d *database[T, O]) UpdateOptions(resourceOpts *commonv1.ResourceOpts) {
+       if d.closed.Load() {
+               return
+       }
+       d.segmentController.updateOptions(resourceOpts)
+}
+
 func (d *database[T, O]) collect() {
+       if d.closed.Load() {
+               return
+       }
        if d.metrics == nil {
                return
        }
diff --git a/banyand/internal/storage/tsdb_test.go 
b/banyand/internal/storage/tsdb_test.go
new file mode 100644
index 00000000..2b33a4f9
--- /dev/null
+++ b/banyand/internal/storage/tsdb_test.go
@@ -0,0 +1,164 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package storage
+
+import (
+       "context"
+       "testing"
+       "time"
+
+       "github.com/stretchr/testify/require"
+
+       commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/test"
+       "github.com/apache/skywalking-banyandb/pkg/test/flags"
+       "github.com/apache/skywalking-banyandb/pkg/timestamp"
+)
+
+func TestOpenTSDB(t *testing.T) {
+       logger.Init(logger.Logging{
+               Env:   "dev",
+               Level: flags.LogLevel,
+       })
+
+       t.Run("create new TSDB", func(t *testing.T) {
+               dir, defFn := test.Space(require.New(t))
+               defer defFn()
+
+               opts := TSDBOpts[*MockTSTable, any]{
+                       Location:        dir,
+                       SegmentInterval: IntervalRule{Unit: DAY, Num: 1},
+                       TTL:             IntervalRule{Unit: DAY, Num: 3},
+                       ShardNum:        1,
+                       TSTableCreator:  MockTSTableCreator,
+               }
+
+               ctx := context.Background()
+               mc := timestamp.NewMockClock()
+               ts, err := time.ParseInLocation("2006-01-02 15:04:05", 
"2024-05-01 00:00:00", time.Local)
+               require.NoError(t, err)
+               mc.Set(ts)
+               ctx = timestamp.SetClock(ctx, mc)
+
+               tsdb, err := OpenTSDB(ctx, opts)
+               require.NoError(t, err)
+               require.NotNil(t, tsdb)
+
+               seg, err := tsdb.CreateSegmentIfNotExist(ts)
+               require.NoError(t, err)
+               defer seg.DecRef()
+
+               db := tsdb.(*database[*MockTSTable, any])
+               require.Equal(t, len(db.segmentController.segments()), 1)
+               tsdb.Close()
+       })
+
+       t.Run("reopen existing TSDB", func(t *testing.T) {
+               dir, defFn := test.Space(require.New(t))
+               defer defFn()
+
+               opts := TSDBOpts[*MockTSTable, any]{
+                       Location:        dir,
+                       SegmentInterval: IntervalRule{Unit: DAY, Num: 1},
+                       TTL:             IntervalRule{Unit: DAY, Num: 3},
+                       ShardNum:        1,
+                       TSTableCreator:  MockTSTableCreator,
+               }
+
+               ctx := context.Background()
+               mc := timestamp.NewMockClock()
+               ts, err := time.ParseInLocation("2006-01-02 15:04:05", 
"2024-05-01 00:00:00", time.Local)
+               require.NoError(t, err)
+               mc.Set(ts)
+               ctx = timestamp.SetClock(ctx, mc)
+
+               // Create new TSDB
+               tsdb, err := OpenTSDB(ctx, opts)
+               require.NoError(t, err)
+               require.NotNil(t, tsdb)
+
+               seg, err := tsdb.CreateSegmentIfNotExist(ts)
+               require.NoError(t, err)
+               seg.DecRef()
+
+               db := tsdb.(*database[*MockTSTable, any])
+               segs := db.segmentController.segments()
+               require.Equal(t, len(segs), 1)
+               for i := range segs {
+                       segs[i].DecRef()
+               }
+               tsdb.Close()
+
+               // Reopen existing TSDB
+               tsdb, err = OpenTSDB(ctx, opts)
+               require.NoError(t, err)
+               require.NotNil(t, tsdb)
+
+               db = tsdb.(*database[*MockTSTable, any])
+               segs = db.segmentController.segments()
+               require.Equal(t, len(segs), 1)
+               for i := range segs {
+                       segs[i].DecRef()
+               }
+               tsdb.Close()
+       })
+
+       t.Run("Changed options", func(t *testing.T) {
+               dir, defFn := test.Space(require.New(t))
+               defer defFn()
+
+               opts := TSDBOpts[*MockTSTable, any]{
+                       Location:        dir,
+                       SegmentInterval: IntervalRule{Unit: DAY, Num: 1},
+                       TTL:             IntervalRule{Unit: DAY, Num: 3},
+                       ShardNum:        1,
+                       TSTableCreator:  MockTSTableCreator,
+               }
+
+               ctx := context.Background()
+               mc := timestamp.NewMockClock()
+               ts, err := time.ParseInLocation("2006-01-02 15:04:05", 
"2024-05-01 00:00:00", time.Local)
+               require.NoError(t, err)
+               mc.Set(ts)
+               ctx = timestamp.SetClock(ctx, mc)
+
+               // Create new TSDB
+               tsdb, err := OpenTSDB(ctx, opts)
+               require.NoError(t, err)
+               require.NotNil(t, tsdb)
+
+               seg, err := tsdb.CreateSegmentIfNotExist(ts)
+               require.NoError(t, err)
+               seg.DecRef()
+
+               tsdb.UpdateOptions(&commonv1.ResourceOpts{
+                       ShardNum: 2,
+                       SegmentInterval: &commonv1.IntervalRule{
+                               Unit: commonv1.IntervalRule_UNIT_DAY,
+                               Num:  2,
+                       },
+                       Ttl: &commonv1.IntervalRule{
+                               Unit: commonv1.IntervalRule_UNIT_DAY,
+                               Num:  6,
+                       },
+               })
+
+               tsdb.Close()
+       })
+}
diff --git a/banyand/measure/metadata.go b/banyand/measure/metadata.go
index 83dbbc02..07de2db2 100644
--- a/banyand/measure/metadata.go
+++ b/banyand/measure/metadata.go
@@ -295,7 +295,7 @@ func (s *supplier) ResourceSchema(md *commonv1.Metadata) 
(resourceSchema.Resourc
        return s.metadata.MeasureRegistry().GetMeasure(ctx, md)
 }
 
-func (s *supplier) OpenDB(groupSchema *commonv1.Group) (io.Closer, error) {
+func (s *supplier) OpenDB(groupSchema *commonv1.Group) (resourceSchema.DB, 
error) {
        name := groupSchema.Metadata.Name
        p := common.Position{
                Module:   "measure",
diff --git a/banyand/measure/metrics.go b/banyand/measure/metrics.go
index 7bbeea42..96e8ff22 100644
--- a/banyand/measure/metrics.go
+++ b/banyand/measure/metrics.go
@@ -293,6 +293,9 @@ func (tst *tsTable) Collect(m storage.Metrics) {
        }
        metrics := m.(*metrics)
        snp := tst.currentSnapshot()
+       if snp == nil {
+               return
+       }
        defer snp.decRef()
 
        var totalMemPart, totalMemElements, totalMemBlocks, totalMemPartBytes, 
totalMemPartUncompressedBytes uint64
diff --git a/banyand/metadata/schema/group.go b/banyand/metadata/schema/group.go
index 7040bb2b..d8b9d627 100644
--- a/banyand/metadata/schema/group.go
+++ b/banyand/metadata/schema/group.go
@@ -23,6 +23,7 @@ import (
 
        "github.com/pkg/errors"
        clientv3 "go.etcd.io/etcd/client/v3"
+       "google.golang.org/protobuf/proto"
        "google.golang.org/protobuf/types/known/timestamppb"
 
        commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
@@ -102,7 +103,17 @@ func (e *etcdSchemaRegistry) UpdateGroup(ctx 
context.Context, group *commonv1.Gr
        if group.Metadata.Name == "" {
                return errors.New("metadata.name is required")
        }
-       _, err := e.update(ctx, Metadata{
+       g, err := e.GetGroup(ctx, group.GetMetadata().GetName())
+       if err != nil {
+               return err
+       }
+       if proto.Equal(g.ResourceOpts, group.ResourceOpts) {
+               return nil
+       }
+       if g.GetResourceOpts().SegmentInterval.Unit != 
group.GetResourceOpts().SegmentInterval.Unit {
+               return errors.New("segment interval unit cannot be changed")
+       }
+       _, err = e.update(ctx, Metadata{
                TypeMeta: TypeMeta{
                        Kind: KindGroup,
                        Name: group.GetMetadata().GetName(),
diff --git a/banyand/metadata/schema/measure.go 
b/banyand/metadata/schema/measure.go
index d4461cfb..45c20f31 100644
--- a/banyand/metadata/schema/measure.go
+++ b/banyand/metadata/schema/measure.go
@@ -19,6 +19,7 @@ package schema
 
 import (
        "context"
+       "fmt"
 
        "github.com/pkg/errors"
        "google.golang.org/protobuf/types/known/timestamppb"
@@ -114,10 +115,8 @@ func (e *etcdSchemaRegistry) UpdateMeasure(ctx 
context.Context, measure *databas
        if prev == nil {
                return 0, errors.WithMessagef(ErrGRPCResourceNotFound, "measure 
%s not found", measure.GetMetadata().GetName())
        }
-       for i, e := range prev.GetEntity().GetTagNames() {
-               if e != measure.GetEntity().GetTagNames()[i] {
-                       return 0, errors.WithMessagef(ErrInputInvalid, "entity 
is immutable. Please create a new measure if you want to change entity")
-               }
+       if err := validateEqualExceptAppendTagsAndFields(prev, measure); err != 
nil {
+               return 0, errors.WithMessagef(ErrInputInvalid, "validation 
failed: %s", err)
        }
        return e.update(ctx, Metadata{
                TypeMeta: TypeMeta{
@@ -130,6 +129,43 @@ func (e *etcdSchemaRegistry) UpdateMeasure(ctx 
context.Context, measure *databas
        })
 }
 
+func validateEqualExceptAppendTagsAndFields(prevMeasure, newMeasure 
*databasev1.Measure) error {
+       if prevMeasure.GetInterval() != newMeasure.GetInterval() {
+               return fmt.Errorf("interval is different: %s != %s", 
prevMeasure.GetInterval(), newMeasure.GetInterval())
+       }
+       if prevMeasure.GetEntity().String() != newMeasure.GetEntity().String() {
+               return fmt.Errorf("entity is different: %s != %s", 
prevMeasure.GetEntity().String(), newMeasure.GetEntity().String())
+       }
+       if prevMeasure.GetIndexMode() != newMeasure.GetIndexMode() {
+               return fmt.Errorf("index mode is different: %v != %v", 
prevMeasure.GetIndexMode(), newMeasure.GetIndexMode())
+       }
+       if len(prevMeasure.GetTagFamilies()) > len(newMeasure.GetTagFamilies()) 
{
+               return fmt.Errorf("number of tag families is less in the new 
measure")
+       }
+       if len(prevMeasure.GetFields()) > len(newMeasure.GetFields()) {
+               return fmt.Errorf("number of fields is less in the new measure")
+       }
+       for i, tagFamily := range prevMeasure.GetTagFamilies() {
+               if tagFamily.Name != newMeasure.GetTagFamilies()[i].Name {
+                       return fmt.Errorf("tag family name is different: %s != 
%s", tagFamily.Name, newMeasure.GetTagFamilies()[i].Name)
+               }
+               if len(tagFamily.Tags) > 
len(newMeasure.GetTagFamilies()[i].Tags) {
+                       return fmt.Errorf("number of tags in tag family %s is 
less in the new measure", tagFamily.Name)
+               }
+               for j, tag := range tagFamily.Tags {
+                       if tag.String() != 
newMeasure.GetTagFamilies()[i].Tags[j].String() {
+                               return fmt.Errorf("tag %s in tag family %s is 
different: %s != %s", tag.Name, tagFamily.Name, tag.String(), 
newMeasure.GetTagFamilies()[i].Tags[j].String())
+                       }
+               }
+       }
+       for i, field := range prevMeasure.GetFields() {
+               if field.String() != newMeasure.GetFields()[i].String() {
+                       return fmt.Errorf("field is different: %s != %s", 
field.String(), newMeasure.GetFields()[i].String())
+               }
+       }
+       return nil
+}
+
 func (e *etcdSchemaRegistry) DeleteMeasure(ctx context.Context, metadata 
*commonv1.Metadata) (bool, error) {
        return e.delete(ctx, Metadata{
                TypeMeta: TypeMeta{
diff --git a/banyand/metadata/schema/stream.go 
b/banyand/metadata/schema/stream.go
index 31fdef77..502cb42c 100644
--- a/banyand/metadata/schema/stream.go
+++ b/banyand/metadata/schema/stream.go
@@ -19,6 +19,7 @@ package schema
 
 import (
        "context"
+       "fmt"
 
        "github.com/pkg/errors"
        "google.golang.org/protobuf/types/known/timestamppb"
@@ -68,20 +69,15 @@ func (e *etcdSchemaRegistry) UpdateStream(ctx 
context.Context, stream *databasev
        if err = validate.GroupForStreamOrMeasure(g); err != nil {
                return 0, err
        }
-       if err = validate.GroupForStreamOrMeasure(g); err != nil {
-               return 0, err
-       }
        prev, err := e.GetStream(ctx, stream.GetMetadata())
        if err != nil {
                return 0, err
        }
        if prev == nil {
-               return 0, errors.WithMessagef(ErrGRPCResourceNotFound, "measure 
%s not found", stream.GetMetadata().GetName())
+               return 0, errors.WithMessagef(ErrGRPCResourceNotFound, "stream 
%s not found", stream.GetMetadata().GetName())
        }
-       for i, e := range prev.GetEntity().GetTagNames() {
-               if e != stream.GetEntity().GetTagNames()[i] {
-                       return 0, errors.WithMessagef(ErrInputInvalid, "entity 
is immutable. Please create a new stream if you want to change entity")
-               }
+       if err := validateEqualExceptAppendTags(prev, stream); err != nil {
+               return 0, errors.WithMessagef(ErrInputInvalid, "validation 
failed: %s", err)
        }
        return e.update(ctx, Metadata{
                TypeMeta: TypeMeta{
@@ -94,6 +90,29 @@ func (e *etcdSchemaRegistry) UpdateStream(ctx 
context.Context, stream *databasev
        })
 }
 
+func validateEqualExceptAppendTags(prevStream, newStream *databasev1.Stream) 
error {
+       if prevStream.GetEntity().String() != newStream.GetEntity().String() {
+               return fmt.Errorf("entity is different: %s != %s", 
prevStream.GetEntity().String(), newStream.GetEntity().String())
+       }
+       if len(prevStream.GetTagFamilies()) > len(newStream.GetTagFamilies()) {
+               return fmt.Errorf("number of tag families is less in the new 
stream")
+       }
+       for i, tagFamily := range prevStream.GetTagFamilies() {
+               if tagFamily.Name != newStream.GetTagFamilies()[i].Name {
+                       return fmt.Errorf("tag family name is different: %s != 
%s", tagFamily.Name, newStream.GetTagFamilies()[i].Name)
+               }
+               if len(tagFamily.Tags) > 
len(newStream.GetTagFamilies()[i].Tags) {
+                       return fmt.Errorf("number of tags in tag family %s is 
less in the new stream", tagFamily.Name)
+               }
+               for j, tag := range tagFamily.Tags {
+                       if tag.String() != 
newStream.GetTagFamilies()[i].Tags[j].String() {
+                               return fmt.Errorf("tag %s in tag family %s is 
different: %s != %s", tag.Name, tagFamily.Name, tag.String(), 
newStream.GetTagFamilies()[i].Tags[j].String())
+                       }
+               }
+       }
+       return nil
+}
+
 func (e *etcdSchemaRegistry) CreateStream(ctx context.Context, stream 
*databasev1.Stream) (int64, error) {
        if stream.UpdatedAt != nil {
                stream.UpdatedAt = timestamppb.Now()
diff --git a/banyand/stream/metadata.go b/banyand/stream/metadata.go
index 1103b3d5..0c6a1c43 100644
--- a/banyand/stream/metadata.go
+++ b/banyand/stream/metadata.go
@@ -279,7 +279,7 @@ func (s *supplier) ResourceSchema(md *commonv1.Metadata) 
(resourceSchema.Resourc
        return s.metadata.StreamRegistry().GetStream(ctx, md)
 }
 
-func (s *supplier) OpenDB(groupSchema *commonv1.Group) (io.Closer, error) {
+func (s *supplier) OpenDB(groupSchema *commonv1.Group) (resourceSchema.DB, 
error) {
        name := groupSchema.Metadata.Name
        p := common.Position{
                Module:   "stream",
diff --git a/banyand/stream/metrics.go b/banyand/stream/metrics.go
index f77bd41b..339f3a2c 100644
--- a/banyand/stream/metrics.go
+++ b/banyand/stream/metrics.go
@@ -293,6 +293,10 @@ func (tst *tsTable) Collect(m storage.Metrics) {
        }
        metrics := m.(*metrics)
        snp := tst.currentSnapshot()
+       if snp == nil {
+               return
+       }
+
        defer snp.decRef()
 
        var totalMemPart, totalMemElements, totalMemBlocks, totalMemPartBytes, 
totalMemPartUncompressedBytes uint64
diff --git a/docs/interacting/bydbctl/schema/group.md 
b/docs/interacting/bydbctl/schema/group.md
index e4f9c51c..c1e17e14 100644
--- a/docs/interacting/bydbctl/schema/group.md
+++ b/docs/interacting/bydbctl/schema/group.md
@@ -66,6 +66,8 @@ resource_opts:
 EOF
 ```
 
+You can't change the unit of `segment_interval`. If you want to change the 
unit, you should delete the group and create a new one.
+
 ## Delete operation
 
 Delete operation deletes a group's schema.
diff --git a/docs/interacting/bydbctl/schema/measure.md 
b/docs/interacting/bydbctl/schema/measure.md
index d8f8b3e7..7faae432 100644
--- a/docs/interacting/bydbctl/schema/measure.md
+++ b/docs/interacting/bydbctl/schema/measure.md
@@ -87,18 +87,32 @@ bydbctl measure update -f - <<EOF
 metadata:
   name: service_cpm_minute
   group: sw_metric
-tagFamilies:
-  - name: searchable
-    tags: 
-      - name: trace_id
-        type: TAG_TYPE_STRING
+tag_families:
+- name: default
+  tags:
+  - name: id
+    type: TAG_TYPE_STRING
+  - name: entity_id
+    type: TAG_TYPE_STRING
+  - name: new_tag
+    type: TAG_TYPE_STRING
+fields:
+- name: total
+  field_type: FIELD_TYPE_INT
+  encoding_method: ENCODING_METHOD_GORILLA
+  compression_method: COMPRESSION_METHOD_ZSTD
+- name: value
+  field_type: FIELD_TYPE_INT
+  encoding_method: ENCODING_METHOD_GORILLA
+  compression_method: COMPRESSION_METHOD_ZSTD
 entity:
   tag_names:
-    - entity_id
+  - entity_id
+interval: 1m
 EOF
 ```
 
-A measure's `entity` is immutable. If you want to change the entity field, you 
should delete the measure and create a new one.
+You only can append new tags or fields to a measure. You can't change the 
existing tags or fields. The order of tags and fields is immutable, you can't 
change the order of tags or fields. You can't insert a new tag or field in the 
middle or front of the existing tags or fields.
 
 ## Delete operation
 
diff --git a/docs/interacting/bydbctl/schema/stream.md 
b/docs/interacting/bydbctl/schema/stream.md
index 1e18195a..3e2668b5 100644
--- a/docs/interacting/bydbctl/schema/stream.md
+++ b/docs/interacting/bydbctl/schema/stream.md
@@ -80,14 +80,16 @@ tagFamilies:
     tags: 
       - name: trace_id
         type: TAG_TYPE_STRING
+      - name: trace_name
+        type: TAG_TYPE_STRING
 entity:
-  tagNames:    
+  tagNames:
     - stream_id
 EOF
 
 ```
 
-A stream's `entity` field is immutable. If you want to change the entity 
field, you should delete the stream and create a new one.
+You only can append new tags to a measure. You can't change the existing tags 
. The order of tags is immutable, you can't change the order of tags. You can't 
insert a new tag in the middle or front of the existing tags.
 
 ## Delete operation
 
diff --git a/pkg/schema/cache.go b/pkg/schema/cache.go
index 7fcd31f4..3b7e34ac 100644
--- a/pkg/schema/cache.go
+++ b/pkg/schema/cache.go
@@ -28,6 +28,7 @@ import (
 
        "github.com/pkg/errors"
        "go.uber.org/multierr"
+       "google.golang.org/protobuf/proto"
 
        commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
@@ -270,15 +271,12 @@ func (sr *schemaRepo) storeGroup(groupMeta 
*commonv1.Metadata) (*group, error) {
        if groupSchema.GetMetadata().GetModRevision() <= 
prevGroupSchema.Metadata.ModRevision {
                return g, nil
        }
-       sr.l.Info().Str("group", name).Msg("closing the previous tsdb")
-       db := g.SupplyTSDB()
-       if db != nil {
-               db.Close()
-       }
-       sr.l.Info().Str("group", name).Msg("creating a new tsdb")
-       if err := g.init(name); err != nil {
-               return nil, err
+       g.groupSchema.Store(groupSchema)
+       if proto.Equal(groupSchema, prevGroupSchema) {
+               return g, nil
        }
+       sr.l.Info().Str("group", name).Msg("updating the group resource 
options")
+       g.db.Load().(DB).UpdateOptions(groupSchema.ResourceOpts)
        return g, nil
 }
 
@@ -500,7 +498,7 @@ func (g *group) initBySchema(groupSchema *commonv1.Group) 
error {
                return err
        }
        g.db.Store(db)
-       return nil
+       return err
 }
 
 func (g *group) isInit() bool {
@@ -512,10 +510,7 @@ func (g *group) GetSchema() *commonv1.Group {
 }
 
 func (g *group) SupplyTSDB() io.Closer {
-       if v := g.db.Load(); v != nil {
-               return v.(io.Closer)
-       }
-       return nil
+       return g.db.Load().(io.Closer)
 }
 
 func (g *group) isPortable() bool {
@@ -526,11 +521,7 @@ func (g *group) close() (err error) {
        if !g.isInit() || g.isPortable() {
                return nil
        }
-       db := g.SupplyTSDB()
-       if db != nil {
-               err = multierr.Append(err, db.Close())
-       }
-       return err
+       return multierr.Append(err, g.SupplyTSDB().Close())
 }
 
 func parseMaxModRevision[T ResourceSchema](indexRules []T) 
(maxRevisionForIdxRules int64) {
diff --git a/pkg/schema/schema.go b/pkg/schema/schema.go
index 6ee3c2d0..9415539d 100644
--- a/pkg/schema/schema.go
+++ b/pkg/schema/schema.go
@@ -90,7 +90,13 @@ type ResourceSchemaSupplier interface {
 // ResourceSupplier allows open a resource and its embedded tsdb.
 type ResourceSupplier interface {
        ResourceSchemaSupplier
-       OpenDB(groupSchema *commonv1.Group) (io.Closer, error)
+       OpenDB(groupSchema *commonv1.Group) (DB, error)
+}
+
+// DB is the interface of a tsdb.
+type DB interface {
+       io.Closer
+       UpdateOptions(opts *commonv1.ResourceOpts)
 }
 
 // Repository is the collection of several hierarchies groups by a "Group".

Reply via email to