This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new 5931e826 Fix schema updating bugs (#576)
5931e826 is described below
commit 5931e826d7f60449b353f2843362c745823d9d04
Author: Gao Hongtao <[email protected]>
AuthorDate: Wed Dec 18 15:08:43 2024 +0800
Fix schema updating bugs (#576)
---
CHANGES.md | 2 +
banyand/internal/storage/rotation.go | 5 +-
banyand/internal/storage/segment.go | 56 +++++++---
banyand/internal/storage/shard.go | 2 +-
banyand/internal/storage/storage.go | 1 +
banyand/internal/storage/tsdb.go | 24 ++++-
banyand/internal/storage/tsdb_test.go | 164 +++++++++++++++++++++++++++++
banyand/measure/metadata.go | 2 +-
banyand/measure/metrics.go | 3 +
banyand/metadata/schema/group.go | 13 ++-
banyand/metadata/schema/measure.go | 44 +++++++-
banyand/metadata/schema/stream.go | 35 ++++--
banyand/stream/metadata.go | 2 +-
banyand/stream/metrics.go | 4 +
docs/interacting/bydbctl/schema/group.md | 2 +
docs/interacting/bydbctl/schema/measure.md | 28 +++--
docs/interacting/bydbctl/schema/stream.md | 6 +-
pkg/schema/cache.go | 27 ++---
pkg/schema/schema.go | 8 +-
19 files changed, 364 insertions(+), 64 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index a5cd0734..ddaede98 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -27,11 +27,13 @@ Release Notes.
- Metadata: Fix the bug that the cache load nil value that is the unknown
index rule on the index rule binding.
- Queue: Fix the bug that the client remove a registered node in the eviction
list. The node is controlled by the recovery loop, doesn't need to be removed
in the failover process.
- UI: Add prettier to enforce a consistent style by parsing code.
+- Fix the bug that fails to update `Group` Schema's ResourceOpts.
### Documentation
- Improve the description of the memory in observability doc.
- Update kubernetes install document to align the banyandb helm v0.3.0.
+- Add restrictions on updating schema.
### Chores
diff --git a/banyand/internal/storage/rotation.go
b/banyand/internal/storage/rotation.go
index ab18daec..2066ef90 100644
--- a/banyand/internal/storage/rotation.go
+++ b/banyand/internal/storage/rotation.go
@@ -43,7 +43,8 @@ func (d *database[T, O]) Tick(ts int64) {
}
func (d *database[T, O]) startRotationTask() error {
- rt := newRetentionTask(d, d.opts.TTL)
+ options := d.segmentController.getOptions()
+ rt := newRetentionTask(d, options.TTL)
go func(rt *retentionTask[T, O]) {
for ts := range d.tsEventCh {
func(ts int64) {
@@ -70,7 +71,7 @@ func (d *database[T, O]) startRotationTask() error {
}
d.incTotalRotationStarted(1)
defer d.incTotalRotationFinished(1)
- start :=
d.segmentController.opts.SegmentInterval.nextTime(t)
+ start :=
options.SegmentInterval.nextTime(t)
d.logger.Info().Time("segment_start",
start).Time("event_time", t).Msg("create new segment")
_, err :=
d.segmentController.create(start)
if err != nil {
diff --git a/banyand/internal/storage/segment.go
b/banyand/internal/storage/segment.go
index 02f59581..0451c649 100644
--- a/banyand/internal/storage/segment.go
+++ b/banyand/internal/storage/segment.go
@@ -32,6 +32,7 @@ import (
"github.com/pkg/errors"
"github.com/apache/skywalking-banyandb/api/common"
+ commonv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
"github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/index/inverted"
"github.com/apache/skywalking-banyandb/pkg/logger"
@@ -43,6 +44,8 @@ var ErrExpiredData = errors.New("expired data")
type segment[T TSTable, O any] struct {
metrics any
+ option O
+ creator TSTableCreator[T, O]
l *logger.Logger
index *seriesIndex
sLst atomic.Pointer[[]*shard[T]]
@@ -50,7 +53,6 @@ type segment[T TSTable, O any] struct {
timestamp.TimeRange
suffix string
location string
- opts TSDBOpts[T, O]
mu sync.Mutex
refCount int32
mustBeDeleted uint32
@@ -58,7 +60,6 @@ type segment[T TSTable, O any] struct {
}
func (sc *segmentController[T, O]) openSegment(ctx context.Context, startTime,
endTime time.Time, path, suffix string,
- opts TSDBOpts[T, O],
) (s *segment[T, O], err error) {
suffixInteger, err := strconv.Atoi(suffix)
if err != nil {
@@ -69,8 +70,9 @@ func (sc *segmentController[T, O]) openSegment(ctx
context.Context, startTime, e
ctx = common.SetPosition(ctx, func(_ common.Position) common.Position {
return p
})
- id := generateSegID(sc.opts.SegmentInterval.Unit, suffixInteger)
- sir, err := newSeriesIndex(ctx, path,
sc.opts.SeriesIndexFlushTimeoutSeconds, sc.indexMetrics)
+ options := sc.getOptions()
+ id := generateSegID(options.SegmentInterval.Unit, suffixInteger)
+ sir, err := newSeriesIndex(ctx, path,
options.SeriesIndexFlushTimeoutSeconds, sc.indexMetrics)
if err != nil {
return nil, errors.Wrap(errOpenDatabase,
errors.WithMessage(err, "create series index controller failed").Error())
}
@@ -82,20 +84,21 @@ func (sc *segmentController[T, O]) openSegment(ctx
context.Context, startTime, e
position: p,
refCount: 1,
index: sir,
- opts: opts,
metrics: sc.metrics,
+ creator: options.TSTableCreator,
+ option: options.Option,
}
s.l = logger.Fetch(ctx, s.String())
- return s, s.loadShards()
+ return s, s.loadShards(int(options.ShardNum))
}
-func (s *segment[T, O]) loadShards() error {
+func (s *segment[T, O]) loadShards(shardNum int) error {
return walkDir(s.location, shardPathPrefix, func(suffix string) error {
shardID, err := strconv.Atoi(suffix)
if err != nil {
return err
}
- if shardID >= int(s.opts.ShardNum) {
+ if shardID >= shardNum {
return nil
}
s.l.Info().Int("shard_id", shardID).Msg("loaded a existed
shard")
@@ -205,11 +208,12 @@ type segmentController[T TSTable, O any] struct {
metrics Metrics
l *logger.Logger
indexMetrics *inverted.Metrics
+ opts *TSDBOpts[T, O]
position common.Position
location string
lst []*segment[T, O]
- opts TSDBOpts[T, O]
deadline atomic.Int64
+ optsMutex sync.RWMutex
sync.RWMutex
}
@@ -219,7 +223,7 @@ func newSegmentController[T TSTable, O any](ctx
context.Context, location string
clock, _ := timestamp.GetClock(ctx)
return &segmentController[T, O]{
location: location,
- opts: opts,
+ opts: &opts,
l: l,
clock: clock,
position: common.GetPosition(ctx),
@@ -228,6 +232,25 @@ func newSegmentController[T TSTable, O any](ctx
context.Context, location string
}
}
+func (sc *segmentController[T, O]) getOptions() *TSDBOpts[T, O] {
+ sc.optsMutex.RLock()
+ defer sc.optsMutex.RUnlock()
+ return sc.opts
+}
+
+func (sc *segmentController[T, O]) updateOptions(resourceOpts
*commonv1.ResourceOpts) {
+ sc.optsMutex.Lock()
+ defer sc.optsMutex.Unlock()
+ si := MustToIntervalRule(resourceOpts.SegmentInterval)
+ if sc.opts.SegmentInterval.Unit != si.Unit {
+ sc.l.Panic().Msg("segment interval unit cannot be changed")
+ return
+ }
+ sc.opts.SegmentInterval = si
+ sc.opts.TTL = MustToIntervalRule(resourceOpts.Ttl)
+ sc.opts.ShardNum = resourceOpts.ShardNum
+}
+
func (sc *segmentController[T, O]) selectSegments(timeRange
timestamp.TimeRange) (tt []Segment[T, O]) {
sc.RLock()
defer sc.RUnlock()
@@ -270,7 +293,7 @@ func (sc *segmentController[T, O]) segments() (ss
[]*segment[T, O]) {
}
func (sc *segmentController[T, O]) format(tm time.Time) string {
- switch sc.opts.SegmentInterval.Unit {
+ switch sc.getOptions().SegmentInterval.Unit {
case HOUR:
return tm.Format(hourFormat)
case DAY:
@@ -280,7 +303,7 @@ func (sc *segmentController[T, O]) format(tm time.Time)
string {
}
func (sc *segmentController[T, O]) parse(value string) (time.Time, error) {
- switch sc.opts.SegmentInterval.Unit {
+ switch sc.getOptions().SegmentInterval.Unit {
case HOUR:
return time.ParseInLocation(hourFormat, value, time.Local)
case DAY:
@@ -293,7 +316,7 @@ func (sc *segmentController[T, O]) open() error {
sc.Lock()
defer sc.Unlock()
emptySegments := make([]string, 0)
- err := loadSegments(sc.location, segPathPrefix, sc,
sc.opts.SegmentInterval, func(start, end time.Time) error {
+ err := loadSegments(sc.location, segPathPrefix, sc,
sc.getOptions().SegmentInterval, func(start, end time.Time) error {
suffix := sc.format(start)
segmentPath := path.Join(sc.location, fmt.Sprintf(segTemplate,
suffix))
metadataPath := path.Join(segmentPath, metadataFilename)
@@ -334,7 +357,8 @@ func (sc *segmentController[T, O]) create(start time.Time)
(*segment[T, O], erro
return s, nil
}
}
- start = sc.opts.SegmentInterval.Unit.standard(start)
+ options := sc.getOptions()
+ start = options.SegmentInterval.Unit.standard(start)
var next *segment[T, O]
for _, s := range sc.lst {
if s.Contains(start.UnixNano()) {
@@ -344,7 +368,7 @@ func (sc *segmentController[T, O]) create(start time.Time)
(*segment[T, O], erro
next = s
}
}
- stdEnd := sc.opts.SegmentInterval.nextTime(start)
+ stdEnd := options.SegmentInterval.nextTime(start)
var end time.Time
if next != nil && next.Start.Before(stdEnd) {
end = next.Start
@@ -381,7 +405,7 @@ func (sc *segmentController[T, O]) load(start, end
time.Time, root string) (seg
ctx := common.SetPosition(context.WithValue(context.Background(),
logger.ContextKey, sc.l), func(_ common.Position) common.Position {
return sc.position
})
- seg, err = sc.openSegment(ctx, start, end, segPath, suffix, sc.opts)
+ seg, err = sc.openSegment(ctx, start, end, segPath, suffix)
if err != nil {
return nil, err
}
diff --git a/banyand/internal/storage/shard.go
b/banyand/internal/storage/shard.go
index 20c0807e..0c27f335 100644
--- a/banyand/internal/storage/shard.go
+++ b/banyand/internal/storage/shard.go
@@ -43,7 +43,7 @@ func (s *segment[T, O]) openShard(ctx context.Context, id
common.ShardID) (*shar
l.Info().Int("shard_id", int(id)).Str("path", location).Msg("creating a
shard")
p := common.GetPosition(ctx)
p.Shard = strconv.Itoa(int(id))
- t, err := s.opts.TSTableCreator(lfs, location, p, l, s.TimeRange,
s.opts.Option, s.metrics)
+ t, err := s.creator(lfs, location, p, l, s.TimeRange, s.option,
s.metrics)
if err != nil {
return nil, err
}
diff --git a/banyand/internal/storage/storage.go
b/banyand/internal/storage/storage.go
index a25934b3..de058b0e 100644
--- a/banyand/internal/storage/storage.go
+++ b/banyand/internal/storage/storage.go
@@ -99,6 +99,7 @@ type TSDB[T TSTable, O any] interface {
CreateSegmentIfNotExist(ts time.Time) (Segment[T, O], error)
SelectSegments(timeRange timestamp.TimeRange) []Segment[T, O]
Tick(ts int64)
+ UpdateOptions(opts *commonv1.ResourceOpts)
}
// Segment is a time range of data.
diff --git a/banyand/internal/storage/tsdb.go b/banyand/internal/storage/tsdb.go
index f17dc7c2..5d62bc82 100644
--- a/banyand/internal/storage/tsdb.go
+++ b/banyand/internal/storage/tsdb.go
@@ -28,6 +28,7 @@ import (
"github.com/pkg/errors"
"github.com/apache/skywalking-banyandb/api/common"
+ commonv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
"github.com/apache/skywalking-banyandb/banyand/observability"
"github.com/apache/skywalking-banyandb/pkg/fs"
"github.com/apache/skywalking-banyandb/pkg/index/inverted"
@@ -79,13 +80,17 @@ type database[T TSTable, O any] struct {
*metrics
p common.Position
location string
- opts TSDBOpts[T, O]
latestTickTime atomic.Int64
sync.RWMutex
rotationProcessOn atomic.Bool
+ closed atomic.Bool
}
func (d *database[T, O]) Close() error {
+ if d.closed.Load() {
+ return nil
+ }
+ d.closed.Store(true)
d.Lock()
defer d.Unlock()
d.scheduler.Close()
@@ -122,7 +127,6 @@ func OpenTSDB[T TSTable, O any](ctx context.Context, opts
TSDBOpts[T, O]) (TSDB[
location: location,
scheduler: scheduler,
logger: l,
- opts: opts,
tsEventCh: make(chan int64),
p: p,
segmentController: newSegmentController[T](ctx, location,
@@ -144,14 +148,30 @@ func OpenTSDB[T TSTable, O any](ctx context.Context, opts
TSDBOpts[T, O]) (TSDB[
}
func (d *database[T, O]) CreateSegmentIfNotExist(ts time.Time) (Segment[T, O],
error) {
+ if d.closed.Load() {
+ return nil, errors.New("database is closed")
+ }
return d.segmentController.createSegment(ts)
}
func (d *database[T, O]) SelectSegments(timeRange timestamp.TimeRange)
[]Segment[T, O] {
+ if d.closed.Load() {
+ return nil
+ }
return d.segmentController.selectSegments(timeRange)
}
+func (d *database[T, O]) UpdateOptions(resourceOpts *commonv1.ResourceOpts) {
+ if d.closed.Load() {
+ return
+ }
+ d.segmentController.updateOptions(resourceOpts)
+}
+
func (d *database[T, O]) collect() {
+ if d.closed.Load() {
+ return
+ }
if d.metrics == nil {
return
}
diff --git a/banyand/internal/storage/tsdb_test.go
b/banyand/internal/storage/tsdb_test.go
new file mode 100644
index 00000000..2b33a4f9
--- /dev/null
+++ b/banyand/internal/storage/tsdb_test.go
@@ -0,0 +1,164 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package storage
+
+import (
+ "context"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/require"
+
+ commonv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/test"
+ "github.com/apache/skywalking-banyandb/pkg/test/flags"
+ "github.com/apache/skywalking-banyandb/pkg/timestamp"
+)
+
+func TestOpenTSDB(t *testing.T) {
+ logger.Init(logger.Logging{
+ Env: "dev",
+ Level: flags.LogLevel,
+ })
+
+ t.Run("create new TSDB", func(t *testing.T) {
+ dir, defFn := test.Space(require.New(t))
+ defer defFn()
+
+ opts := TSDBOpts[*MockTSTable, any]{
+ Location: dir,
+ SegmentInterval: IntervalRule{Unit: DAY, Num: 1},
+ TTL: IntervalRule{Unit: DAY, Num: 3},
+ ShardNum: 1,
+ TSTableCreator: MockTSTableCreator,
+ }
+
+ ctx := context.Background()
+ mc := timestamp.NewMockClock()
+ ts, err := time.ParseInLocation("2006-01-02 15:04:05",
"2024-05-01 00:00:00", time.Local)
+ require.NoError(t, err)
+ mc.Set(ts)
+ ctx = timestamp.SetClock(ctx, mc)
+
+ tsdb, err := OpenTSDB(ctx, opts)
+ require.NoError(t, err)
+ require.NotNil(t, tsdb)
+
+ seg, err := tsdb.CreateSegmentIfNotExist(ts)
+ require.NoError(t, err)
+ defer seg.DecRef()
+
+ db := tsdb.(*database[*MockTSTable, any])
+ require.Equal(t, len(db.segmentController.segments()), 1)
+ tsdb.Close()
+ })
+
+ t.Run("reopen existing TSDB", func(t *testing.T) {
+ dir, defFn := test.Space(require.New(t))
+ defer defFn()
+
+ opts := TSDBOpts[*MockTSTable, any]{
+ Location: dir,
+ SegmentInterval: IntervalRule{Unit: DAY, Num: 1},
+ TTL: IntervalRule{Unit: DAY, Num: 3},
+ ShardNum: 1,
+ TSTableCreator: MockTSTableCreator,
+ }
+
+ ctx := context.Background()
+ mc := timestamp.NewMockClock()
+ ts, err := time.ParseInLocation("2006-01-02 15:04:05",
"2024-05-01 00:00:00", time.Local)
+ require.NoError(t, err)
+ mc.Set(ts)
+ ctx = timestamp.SetClock(ctx, mc)
+
+ // Create new TSDB
+ tsdb, err := OpenTSDB(ctx, opts)
+ require.NoError(t, err)
+ require.NotNil(t, tsdb)
+
+ seg, err := tsdb.CreateSegmentIfNotExist(ts)
+ require.NoError(t, err)
+ seg.DecRef()
+
+ db := tsdb.(*database[*MockTSTable, any])
+ segs := db.segmentController.segments()
+ require.Equal(t, len(segs), 1)
+ for i := range segs {
+ segs[i].DecRef()
+ }
+ tsdb.Close()
+
+ // Reopen existing TSDB
+ tsdb, err = OpenTSDB(ctx, opts)
+ require.NoError(t, err)
+ require.NotNil(t, tsdb)
+
+ db = tsdb.(*database[*MockTSTable, any])
+ segs = db.segmentController.segments()
+ require.Equal(t, len(segs), 1)
+ for i := range segs {
+ segs[i].DecRef()
+ }
+ tsdb.Close()
+ })
+
+ t.Run("Changed options", func(t *testing.T) {
+ dir, defFn := test.Space(require.New(t))
+ defer defFn()
+
+ opts := TSDBOpts[*MockTSTable, any]{
+ Location: dir,
+ SegmentInterval: IntervalRule{Unit: DAY, Num: 1},
+ TTL: IntervalRule{Unit: DAY, Num: 3},
+ ShardNum: 1,
+ TSTableCreator: MockTSTableCreator,
+ }
+
+ ctx := context.Background()
+ mc := timestamp.NewMockClock()
+ ts, err := time.ParseInLocation("2006-01-02 15:04:05",
"2024-05-01 00:00:00", time.Local)
+ require.NoError(t, err)
+ mc.Set(ts)
+ ctx = timestamp.SetClock(ctx, mc)
+
+ // Create new TSDB
+ tsdb, err := OpenTSDB(ctx, opts)
+ require.NoError(t, err)
+ require.NotNil(t, tsdb)
+
+ seg, err := tsdb.CreateSegmentIfNotExist(ts)
+ require.NoError(t, err)
+ seg.DecRef()
+
+ tsdb.UpdateOptions(&commonv1.ResourceOpts{
+ ShardNum: 2,
+ SegmentInterval: &commonv1.IntervalRule{
+ Unit: commonv1.IntervalRule_UNIT_DAY,
+ Num: 2,
+ },
+ Ttl: &commonv1.IntervalRule{
+ Unit: commonv1.IntervalRule_UNIT_DAY,
+ Num: 6,
+ },
+ })
+
+ tsdb.Close()
+ })
+}
diff --git a/banyand/measure/metadata.go b/banyand/measure/metadata.go
index 83dbbc02..07de2db2 100644
--- a/banyand/measure/metadata.go
+++ b/banyand/measure/metadata.go
@@ -295,7 +295,7 @@ func (s *supplier) ResourceSchema(md *commonv1.Metadata)
(resourceSchema.Resourc
return s.metadata.MeasureRegistry().GetMeasure(ctx, md)
}
-func (s *supplier) OpenDB(groupSchema *commonv1.Group) (io.Closer, error) {
+func (s *supplier) OpenDB(groupSchema *commonv1.Group) (resourceSchema.DB,
error) {
name := groupSchema.Metadata.Name
p := common.Position{
Module: "measure",
diff --git a/banyand/measure/metrics.go b/banyand/measure/metrics.go
index 7bbeea42..96e8ff22 100644
--- a/banyand/measure/metrics.go
+++ b/banyand/measure/metrics.go
@@ -293,6 +293,9 @@ func (tst *tsTable) Collect(m storage.Metrics) {
}
metrics := m.(*metrics)
snp := tst.currentSnapshot()
+ if snp == nil {
+ return
+ }
defer snp.decRef()
var totalMemPart, totalMemElements, totalMemBlocks, totalMemPartBytes,
totalMemPartUncompressedBytes uint64
diff --git a/banyand/metadata/schema/group.go b/banyand/metadata/schema/group.go
index 7040bb2b..d8b9d627 100644
--- a/banyand/metadata/schema/group.go
+++ b/banyand/metadata/schema/group.go
@@ -23,6 +23,7 @@ import (
"github.com/pkg/errors"
clientv3 "go.etcd.io/etcd/client/v3"
+ "google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"
commonv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
@@ -102,7 +103,17 @@ func (e *etcdSchemaRegistry) UpdateGroup(ctx
context.Context, group *commonv1.Gr
if group.Metadata.Name == "" {
return errors.New("metadata.name is required")
}
- _, err := e.update(ctx, Metadata{
+ g, err := e.GetGroup(ctx, group.GetMetadata().GetName())
+ if err != nil {
+ return err
+ }
+ if proto.Equal(g.ResourceOpts, group.ResourceOpts) {
+ return nil
+ }
+ if g.GetResourceOpts().SegmentInterval.Unit !=
group.GetResourceOpts().SegmentInterval.Unit {
+ return errors.New("segment interval unit cannot be changed")
+ }
+ _, err = e.update(ctx, Metadata{
TypeMeta: TypeMeta{
Kind: KindGroup,
Name: group.GetMetadata().GetName(),
diff --git a/banyand/metadata/schema/measure.go
b/banyand/metadata/schema/measure.go
index d4461cfb..45c20f31 100644
--- a/banyand/metadata/schema/measure.go
+++ b/banyand/metadata/schema/measure.go
@@ -19,6 +19,7 @@ package schema
import (
"context"
+ "fmt"
"github.com/pkg/errors"
"google.golang.org/protobuf/types/known/timestamppb"
@@ -114,10 +115,8 @@ func (e *etcdSchemaRegistry) UpdateMeasure(ctx
context.Context, measure *databas
if prev == nil {
return 0, errors.WithMessagef(ErrGRPCResourceNotFound, "measure
%s not found", measure.GetMetadata().GetName())
}
- for i, e := range prev.GetEntity().GetTagNames() {
- if e != measure.GetEntity().GetTagNames()[i] {
- return 0, errors.WithMessagef(ErrInputInvalid, "entity
is immutable. Please create a new measure if you want to change entity")
- }
+ if err := validateEqualExceptAppendTagsAndFields(prev, measure); err !=
nil {
+ return 0, errors.WithMessagef(ErrInputInvalid, "validation
failed: %s", err)
}
return e.update(ctx, Metadata{
TypeMeta: TypeMeta{
@@ -130,6 +129,43 @@ func (e *etcdSchemaRegistry) UpdateMeasure(ctx
context.Context, measure *databas
})
}
+func validateEqualExceptAppendTagsAndFields(prevMeasure, newMeasure
*databasev1.Measure) error {
+ if prevMeasure.GetInterval() != newMeasure.GetInterval() {
+ return fmt.Errorf("interval is different: %s != %s",
prevMeasure.GetInterval(), newMeasure.GetInterval())
+ }
+ if prevMeasure.GetEntity().String() != newMeasure.GetEntity().String() {
+ return fmt.Errorf("entity is different: %s != %s",
prevMeasure.GetEntity().String(), newMeasure.GetEntity().String())
+ }
+ if prevMeasure.GetIndexMode() != newMeasure.GetIndexMode() {
+ return fmt.Errorf("index mode is different: %v != %v",
prevMeasure.GetIndexMode(), newMeasure.GetIndexMode())
+ }
+ if len(prevMeasure.GetTagFamilies()) > len(newMeasure.GetTagFamilies())
{
+ return fmt.Errorf("number of tag families is less in the new
measure")
+ }
+ if len(prevMeasure.GetFields()) > len(newMeasure.GetFields()) {
+ return fmt.Errorf("number of fields is less in the new measure")
+ }
+ for i, tagFamily := range prevMeasure.GetTagFamilies() {
+ if tagFamily.Name != newMeasure.GetTagFamilies()[i].Name {
+ return fmt.Errorf("tag family name is different: %s !=
%s", tagFamily.Name, newMeasure.GetTagFamilies()[i].Name)
+ }
+ if len(tagFamily.Tags) >
len(newMeasure.GetTagFamilies()[i].Tags) {
+ return fmt.Errorf("number of tags in tag family %s is
less in the new measure", tagFamily.Name)
+ }
+ for j, tag := range tagFamily.Tags {
+ if tag.String() !=
newMeasure.GetTagFamilies()[i].Tags[j].String() {
+ return fmt.Errorf("tag %s in tag family %s is
different: %s != %s", tag.Name, tagFamily.Name, tag.String(),
newMeasure.GetTagFamilies()[i].Tags[j].String())
+ }
+ }
+ }
+ for i, field := range prevMeasure.GetFields() {
+ if field.String() != newMeasure.GetFields()[i].String() {
+ return fmt.Errorf("field is different: %s != %s",
field.String(), newMeasure.GetFields()[i].String())
+ }
+ }
+ return nil
+}
+
func (e *etcdSchemaRegistry) DeleteMeasure(ctx context.Context, metadata
*commonv1.Metadata) (bool, error) {
return e.delete(ctx, Metadata{
TypeMeta: TypeMeta{
diff --git a/banyand/metadata/schema/stream.go
b/banyand/metadata/schema/stream.go
index 31fdef77..502cb42c 100644
--- a/banyand/metadata/schema/stream.go
+++ b/banyand/metadata/schema/stream.go
@@ -19,6 +19,7 @@ package schema
import (
"context"
+ "fmt"
"github.com/pkg/errors"
"google.golang.org/protobuf/types/known/timestamppb"
@@ -68,20 +69,15 @@ func (e *etcdSchemaRegistry) UpdateStream(ctx
context.Context, stream *databasev
if err = validate.GroupForStreamOrMeasure(g); err != nil {
return 0, err
}
- if err = validate.GroupForStreamOrMeasure(g); err != nil {
- return 0, err
- }
prev, err := e.GetStream(ctx, stream.GetMetadata())
if err != nil {
return 0, err
}
if prev == nil {
- return 0, errors.WithMessagef(ErrGRPCResourceNotFound, "measure
%s not found", stream.GetMetadata().GetName())
+ return 0, errors.WithMessagef(ErrGRPCResourceNotFound, "stream
%s not found", stream.GetMetadata().GetName())
}
- for i, e := range prev.GetEntity().GetTagNames() {
- if e != stream.GetEntity().GetTagNames()[i] {
- return 0, errors.WithMessagef(ErrInputInvalid, "entity
is immutable. Please create a new stream if you want to change entity")
- }
+ if err := validateEqualExceptAppendTags(prev, stream); err != nil {
+ return 0, errors.WithMessagef(ErrInputInvalid, "validation
failed: %s", err)
}
return e.update(ctx, Metadata{
TypeMeta: TypeMeta{
@@ -94,6 +90,29 @@ func (e *etcdSchemaRegistry) UpdateStream(ctx
context.Context, stream *databasev
})
}
+func validateEqualExceptAppendTags(prevStream, newStream *databasev1.Stream)
error {
+ if prevStream.GetEntity().String() != newStream.GetEntity().String() {
+ return fmt.Errorf("entity is different: %s != %s",
prevStream.GetEntity().String(), newStream.GetEntity().String())
+ }
+ if len(prevStream.GetTagFamilies()) > len(newStream.GetTagFamilies()) {
+ return fmt.Errorf("number of tag families is less in the new
stream")
+ }
+ for i, tagFamily := range prevStream.GetTagFamilies() {
+ if tagFamily.Name != newStream.GetTagFamilies()[i].Name {
+ return fmt.Errorf("tag family name is different: %s !=
%s", tagFamily.Name, newStream.GetTagFamilies()[i].Name)
+ }
+ if len(tagFamily.Tags) >
len(newStream.GetTagFamilies()[i].Tags) {
+ return fmt.Errorf("number of tags in tag family %s is
less in the new stream", tagFamily.Name)
+ }
+ for j, tag := range tagFamily.Tags {
+ if tag.String() !=
newStream.GetTagFamilies()[i].Tags[j].String() {
+ return fmt.Errorf("tag %s in tag family %s is
different: %s != %s", tag.Name, tagFamily.Name, tag.String(),
newStream.GetTagFamilies()[i].Tags[j].String())
+ }
+ }
+ }
+ return nil
+}
+
func (e *etcdSchemaRegistry) CreateStream(ctx context.Context, stream
*databasev1.Stream) (int64, error) {
if stream.UpdatedAt != nil {
stream.UpdatedAt = timestamppb.Now()
diff --git a/banyand/stream/metadata.go b/banyand/stream/metadata.go
index 1103b3d5..0c6a1c43 100644
--- a/banyand/stream/metadata.go
+++ b/banyand/stream/metadata.go
@@ -279,7 +279,7 @@ func (s *supplier) ResourceSchema(md *commonv1.Metadata)
(resourceSchema.Resourc
return s.metadata.StreamRegistry().GetStream(ctx, md)
}
-func (s *supplier) OpenDB(groupSchema *commonv1.Group) (io.Closer, error) {
+func (s *supplier) OpenDB(groupSchema *commonv1.Group) (resourceSchema.DB,
error) {
name := groupSchema.Metadata.Name
p := common.Position{
Module: "stream",
diff --git a/banyand/stream/metrics.go b/banyand/stream/metrics.go
index f77bd41b..339f3a2c 100644
--- a/banyand/stream/metrics.go
+++ b/banyand/stream/metrics.go
@@ -293,6 +293,10 @@ func (tst *tsTable) Collect(m storage.Metrics) {
}
metrics := m.(*metrics)
snp := tst.currentSnapshot()
+ if snp == nil {
+ return
+ }
+
defer snp.decRef()
var totalMemPart, totalMemElements, totalMemBlocks, totalMemPartBytes,
totalMemPartUncompressedBytes uint64
diff --git a/docs/interacting/bydbctl/schema/group.md
b/docs/interacting/bydbctl/schema/group.md
index e4f9c51c..c1e17e14 100644
--- a/docs/interacting/bydbctl/schema/group.md
+++ b/docs/interacting/bydbctl/schema/group.md
@@ -66,6 +66,8 @@ resource_opts:
EOF
```
+You can't change the unit of `segment_interval`. If you want to change the
unit, you should delete the group and create a new one.
+
## Delete operation
Delete operation deletes a group's schema.
diff --git a/docs/interacting/bydbctl/schema/measure.md
b/docs/interacting/bydbctl/schema/measure.md
index d8f8b3e7..7faae432 100644
--- a/docs/interacting/bydbctl/schema/measure.md
+++ b/docs/interacting/bydbctl/schema/measure.md
@@ -87,18 +87,32 @@ bydbctl measure update -f - <<EOF
metadata:
name: service_cpm_minute
group: sw_metric
-tagFamilies:
- - name: searchable
- tags:
- - name: trace_id
- type: TAG_TYPE_STRING
+tag_families:
+- name: default
+ tags:
+ - name: id
+ type: TAG_TYPE_STRING
+ - name: entity_id
+ type: TAG_TYPE_STRING
+ - name: new_tag
+ type: TAG_TYPE_STRING
+fields:
+- name: total
+ field_type: FIELD_TYPE_INT
+ encoding_method: ENCODING_METHOD_GORILLA
+ compression_method: COMPRESSION_METHOD_ZSTD
+- name: value
+ field_type: FIELD_TYPE_INT
+ encoding_method: ENCODING_METHOD_GORILLA
+ compression_method: COMPRESSION_METHOD_ZSTD
entity:
tag_names:
- - entity_id
+ - entity_id
+interval: 1m
EOF
```
-A measure's `entity` is immutable. If you want to change the entity field, you
should delete the measure and create a new one.
+You can only append new tags or fields to a measure. You can't change the
existing tags or fields. The order of tags and fields is immutable: you can't
change the order of tags or fields, and you can't insert a new tag or field in the
middle or front of the existing tags or fields.
## Delete operation
diff --git a/docs/interacting/bydbctl/schema/stream.md
b/docs/interacting/bydbctl/schema/stream.md
index 1e18195a..3e2668b5 100644
--- a/docs/interacting/bydbctl/schema/stream.md
+++ b/docs/interacting/bydbctl/schema/stream.md
@@ -80,14 +80,16 @@ tagFamilies:
tags:
- name: trace_id
type: TAG_TYPE_STRING
+ - name: trace_name
+ type: TAG_TYPE_STRING
entity:
- tagNames:
+ tagNames:
- stream_id
EOF
```
-A stream's `entity` field is immutable. If you want to change the entity
field, you should delete the stream and create a new one.
+You can only append new tags to a stream. You can't change the existing tags.
The order of tags is immutable: you can't change the order of tags, and you can't
insert a new tag in the middle or front of the existing tags.
## Delete operation
diff --git a/pkg/schema/cache.go b/pkg/schema/cache.go
index 7fcd31f4..3b7e34ac 100644
--- a/pkg/schema/cache.go
+++ b/pkg/schema/cache.go
@@ -28,6 +28,7 @@ import (
"github.com/pkg/errors"
"go.uber.org/multierr"
+ "google.golang.org/protobuf/proto"
commonv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
@@ -270,15 +271,12 @@ func (sr *schemaRepo) storeGroup(groupMeta
*commonv1.Metadata) (*group, error) {
if groupSchema.GetMetadata().GetModRevision() <=
prevGroupSchema.Metadata.ModRevision {
return g, nil
}
- sr.l.Info().Str("group", name).Msg("closing the previous tsdb")
- db := g.SupplyTSDB()
- if db != nil {
- db.Close()
- }
- sr.l.Info().Str("group", name).Msg("creating a new tsdb")
- if err := g.init(name); err != nil {
- return nil, err
+ g.groupSchema.Store(groupSchema)
+ if proto.Equal(groupSchema, prevGroupSchema) {
+ return g, nil
}
+ sr.l.Info().Str("group", name).Msg("updating the group resource
options")
+ g.db.Load().(DB).UpdateOptions(groupSchema.ResourceOpts)
return g, nil
}
@@ -500,7 +498,7 @@ func (g *group) initBySchema(groupSchema *commonv1.Group)
error {
return err
}
g.db.Store(db)
- return nil
+ return err
}
func (g *group) isInit() bool {
@@ -512,10 +510,7 @@ func (g *group) GetSchema() *commonv1.Group {
}
func (g *group) SupplyTSDB() io.Closer {
- if v := g.db.Load(); v != nil {
- return v.(io.Closer)
- }
- return nil
+ return g.db.Load().(io.Closer)
}
func (g *group) isPortable() bool {
@@ -526,11 +521,7 @@ func (g *group) close() (err error) {
if !g.isInit() || g.isPortable() {
return nil
}
- db := g.SupplyTSDB()
- if db != nil {
- err = multierr.Append(err, db.Close())
- }
- return err
+ return multierr.Append(err, g.SupplyTSDB().Close())
}
func parseMaxModRevision[T ResourceSchema](indexRules []T)
(maxRevisionForIdxRules int64) {
diff --git a/pkg/schema/schema.go b/pkg/schema/schema.go
index 6ee3c2d0..9415539d 100644
--- a/pkg/schema/schema.go
+++ b/pkg/schema/schema.go
@@ -90,7 +90,13 @@ type ResourceSchemaSupplier interface {
// ResourceSupplier allows open a resource and its embedded tsdb.
type ResourceSupplier interface {
ResourceSchemaSupplier
- OpenDB(groupSchema *commonv1.Group) (io.Closer, error)
+ OpenDB(groupSchema *commonv1.Group) (DB, error)
+}
+
+// DB is the interface of a tsdb.
+type DB interface {
+ io.Closer
+ UpdateOptions(opts *commonv1.ResourceOpts)
}
// Repository is the collection of several hierarchies groups by a "Group".