This is an automated email from the ASF dual-hosted git repository. liuhan pushed a commit to branch delete-with-merge in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 4a0df12a43a866c3f9eb092aa744e65063b2af61 Author: mrproliu <741550...@qq.com> AuthorDate: Tue Jun 17 22:28:20 2025 +0800 Physically delete property when merging segment --- banyand/property/shard.go | 41 +++++++++++++++++-- banyand/property/shard_test.go | 90 ++++++++++++++++++++++++++++++++++++++++++ go.mod | 4 +- go.sum | 8 ++-- pkg/index/inverted/inverted.go | 9 ++++- 5 files changed, 140 insertions(+), 12 deletions(-) diff --git a/banyand/property/shard.go b/banyand/property/shard.go index 7a4bef78..fc13685c 100644 --- a/banyand/property/shard.go +++ b/banyand/property/shard.go @@ -24,6 +24,8 @@ import ( "strconv" "sync" + "github.com/RoaringBitmap/roaring" + segment "github.com/blugelabs/bluge_segment_api" "google.golang.org/protobuf/encoding/protojson" "github.com/apache/skywalking-banyandb/api/common" @@ -78,10 +80,11 @@ func (db *database) newShard(ctx context.Context, id common.ShardID, _ int64) (* location: location, } opts := inverted.StoreOpts{ - Path: location, - Logger: si.l, - Metrics: inverted.NewMetrics(db.omr.With(propertyScope.ConstLabels(meter.LabelPairs{"shard": sName}))), - BatchWaitSec: 0, + Path: location, + Logger: si.l, + Metrics: inverted.NewMetrics(db.omr.With(propertyScope.ConstLabels(meter.LabelPairs{"shard": sName}))), + BatchWaitSec: 0, + PrepareMergeCallback: si.prepareForMerge, } var err error if si.store, err = inverted.NewStore(opts); err != nil { @@ -225,3 +228,33 @@ func (s *shard) search(ctx context.Context, indexQuery index.Query, limit int, } return data, nil } + +func (s *shard) prepareForMerge(segments []segment.Segment, drops []*roaring.Bitmap, _ uint64) error { + if len(segments) == 0 || len(drops) == 0 { + return nil + } + for segID, seg := range segments { + var docID uint64 + for ; docID < seg.Count(); docID++ { + hasDeleted := false + _ = seg.VisitStoredFields(docID, func(field string, value []byte) bool { + if field == deleteField { + hasDeleted = convert.BytesToBool(value) + return false + } + return true + }) 
+ if !hasDeleted { + continue + } + + if drops[segID] == nil { + drops[segID] = roaring.New() + } + + drops[segID].Add(uint32(docID)) + } + } + return nil +} diff --git a/banyand/property/shard_test.go b/banyand/property/shard_test.go new file mode 100644 index 00000000..cf6aab02 --- /dev/null +++ b/banyand/property/shard_test.go @@ -0,0 +1,90 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package property + +import ( + "context" + "testing" + "time" + + commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" + modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" + propertyv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1" + "github.com/apache/skywalking-banyandb/banyand/observability" + "github.com/apache/skywalking-banyandb/pkg/fs" + "github.com/apache/skywalking-banyandb/pkg/test" +) + +func TestMergeDeleted(t *testing.T) { + var defers []func() + defer func() { + for _, f := range defers { + f() + } + }() + + dir, deferFunc, err := test.NewSpace() + if err != nil { + t.Fatal(err) + } + defers = append(defers, deferFunc) + db, err := openDB(context.Background(), dir, 3*time.Second, observability.BypassRegistry, fs.NewLocalFileSystem()) + if err != nil { + t.Fatal(err) + } + defers = append(defers, func() { + _ = db.close() + }) + + newShard, err := db.loadShard(context.Background(), 0) + if err != nil { + t.Fatal(err) + } + + property := &propertyv1.Property{ + Metadata: &commonv1.Metadata{ + Group: "test-group", + Name: "test-name", + ModRevision: time.Now().Unix(), + }, + Id: "test-id", + Tags: []*modelv1.Tag{ + {Key: "tag1", Value: &modelv1.TagValue{Value: &modelv1.TagValue_Int{Int: &modelv1.Int{Value: 1}}}}, + }, + } + + // apply the property + if err = newShard.update(GetPropertyID(property), property); err != nil { + t.Fatal(err) + } + + // delete current property + if err = newShard.delete(context.Background(), [][]byte{GetPropertyID(property)}); err != nil { + t.Fatal(err) + } + + // check if the property is deleted from shard including deleted, should be no document (delete by merge phase) + resp, err := db.query(context.Background(), &propertyv1.QueryRequest{Groups: []string{"test-group"}}) + if err != nil { + t.Fatal(err) + } + + if len(resp) != 0 { + t.Fail() + } +} diff --git a/go.mod b/go.mod index bff48b0c..f4005bf7 100644 --- a/go.mod +++ b/go.mod @@ -197,7 
+197,7 @@ require ( replace ( github.com/benbjohnson/clock v1.3.0 => github.com/SkyAPM/clock v1.3.1-0.20220809233656-dc7607c94a97 - github.com/blugelabs/bluge => github.com/SkyAPM/bluge v0.0.0-20241225104157-e54f64be56e8 + github.com/blugelabs/bluge => github.com/mrproliu/bluge v0.0.0-20250617134539-8eb006907c39 github.com/blugelabs/bluge_segment_api => github.com/zinclabs/bluge_segment_api v1.0.0 - github.com/blugelabs/ice => github.com/SkyAPM/ice v0.0.0-20241108011032-c3d8eea75118 + github.com/blugelabs/ice => github.com/mrproliu/ice v0.0.0-20250617134843-17221f751a9a ) diff --git a/go.sum b/go.sum index 4662c1e1..c146b1c2 100644 --- a/go.sum +++ b/go.sum @@ -15,12 +15,8 @@ github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAE github.com/RoaringBitmap/roaring v0.9.4/go.mod h1:icnadbWcNyfEHlYdr+tDlOTih1Bf/h+rzPpv4sbomAA= github.com/RoaringBitmap/roaring v1.9.4 h1:yhEIoH4YezLYT04s1nHehNO64EKFTop/wBhxv2QzDdQ= github.com/RoaringBitmap/roaring v1.9.4/go.mod h1:6AXUsoIEzDTFFQCe1RbGA6uFONMhvejWj5rqITANK90= -github.com/SkyAPM/bluge v0.0.0-20241225104157-e54f64be56e8 h1:bWJ8yNeaph7oe4S/kvoAM0dNLNamfsOzxaajhLvTAks= -github.com/SkyAPM/bluge v0.0.0-20241225104157-e54f64be56e8/go.mod h1:rriyHHsTidJ4UYFiVDIZyXCFJxQYs5FGpZmIfuvSqPA= github.com/SkyAPM/clock v1.3.1-0.20220809233656-dc7607c94a97 h1:FKuhJ+6n/DHspGeLleeNbziWnKr9gHKYN4q7NcoCp4s= github.com/SkyAPM/clock v1.3.1-0.20220809233656-dc7607c94a97/go.mod h1:2xGRl9H1pllhxTbEGO1W3gDkip8P9GQaHPni/wpdR44= -github.com/SkyAPM/ice v0.0.0-20241108011032-c3d8eea75118 h1:Ja62sgOCp2qPTd8Xmldv1U83v11IRIsh6KlB7UaFLj4= -github.com/SkyAPM/ice v0.0.0-20241108011032-c3d8eea75118/go.mod h1:DoQeb0Ee86LyruZSL77Ddscfk/THJ38x453CRCnGEPI= github.com/VictoriaMetrics/fastcache v1.12.2 h1:N0y9ASrJ0F6h0QaC3o6uJb3NIZ9VKLjCM7NQbSmF7WI= github.com/VictoriaMetrics/fastcache v1.12.2/go.mod h1:AmC+Nzz1+3G2eCPapF6UcsnkThDcMsQicp4xDukwJYI= github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156 
h1:eMwmnE/GDgah4HI848JfFxHt+iPb26b4zyfspmqY0/8= @@ -288,6 +284,10 @@ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/montanaflynn/stats v0.7.1 h1:etflOAAHORrCC44V+aR6Ftzort912ZU+YLiSTuV8eaE= github.com/montanaflynn/stats v0.7.1/go.mod h1:etXPPgVO6n31NxCd9KQUMvCM+ve0ruNzt6R8Bnaayow= +github.com/mrproliu/bluge v0.0.0-20250617134539-8eb006907c39 h1:Ee2TEbl4Ytfb7LF/nzD1OCf1JiA6gn4ZJJRk3MOhA2k= +github.com/mrproliu/bluge v0.0.0-20250617134539-8eb006907c39/go.mod h1:rriyHHsTidJ4UYFiVDIZyXCFJxQYs5FGpZmIfuvSqPA= +github.com/mrproliu/ice v0.0.0-20250617134843-17221f751a9a h1:e8NmmThNduLFLIN391aAJgv44A3hfEoYRvwVDye4fUM= +github.com/mrproliu/ice v0.0.0-20250617134843-17221f751a9a/go.mod h1:DoQeb0Ee86LyruZSL77Ddscfk/THJ38x453CRCnGEPI= github.com/mschoch/smat v0.2.0 h1:8imxQsjDm8yFEAVBe7azKmKSgzSkZXDuKkSq9374khM= github.com/mschoch/smat v0.2.0/go.mod h1:kc9mz7DoBKqDyiRL7VZN8KvXQMWeTaVnttLRXOlotKw= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= diff --git a/pkg/index/inverted/inverted.go b/pkg/index/inverted/inverted.go index 8450e647..72c5d122 100644 --- a/pkg/index/inverted/inverted.go +++ b/pkg/index/inverted/inverted.go @@ -25,12 +25,14 @@ import ( "strconv" "time" + roaringpkg "github.com/RoaringBitmap/roaring" "github.com/blugelabs/bluge" "github.com/blugelabs/bluge/analysis" "github.com/blugelabs/bluge/analysis/analyzer" blugeIndex "github.com/blugelabs/bluge/index" "github.com/blugelabs/bluge/numeric" "github.com/blugelabs/bluge/search" + segment "github.com/blugelabs/bluge_segment_api" "github.com/pkg/errors" "go.uber.org/multierr" @@ -76,8 +78,10 @@ var _ index.Store = (*store)(nil) // StoreOpts wraps options to create an inverted index repository. 
type StoreOpts struct { - Logger *logger.Logger - Metrics *Metrics + Logger *logger.Logger + Metrics *Metrics + PrepareMergeCallback func(segments []segment.Segment, drops []*roaringpkg.Bitmap, id uint64) error + Path string BatchWaitSec int64 CacheMaxBytes int @@ -164,6 +168,7 @@ func NewStore(opts StoreOpts) (index.SeriesStore, error) { config := bluge.DefaultConfigWithIndexConfig(indexConfig) config.DefaultSearchAnalyzer = Analyzers[index.AnalyzerKeyword] config.Logger = log.New(opts.Logger, opts.Logger.Module(), 0) + config = config.WithPrepareMergeCallback(opts.PrepareMergeCallback) w, err := bluge.OpenWriter(config) if err != nil { return nil, err