This is an automated email from the ASF dual-hosted git repository.

liuhan pushed a commit to branch delete-with-merge
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 4a0df12a43a866c3f9eb092aa744e65063b2af61
Author: mrproliu <741550...@qq.com>
AuthorDate: Tue Jun 17 22:28:20 2025 +0800

    Physically delete properties when merging segments
---
 banyand/property/shard.go      | 41 +++++++++++++++++--
 banyand/property/shard_test.go | 90 ++++++++++++++++++++++++++++++++++++++++++
 go.mod                         |  4 +-
 go.sum                         |  8 ++--
 pkg/index/inverted/inverted.go |  9 ++++-
 5 files changed, 140 insertions(+), 12 deletions(-)

diff --git a/banyand/property/shard.go b/banyand/property/shard.go
index 7a4bef78..fc13685c 100644
--- a/banyand/property/shard.go
+++ b/banyand/property/shard.go
@@ -24,6 +24,8 @@ import (
        "strconv"
        "sync"
 
+       "github.com/RoaringBitmap/roaring"
+       segment "github.com/blugelabs/bluge_segment_api"
        "google.golang.org/protobuf/encoding/protojson"
 
        "github.com/apache/skywalking-banyandb/api/common"
@@ -78,10 +80,11 @@ func (db *database) newShard(ctx context.Context, id 
common.ShardID, _ int64) (*
                location: location,
        }
        opts := inverted.StoreOpts{
-               Path:         location,
-               Logger:       si.l,
-               Metrics:      
inverted.NewMetrics(db.omr.With(propertyScope.ConstLabels(meter.LabelPairs{"shard":
 sName}))),
-               BatchWaitSec: 0,
+               Path:                 location,
+               Logger:               si.l,
+               Metrics:              
inverted.NewMetrics(db.omr.With(propertyScope.ConstLabels(meter.LabelPairs{"shard":
 sName}))),
+               BatchWaitSec:         0,
+               PrepareMergeCallback: si.prepareForMerge,
        }
        var err error
        if si.store, err = inverted.NewStore(opts); err != nil {
@@ -225,3 +228,50 @@ func (s *shard) search(ctx context.Context, indexQuery index.Query, limit int,
        }
        return data, nil
 }
+
+// prepareForMerge is the inverted store's PrepareMergeCallback. Before the
+// given segments are merged it reads each document's stored deleteField value
+// and adds every document whose value converts to true to the drops bitmap of
+// its segment, so soft-deleted properties are physically dropped by the merge.
+//
+// drops[i] is the drop set for segments[i]; a nil entry is allocated on demand
+// and segments without a matching drops entry are left untouched. An error
+// from reading stored fields stops the scan and is returned to the caller.
+//
+// NOTE(review): this visits the stored fields of every document on each merge;
+// a lookup on an indexed deleteField term may be cheaper — confirm.
+func (s *shard) prepareForMerge(segments []segment.Segment, drops []*roaring.Bitmap, _ uint64) error {
+       if len(segments) == 0 || len(drops) == 0 {
+               return nil
+       }
+       // Build the visitor once; it reports the flag through deleted.
+       var deleted bool
+       visit := func(field string, value []byte) bool {
+               if field != deleteField {
+                       return true
+               }
+               deleted = convert.BytesToBool(value)
+               return false // flag found, skip the remaining fields
+       }
+       for segID, seg := range segments {
+               if segID >= len(drops) {
+                       // drops is shorter than segments; indexing it would panic.
+                       break
+               }
+               count := seg.Count()
+               for docID := uint64(0); docID < count; docID++ {
+                       deleted = false
+                       if err := seg.VisitStoredFields(docID, visit); err != nil {
+                               return err
+                       }
+                       if !deleted {
+                               continue
+                       }
+                       if drops[segID] == nil {
+                               drops[segID] = roaring.New()
+                       }
+                       drops[segID].Add(uint32(docID))
+               }
+       }
+       return nil
+}
diff --git a/banyand/property/shard_test.go b/banyand/property/shard_test.go
new file mode 100644
index 00000000..cf6aab02
--- /dev/null
+++ b/banyand/property/shard_test.go
@@ -0,0 +1,97 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package property
+
+import (
+       "context"
+       "testing"
+       "time"
+
+       commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+       modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+       propertyv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
+       "github.com/apache/skywalking-banyandb/banyand/observability"
+       "github.com/apache/skywalking-banyandb/pkg/fs"
+       "github.com/apache/skywalking-banyandb/pkg/test"
+)
+
+// TestMergeDeleted applies a property to shard 0, deletes it, and expects a
+// query on the property's group to return nothing afterwards.
+func TestMergeDeleted(t *testing.T) {
+       var defers []func()
+       // Run the cleanups in reverse registration order (as defer would), so
+       // db.close runs before the test.NewSpace cleanup that presumably removes dir.
+       defer func() {
+               for i := len(defers) - 1; i >= 0; i-- {
+                       defers[i]()
+               }
+       }()
+
+       dir, deferFunc, err := test.NewSpace()
+       if err != nil {
+               t.Fatal(err)
+       }
+       defers = append(defers, deferFunc)
+       db, err := openDB(context.Background(), dir, 3*time.Second, observability.BypassRegistry, fs.NewLocalFileSystem())
+       if err != nil {
+               t.Fatal(err)
+       }
+       defers = append(defers, func() {
+               _ = db.close()
+       })
+
+       newShard, err := db.loadShard(context.Background(), 0)
+       if err != nil {
+               t.Fatal(err)
+       }
+
+       property := &propertyv1.Property{
+               Metadata: &commonv1.Metadata{
+                       Group:       "test-group",
+                       Name:        "test-name",
+                       ModRevision: time.Now().Unix(),
+               },
+               Id: "test-id",
+               Tags: []*modelv1.Tag{
+                       {Key: "tag1", Value: &modelv1.TagValue{Value: &modelv1.TagValue_Int{Int: &modelv1.Int{Value: 1}}}},
+               },
+       }
+
+       // Apply the property.
+       if err = newShard.update(GetPropertyID(property), property); err != nil {
+               t.Fatal(err)
+       }
+
+       // Delete the property that was just applied.
+       if err = newShard.delete(context.Background(), [][]byte{GetPropertyID(property)}); err != nil {
+               t.Fatal(err)
+       }
+
+       // The deleted property should no longer be returned (removed in the merge phase).
+       // NOTE(review): nothing here forces a segment merge or asks the query for
+       // soft-deleted documents, so this may pass even if prepareForMerge never
+       // runs — confirm the test really exercises the merge path.
+       resp, err := db.query(context.Background(), &propertyv1.QueryRequest{Groups: []string{"test-group"}})
+       if err != nil {
+               t.Fatal(err)
+       }
+
+       if len(resp) != 0 {
+               t.Fatalf("expected no properties after delete, got %d", len(resp))
+       }
+}
diff --git a/go.mod b/go.mod
index bff48b0c..f4005bf7 100644
--- a/go.mod
+++ b/go.mod
@@ -197,7 +197,7 @@ require (
 
 replace (
        github.com/benbjohnson/clock v1.3.0 => github.com/SkyAPM/clock 
v1.3.1-0.20220809233656-dc7607c94a97
-       github.com/blugelabs/bluge => github.com/SkyAPM/bluge 
v0.0.0-20241225104157-e54f64be56e8
+       github.com/blugelabs/bluge => github.com/mrproliu/bluge 
v0.0.0-20250617134539-8eb006907c39
        github.com/blugelabs/bluge_segment_api => 
github.com/zinclabs/bluge_segment_api v1.0.0
-       github.com/blugelabs/ice => github.com/SkyAPM/ice 
v0.0.0-20241108011032-c3d8eea75118
+       github.com/blugelabs/ice => github.com/mrproliu/ice 
v0.0.0-20250617134843-17221f751a9a
 )
diff --git a/go.sum b/go.sum
index 4662c1e1..c146b1c2 100644
--- a/go.sum
+++ b/go.sum
@@ -15,12 +15,8 @@ github.com/OneOfOne/xxhash v1.2.2/go.mod 
h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAE
 github.com/RoaringBitmap/roaring v0.9.4/go.mod 
h1:icnadbWcNyfEHlYdr+tDlOTih1Bf/h+rzPpv4sbomAA=
 github.com/RoaringBitmap/roaring v1.9.4 
h1:yhEIoH4YezLYT04s1nHehNO64EKFTop/wBhxv2QzDdQ=
 github.com/RoaringBitmap/roaring v1.9.4/go.mod 
h1:6AXUsoIEzDTFFQCe1RbGA6uFONMhvejWj5rqITANK90=
-github.com/SkyAPM/bluge v0.0.0-20241225104157-e54f64be56e8 
h1:bWJ8yNeaph7oe4S/kvoAM0dNLNamfsOzxaajhLvTAks=
-github.com/SkyAPM/bluge v0.0.0-20241225104157-e54f64be56e8/go.mod 
h1:rriyHHsTidJ4UYFiVDIZyXCFJxQYs5FGpZmIfuvSqPA=
 github.com/SkyAPM/clock v1.3.1-0.20220809233656-dc7607c94a97 
h1:FKuhJ+6n/DHspGeLleeNbziWnKr9gHKYN4q7NcoCp4s=
 github.com/SkyAPM/clock v1.3.1-0.20220809233656-dc7607c94a97/go.mod 
h1:2xGRl9H1pllhxTbEGO1W3gDkip8P9GQaHPni/wpdR44=
-github.com/SkyAPM/ice v0.0.0-20241108011032-c3d8eea75118 
h1:Ja62sgOCp2qPTd8Xmldv1U83v11IRIsh6KlB7UaFLj4=
-github.com/SkyAPM/ice v0.0.0-20241108011032-c3d8eea75118/go.mod 
h1:DoQeb0Ee86LyruZSL77Ddscfk/THJ38x453CRCnGEPI=
 github.com/VictoriaMetrics/fastcache v1.12.2 
h1:N0y9ASrJ0F6h0QaC3o6uJb3NIZ9VKLjCM7NQbSmF7WI=
 github.com/VictoriaMetrics/fastcache v1.12.2/go.mod 
h1:AmC+Nzz1+3G2eCPapF6UcsnkThDcMsQicp4xDukwJYI=
 github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156 
h1:eMwmnE/GDgah4HI848JfFxHt+iPb26b4zyfspmqY0/8=
@@ -288,6 +284,10 @@ github.com/modern-go/reflect2 v1.0.2 
h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G
 github.com/modern-go/reflect2 v1.0.2/go.mod 
h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
 github.com/montanaflynn/stats v0.7.1 
h1:etflOAAHORrCC44V+aR6Ftzort912ZU+YLiSTuV8eaE=
 github.com/montanaflynn/stats v0.7.1/go.mod 
h1:etXPPgVO6n31NxCd9KQUMvCM+ve0ruNzt6R8Bnaayow=
+github.com/mrproliu/bluge v0.0.0-20250617134539-8eb006907c39 
h1:Ee2TEbl4Ytfb7LF/nzD1OCf1JiA6gn4ZJJRk3MOhA2k=
+github.com/mrproliu/bluge v0.0.0-20250617134539-8eb006907c39/go.mod 
h1:rriyHHsTidJ4UYFiVDIZyXCFJxQYs5FGpZmIfuvSqPA=
+github.com/mrproliu/ice v0.0.0-20250617134843-17221f751a9a 
h1:e8NmmThNduLFLIN391aAJgv44A3hfEoYRvwVDye4fUM=
+github.com/mrproliu/ice v0.0.0-20250617134843-17221f751a9a/go.mod 
h1:DoQeb0Ee86LyruZSL77Ddscfk/THJ38x453CRCnGEPI=
 github.com/mschoch/smat v0.2.0 h1:8imxQsjDm8yFEAVBe7azKmKSgzSkZXDuKkSq9374khM=
 github.com/mschoch/smat v0.2.0/go.mod 
h1:kc9mz7DoBKqDyiRL7VZN8KvXQMWeTaVnttLRXOlotKw=
 github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 
h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
diff --git a/pkg/index/inverted/inverted.go b/pkg/index/inverted/inverted.go
index 8450e647..72c5d122 100644
--- a/pkg/index/inverted/inverted.go
+++ b/pkg/index/inverted/inverted.go
@@ -25,12 +25,14 @@ import (
        "strconv"
        "time"
 
+       roaringpkg "github.com/RoaringBitmap/roaring"
        "github.com/blugelabs/bluge"
        "github.com/blugelabs/bluge/analysis"
        "github.com/blugelabs/bluge/analysis/analyzer"
        blugeIndex "github.com/blugelabs/bluge/index"
        "github.com/blugelabs/bluge/numeric"
        "github.com/blugelabs/bluge/search"
+       segment "github.com/blugelabs/bluge_segment_api"
        "github.com/pkg/errors"
        "go.uber.org/multierr"
 
@@ -76,8 +78,10 @@ var _ index.Store = (*store)(nil)
 
 // StoreOpts wraps options to create an inverted index repository.
 type StoreOpts struct {
-       Logger        *logger.Logger
-       Metrics       *Metrics
+       Logger               *logger.Logger
+       Metrics              *Metrics
+       PrepareMergeCallback func(segments []segment.Segment, drops 
[]*roaringpkg.Bitmap, id uint64) error
+
        Path          string
        BatchWaitSec  int64
        CacheMaxBytes int
@@ -164,6 +168,7 @@ func NewStore(opts StoreOpts) (index.SeriesStore, error) {
        config := bluge.DefaultConfigWithIndexConfig(indexConfig)
        config.DefaultSearchAnalyzer = Analyzers[index.AnalyzerKeyword]
        config.Logger = log.New(opts.Logger, opts.Logger.Module(), 0)
+       config = config.WithPrepareMergeCallback(opts.PrepareMergeCallback)
        w, err := bluge.OpenWriter(config)
        if err != nil {
                return nil, err

Reply via email to