This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new c3667f77 Physically delete properties when merging segments (#684)
c3667f77 is described below

commit c3667f774c4f92e21ee9eac8b78321a3808c57a9
Author: mrproliu <741550...@qq.com>
AuthorDate: Fri Jun 20 13:20:09 2025 +0800

    Physically delete properties when merging segments (#684)
---
 api/proto/banyandb/property/v1/rpc.proto |   5 +-
 banyand/liaison/grpc/discovery.go        |   6 +-
 banyand/liaison/grpc/property.go         |  20 ++--
 banyand/property/db.go                   |  18 +++-
 banyand/property/listener.go             |   2 +-
 banyand/property/service.go              |   4 +-
 banyand/property/shard.go                |  72 ++++++++++++---
 banyand/property/shard_test.go           | 153 +++++++++++++++++++++++++++++++
 dist/LICENSE                             |   4 +-
 docs/api-reference.md                    |   2 +-
 go.mod                                   |   4 +-
 go.sum                                   |   8 +-
 pkg/index/index.go                       |   2 +-
 pkg/index/inverted/inverted.go           |  13 ++-
 pkg/index/inverted/inverted_series.go    |   4 +-
 15 files changed, 267 insertions(+), 50 deletions(-)

diff --git a/api/proto/banyandb/property/v1/rpc.proto 
b/api/proto/banyandb/property/v1/rpc.proto
index 4ea60cef..ba826b2c 100644
--- a/api/proto/banyandb/property/v1/rpc.proto
+++ b/api/proto/banyandb/property/v1/rpc.proto
@@ -122,7 +122,8 @@ message InternalDeleteRequest {
 message InternalQueryResponse {
   repeated bytes sources = 1;
   common.v1.Trace trace = 2;
-  // deletes indicates the property is deleted or not
+  // deletes holds the deletion timestamp (Unix seconds) of each property;
   // it's mapping to the sources in the same order
-  repeated bool deletes = 3;
+  // a value of 0 means the property is not deleted
+  repeated int64 deletes = 3;
 }
diff --git a/banyand/liaison/grpc/discovery.go 
b/banyand/liaison/grpc/discovery.go
index eed1807b..f9542a70 100644
--- a/banyand/liaison/grpc/discovery.go
+++ b/banyand/liaison/grpc/discovery.go
@@ -146,7 +146,7 @@ func (s *groupRepo) OnDelete(schemaMetadata 
schema.Metadata) {
                return
        }
        if le := s.log.Debug(); le.Enabled() {
-               le.Stringer("id", group.Metadata).Msg("shard deleted")
+               le.Stringer("id", group.Metadata).Msg("shard deletedTime")
        }
        s.RWMutex.Lock()
        defer s.RWMutex.Unlock()
@@ -263,7 +263,7 @@ func (e *entityRepo) OnDelete(schemaMetadata 
schema.Metadata) {
                        Str("action", "delete").
                        Stringer("subject", id).
                        Str("kind", kind).
-                       Msg("entity deleted")
+                       Msg("entity deletedTime")
        }
        e.RWMutex.Lock()
        defer e.RWMutex.Unlock()
@@ -339,7 +339,7 @@ func (s *shardingKeyRepo) OnDelete(schemaMetadata 
schema.Metadata) {
                        Str("action", "delete").
                        Stringer("subject", id).
                        Str("kind", "measure").
-                       Msg("sharding key deleted")
+                       Msg("sharding key deletedTime")
        }
        s.RWMutex.Lock()
        defer s.RWMutex.Unlock()
diff --git a/banyand/liaison/grpc/property.go b/banyand/liaison/grpc/property.go
index 61c4808a..d65c3728 100644
--- a/banyand/liaison/grpc/property.go
+++ b/banyand/liaison/grpc/property.go
@@ -168,7 +168,7 @@ func (ps *propertyServer) Apply(ctx context.Context, req 
*propertyv1.ApplyReques
                nodes = append(nodes, nodeID)
        }
        var prev *propertyv1.Property
-       if prevPropertyWithMetadata != nil && !prevPropertyWithMetadata.deleted 
{
+       if prevPropertyWithMetadata != nil && 
prevPropertyWithMetadata.deletedTime <= 0 {
                prev = prevPropertyWithMetadata.Property
        }
        defer func() {
@@ -195,7 +195,7 @@ func (ps *propertyServer) 
findPrevAndOlderProperties(nodeProperties [][]*propert
        for _, properties := range nodeProperties {
                for _, p := range properties {
                        // if the property is not deleted, then added to the 
older properties list to delete after apply success
-                       if !p.deleted {
+                       if p.deletedTime <= 0 {
                                olderProperties = append(olderProperties, p)
                        }
                        // update the prov property
@@ -328,7 +328,7 @@ func (ps *propertyServer) Delete(ctx context.Context, req 
*propertyv1.DeleteRequ
        for _, properties := range nodeProperties {
                for _, p := range properties {
                        // if the property already delete, then ignore execute 
twice
-                       if p.deleted {
+                       if p.deletedTime > 0 {
                                continue
                        }
                        ids = append(ids, propertypkg.GetPropertyID(p.Property))
@@ -381,8 +381,8 @@ func (ps *propertyServer) Query(ctx context.Context, req 
*propertyv1.QueryReques
        }
        properties := make([]*propertyv1.Property, 0, len(res))
        for _, p := range res {
-               // ignore deleted property
-               if p.deleted {
+               // ignore deleted property
+               if p.deletedTime > 0 {
                        continue
                }
                if len(req.TagProjection) > 0 {
@@ -453,17 +453,17 @@ func (ps *propertyServer) queryProperties(
                        case *propertyv1.InternalQueryResponse:
                                for i, s := range v.Sources {
                                        var p propertyv1.Property
-                                       var deleted bool
+                                       var deleteTime int64
                                        err = protojson.Unmarshal(s, &p)
                                        if err != nil {
                                                return nil, trace, err
                                        }
                                        if i < len(v.Deletes) {
-                                               deleted = v.Deletes[i]
+                                               deleteTime = v.Deletes[i]
                                        }
                                        property := &propertyWithMetadata{
-                                               Property: &p,
-                                               deleted:  deleted,
+                                               Property:    &p,
+                                               deletedTime: deleteTime,
                                        }
                                        nodeWithProperties = 
append(nodeWithProperties, property)
                                }
@@ -502,5 +502,5 @@ func (ps *propertyServer) remove(ids [][]byte) error {
 
 type propertyWithMetadata struct {
        *propertyv1.Property
-       deleted bool
+       deletedTime int64
 }
diff --git a/banyand/property/db.go b/banyand/property/db.go
index 34c552e1..1305648a 100644
--- a/banyand/property/db.go
+++ b/banyand/property/db.go
@@ -55,11 +55,19 @@ type database struct {
        sLst          atomic.Pointer[[]*shard]
        location      string
        flushInterval time.Duration
+       expireDelete  time.Duration
        closed        atomic.Bool
        mu            sync.RWMutex
 }
 
-func openDB(ctx context.Context, location string, flushInterval time.Duration, 
omr observability.MetricsRegistry, lfs fs.FileSystem) (*database, error) {
+func openDB(
+       ctx context.Context,
+       location string,
+       flushInterval time.Duration,
+       expireToDeleteDuration time.Duration,
+       omr observability.MetricsRegistry,
+       lfs fs.FileSystem,
+) (*database, error) {
        loc := filepath.Clean(location)
        lfs.MkdirIfNotExist(loc, storage.DirPerm)
        l := logger.GetLogger("property")
@@ -69,6 +77,7 @@ func openDB(ctx context.Context, location string, 
flushInterval time.Duration, o
                logger:        l,
                omr:           omr,
                flushInterval: flushInterval,
+               expireDelete:  expireToDeleteDuration,
                lfs:           lfs,
        }
        if err := db.load(ctx); err != nil {
@@ -155,7 +164,8 @@ func (db *database) loadShard(ctx context.Context, id 
common.ShardID) (*shard, e
        if s, ok := db.getShard(id); ok {
                return s, nil
        }
-       sd, err := db.newShard(context.WithValue(ctx, logger.ContextKey, 
db.logger), id, int64(db.flushInterval.Seconds()))
+       sd, err := db.newShard(context.WithValue(ctx, logger.ContextKey, 
db.logger), id, int64(db.flushInterval.Seconds()),
+               int64(db.expireDelete.Seconds()))
        if err != nil {
                return nil, err
        }
@@ -226,6 +236,6 @@ func walkDir(root, prefix string, wf walkFn) error {
 }
 
 type queryProperty struct {
-       source  []byte
-       deleted bool
+       source     []byte
+       deleteTime int64
 }
diff --git a/banyand/property/listener.go b/banyand/property/listener.go
index f204816f..1c35f676 100644
--- a/banyand/property/listener.go
+++ b/banyand/property/listener.go
@@ -195,7 +195,7 @@ func (h *queryListener) Rev(ctx context.Context, message 
bus.Message) (resp bus.
        qResp := &propertyv1.InternalQueryResponse{}
        for _, p := range properties {
                qResp.Sources = append(qResp.Sources, p.source)
-               qResp.Deletes = append(qResp.Deletes, p.deleted)
+               qResp.Deletes = append(qResp.Deletes, p.deleteTime)
        }
        if tracer != nil {
                qResp.Trace = tracer.ToProto()
diff --git a/banyand/property/service.go b/banyand/property/service.go
index be3f1fbe..4c2ad948 100644
--- a/banyand/property/service.go
+++ b/banyand/property/service.go
@@ -61,6 +61,7 @@ type service struct {
        nodeID              string
        snapshotDir         string
        flushTimeout        time.Duration
+       expireTimeout       time.Duration
        maxDiskUsagePercent int
        maxFileSnapshotNum  int
 }
@@ -71,6 +72,7 @@ func (s *service) FlagSet() *run.FlagSet {
        flagS.DurationVar(&s.flushTimeout, "property-flush-timeout", 
defaultFlushTimeout, "the memory data timeout of measure")
        flagS.IntVar(&s.maxDiskUsagePercent, "property-max-disk-usage-percent", 
95, "the maximum disk usage percentage allowed")
        flagS.IntVar(&s.maxFileSnapshotNum, "property-max-file-snapshot-num", 
2, "the maximum number of file snapshots allowed")
+       flagS.DurationVar(&s.expireTimeout, "property-expire-delete-timeout", 
time.Hour*24*7, "the duration of the expired data needs to be deleted")
        return flagS
 }
 
@@ -109,7 +111,7 @@ func (s *service) PreRun(ctx context.Context) error {
        s.nodeID = node.NodeID
 
        var err error
-       s.db, err = openDB(ctx, filepath.Join(path, storage.DataDir), 
s.flushTimeout, s.omr, s.lfs)
+       s.db, err = openDB(ctx, filepath.Join(path, storage.DataDir), 
s.flushTimeout, s.expireTimeout, s.omr, s.lfs)
        if err != nil {
                return err
        }
diff --git a/banyand/property/shard.go b/banyand/property/shard.go
index 7a4bef78..b0a36eda 100644
--- a/banyand/property/shard.go
+++ b/banyand/property/shard.go
@@ -23,7 +23,10 @@ import (
        "path"
        "strconv"
        "sync"
+       "time"
 
+       "github.com/RoaringBitmap/roaring"
+       segment "github.com/blugelabs/bluge_segment_api"
        "google.golang.org/protobuf/encoding/protojson"
 
        "github.com/apache/skywalking-banyandb/api/common"
@@ -60,6 +63,8 @@ type shard struct {
        l        *logger.Logger
        location string
        id       common.ShardID
+
+       expireToDeleteSec int64
 }
 
 func (s *shard) close() error {
@@ -69,19 +74,21 @@ func (s *shard) close() error {
        return nil
 }
 
-func (db *database) newShard(ctx context.Context, id common.ShardID, _ int64) 
(*shard, error) {
+func (db *database) newShard(ctx context.Context, id common.ShardID, _ int64, 
deleteExpireSec int64) (*shard, error) {
        location := path.Join(db.location, fmt.Sprintf(shardTemplate, int(id)))
        sName := "shard" + strconv.Itoa(int(id))
        si := &shard{
-               id:       id,
-               l:        logger.Fetch(ctx, sName),
-               location: location,
+               id:                id,
+               l:                 logger.Fetch(ctx, sName),
+               location:          location,
+               expireToDeleteSec: deleteExpireSec,
        }
        opts := inverted.StoreOpts{
-               Path:         location,
-               Logger:       si.l,
-               Metrics:      
inverted.NewMetrics(db.omr.With(propertyScope.ConstLabels(meter.LabelPairs{"shard":
 sName}))),
-               BatchWaitSec: 0,
+               Path:                 location,
+               Logger:               si.l,
+               Metrics:              
inverted.NewMetrics(db.omr.With(propertyScope.ConstLabels(meter.LabelPairs{"shard":
 sName}))),
+               BatchWaitSec:         0,
+               PrepareMergeCallback: si.prepareForMerge,
        }
        var err error
        if si.store, err = inverted.NewStore(opts); err != nil {
@@ -131,6 +138,10 @@ func (s *shard) buildUpdateDocument(id []byte, property 
*propertyv1.Property) (*
 }
 
 func (s *shard) delete(ctx context.Context, docID [][]byte) error {
+       return s.deleteFromTime(ctx, docID, time.Now().Unix())
+}
+
+func (s *shard) deleteFromTime(ctx context.Context, docID [][]byte, deleteTime 
int64) error {
        // search the original documents by docID
        seriesMatchers := make([]index.SeriesMatcher, 0, len(docID))
        for _, id := range docID {
@@ -153,13 +164,13 @@ func (s *shard) delete(ctx context.Context, docID 
[][]byte) error {
                if err := protojson.Unmarshal(property.source, p); err != nil {
                        return fmt.Errorf("unmarshal property failure: %w", err)
                }
-               // update the property to mark it as deleted
+               // update the property to mark it as deleted
                document, err := s.buildUpdateDocument(GetPropertyID(p), p)
                if err != nil {
                        return fmt.Errorf("build delete document failure: %w", 
err)
                }
                // mark the document as deleted
-               document.Deleted = true
+               document.DeletedTime = deleteTime
                removeDocList = append(removeDocList, *document)
        }
        return s.updateDocuments(removeDocList)
@@ -217,11 +228,46 @@ func (s *shard) search(ctx context.Context, indexQuery 
index.Query, limit int,
        data = make([]*queryProperty, 0, len(ss))
        for _, s := range ss {
                bytes := s.Fields[sourceField]
-               deleted := convert.BytesToBool(s.Fields[deleteField])
+               var deleteTime int64
+               if s.Fields[deleteField] != nil {
+                       deleteTime = convert.BytesToInt64(s.Fields[deleteField])
+               }
                data = append(data, &queryProperty{
-                       source:  bytes,
-                       deleted: deleted,
+                       source:     bytes,
+                       deleteTime: deleteTime,
                })
        }
        return data, nil
 }
+
+func (s *shard) prepareForMerge(src []*roaring.Bitmap, segments 
[]segment.Segment, _ uint64) (dest []*roaring.Bitmap, err error) {
+       if len(segments) == 0 || len(src) == 0 || len(segments) != len(src) {
+               return src, nil
+       }
+       for segID, seg := range segments {
+               var docID uint64
+               for ; docID < seg.Count(); docID++ {
+                       var deleteTime int64
+                       err = seg.VisitStoredFields(docID, func(field string, 
value []byte) bool {
+                               if field == deleteField {
+                                       deleteTime = convert.BytesToInt64(value)
+                               }
+                               return true
+                       })
+                       if err != nil {
+                               return src, fmt.Errorf("visit stored field 
failure: %w", err)
+                       }
+
+                       if deleteTime <= 0 || 
int64(time.Since(time.Unix(deleteTime, 0)).Seconds()) < s.expireToDeleteSec {
+                               continue
+                       }
+
+                       if src[segID] == nil {
+                               src[segID] = roaring.New()
+                       }
+
+                       src[segID].Add(uint32(docID))
+               }
+       }
+       return src, nil
+}
diff --git a/banyand/property/shard_test.go b/banyand/property/shard_test.go
new file mode 100644
index 00000000..7f33ebad
--- /dev/null
+++ b/banyand/property/shard_test.go
@@ -0,0 +1,153 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package property
+
+import (
+       "context"
+       "fmt"
+       "testing"
+       "time"
+
+       commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+       propertyv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
+       "github.com/apache/skywalking-banyandb/banyand/observability"
+       "github.com/apache/skywalking-banyandb/pkg/fs"
+       "github.com/apache/skywalking-banyandb/pkg/test"
+)
+
+func TestMergeDeleted(t *testing.T) {
+       propertyCount := 6
+       tests := []struct {
+               verify             func(t *testing.T, queriedProperties 
[]*queryProperty)
+               name               string
+               expireDeletionTime time.Duration
+               deleteTime         int64
+       }{
+               {
+                       name:               "delete expired properties from db",
+                       expireDeletionTime: 1 * time.Second,
+                       deleteTime:         time.Now().Add(-3 * 
time.Second).Unix(),
+                       verify: func(t *testing.T, queriedProperties 
[]*queryProperty) {
+                               // the number of properties in the shard should be less than the total property count
+                               if len(queriedProperties) >= propertyCount {
+                                       t.Fatal(fmt.Errorf("expect only %d 
results, got %d", propertyCount, len(queriedProperties)))
+                               }
+                               for _, p := range queriedProperties {
+                                       // and every remaining property should still be marked as deleted
+                                       if p.deleteTime <= 0 {
+                                               t.Fatal(fmt.Errorf("expect all 
results to be deleted"))
+                                       }
+                               }
+                       },
+               },
+               {
+                       name:               "deleted properties still exist in 
db",
+                       expireDeletionTime: time.Hour,
+                       deleteTime:         time.Now().Unix(),
+                       verify: func(t *testing.T, queriedProperties 
[]*queryProperty) {
+                               if len(queriedProperties) != propertyCount {
+                                       t.Fatal(fmt.Errorf("expect %d results, 
got %d", propertyCount, len(queriedProperties)))
+                               }
+                               for _, p := range queriedProperties {
+                                       // and every property should still be marked as deleted
+                                       if p.deleteTime <= 0 {
+                                               t.Fatal(fmt.Errorf("expect all 
results to be deleted"))
+                                       }
+                               }
+                       },
+               },
+       }
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       var defers []func()
+                       defer func() {
+                               for _, f := range defers {
+                                       f()
+                               }
+                       }()
+
+                       dir, deferFunc, err := test.NewSpace()
+                       if err != nil {
+                               t.Fatal(err)
+                       }
+                       defers = append(defers, deferFunc)
+                       db, err := openDB(context.Background(), dir, 
3*time.Second, tt.expireDeletionTime, observability.BypassRegistry, 
fs.NewLocalFileSystem())
+                       if err != nil {
+                               t.Fatal(err)
+                       }
+                       defers = append(defers, func() {
+                               _ = db.close()
+                       })
+
+                       newShard, err := db.loadShard(context.Background(), 0)
+                       if err != nil {
+                               t.Fatal(err)
+                       }
+
+                       properties := make([]*propertyv1.Property, 0, 
propertyCount)
+                       unix := time.Now().Unix()
+                       unix -= 10
+                       for i := 0; i < propertyCount; i++ {
+                               property := &propertyv1.Property{
+                                       Metadata: &commonv1.Metadata{
+                                               Group:       "test-group",
+                                               Name:        "test-name",
+                                               ModRevision: unix,
+                                       },
+                                       Id: fmt.Sprintf("test-id%d", i),
+                                       Tags: []*modelv1.Tag{
+                                               {Key: "tag1", Value: 
&modelv1.TagValue{Value: &modelv1.TagValue_Int{Int: &modelv1.Int{Value: 
int64(i)}}}},
+                                       },
+                               }
+                               properties = append(properties, property)
+                       }
+                       // apply the property
+                       for _, p := range properties {
+                               if err = newShard.update(GetPropertyID(p), p); 
err != nil {
+                                       t.Fatal(err)
+                               }
+                       }
+
+                       resp, err := db.query(context.Background(), 
&propertyv1.QueryRequest{Groups: []string{"test-group"}})
+                       if err != nil {
+                               t.Fatal(err)
+                       }
+                       if len(resp) != propertyCount {
+                               t.Fatal(fmt.Errorf("expect %d results before 
delete, got %d", propertyCount, len(resp)))
+                       }
+
+                       // delete current property
+                       for _, p := range properties {
+                               if err = 
newShard.deleteFromTime(context.Background(), [][]byte{GetPropertyID(p)}, 
tt.deleteTime); err != nil {
+                                       t.Fatal(err)
+                               }
+                       }
+
+                       // waiting for the merge phase to complete
+                       time.Sleep(time.Second * 1)
+
+                       // query again: deleted properties whose deletion has expired should have been physically removed by the merge phase, unexpired ones should remain
+                       resp, err = db.query(context.Background(), 
&propertyv1.QueryRequest{Groups: []string{"test-group"}})
+                       if err != nil {
+                               t.Fatal(err)
+                       }
+                       tt.verify(t, resp)
+               })
+       }
+}
diff --git a/dist/LICENSE b/dist/LICENSE
index 60e91a0e..8e318023 100644
--- a/dist/LICENSE
+++ b/dist/LICENSE
@@ -178,8 +178,8 @@
 Apache-2.0 licenses
 ========================================================================
 
-    github.com/SkyAPM/bluge v0.0.0-20241225104157-e54f64be56e8 Apache-2.0
-    github.com/SkyAPM/ice v0.0.0-20241108011032-c3d8eea75118 Apache-2.0
+    github.com/SkyAPM/bluge v0.0.0-20250619030236-3750bbbf63b9 Apache-2.0
+    github.com/SkyAPM/ice v0.0.0-20250619023539-b5173603b0b3 Apache-2.0
     github.com/apache/skywalking-cli v0.0.0-20240227151024-ee371a210afe 
Apache-2.0
     github.com/aws/aws-sdk-go-v2 v1.36.3 Apache-2.0
     github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.10 Apache-2.0
diff --git a/docs/api-reference.md b/docs/api-reference.md
index 72185c0d..3d3016c8 100644
--- a/docs/api-reference.md
+++ b/docs/api-reference.md
@@ -3401,7 +3401,7 @@ Property stores the user defined data
 | ----- | ---- | ----- | ----------- |
 | sources | [bytes](#bytes) | repeated |  |
 | trace | [banyandb.common.v1.Trace](#banyandb-common-v1-Trace) |  |  |
-| deletes | [bool](#bool) | repeated | deletes indicates the property is 
deleted or not it&#39;s mapping to the sources in the same order |
+| deletes | [int64](#int64) | repeated | deletes holds the deletion timestamp (Unix seconds) of each property; it&#39;s mapping to the sources in the same order. A value of 0 means the property is not deleted |
 
 
 
diff --git a/go.mod b/go.mod
index bff48b0c..81303bdc 100644
--- a/go.mod
+++ b/go.mod
@@ -197,7 +197,7 @@ require (
 
 replace (
        github.com/benbjohnson/clock v1.3.0 => github.com/SkyAPM/clock 
v1.3.1-0.20220809233656-dc7607c94a97
-       github.com/blugelabs/bluge => github.com/SkyAPM/bluge 
v0.0.0-20241225104157-e54f64be56e8
+       github.com/blugelabs/bluge => github.com/SkyAPM/bluge 
v0.0.0-20250619030236-3750bbbf63b9
        github.com/blugelabs/bluge_segment_api => 
github.com/zinclabs/bluge_segment_api v1.0.0
-       github.com/blugelabs/ice => github.com/SkyAPM/ice 
v0.0.0-20241108011032-c3d8eea75118
+       github.com/blugelabs/ice => github.com/SkyAPM/ice 
v0.0.0-20250619023539-b5173603b0b3
 )
diff --git a/go.sum b/go.sum
index 4662c1e1..27a61d91 100644
--- a/go.sum
+++ b/go.sum
@@ -15,12 +15,12 @@ github.com/OneOfOne/xxhash v1.2.2/go.mod 
h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAE
 github.com/RoaringBitmap/roaring v0.9.4/go.mod 
h1:icnadbWcNyfEHlYdr+tDlOTih1Bf/h+rzPpv4sbomAA=
 github.com/RoaringBitmap/roaring v1.9.4 
h1:yhEIoH4YezLYT04s1nHehNO64EKFTop/wBhxv2QzDdQ=
 github.com/RoaringBitmap/roaring v1.9.4/go.mod 
h1:6AXUsoIEzDTFFQCe1RbGA6uFONMhvejWj5rqITANK90=
-github.com/SkyAPM/bluge v0.0.0-20241225104157-e54f64be56e8 
h1:bWJ8yNeaph7oe4S/kvoAM0dNLNamfsOzxaajhLvTAks=
-github.com/SkyAPM/bluge v0.0.0-20241225104157-e54f64be56e8/go.mod 
h1:rriyHHsTidJ4UYFiVDIZyXCFJxQYs5FGpZmIfuvSqPA=
+github.com/SkyAPM/bluge v0.0.0-20250619030236-3750bbbf63b9 
h1:OpK6hoXecSlCCeOENHs6m84Gs0knLKt7AHoXCgmABk4=
+github.com/SkyAPM/bluge v0.0.0-20250619030236-3750bbbf63b9/go.mod 
h1:rriyHHsTidJ4UYFiVDIZyXCFJxQYs5FGpZmIfuvSqPA=
 github.com/SkyAPM/clock v1.3.1-0.20220809233656-dc7607c94a97 
h1:FKuhJ+6n/DHspGeLleeNbziWnKr9gHKYN4q7NcoCp4s=
 github.com/SkyAPM/clock v1.3.1-0.20220809233656-dc7607c94a97/go.mod 
h1:2xGRl9H1pllhxTbEGO1W3gDkip8P9GQaHPni/wpdR44=
-github.com/SkyAPM/ice v0.0.0-20241108011032-c3d8eea75118 
h1:Ja62sgOCp2qPTd8Xmldv1U83v11IRIsh6KlB7UaFLj4=
-github.com/SkyAPM/ice v0.0.0-20241108011032-c3d8eea75118/go.mod 
h1:DoQeb0Ee86LyruZSL77Ddscfk/THJ38x453CRCnGEPI=
+github.com/SkyAPM/ice v0.0.0-20250619023539-b5173603b0b3 
h1:bZvLPihpC1q1AJkmR6ere/aJOr3ev+JCytvjKuL+gE8=
+github.com/SkyAPM/ice v0.0.0-20250619023539-b5173603b0b3/go.mod 
h1:DoQeb0Ee86LyruZSL77Ddscfk/THJ38x453CRCnGEPI=
 github.com/VictoriaMetrics/fastcache v1.12.2 
h1:N0y9ASrJ0F6h0QaC3o6uJb3NIZ9VKLjCM7NQbSmF7WI=
 github.com/VictoriaMetrics/fastcache v1.12.2/go.mod 
h1:AmC+Nzz1+3G2eCPapF6UcsnkThDcMsQicp4xDukwJYI=
 github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156 
h1:eMwmnE/GDgah4HI848JfFxHt+iPb26b4zyfspmqY0/8=
diff --git a/pkg/index/index.go b/pkg/index/index.go
index 9ec98899..629684c5 100644
--- a/pkg/index/index.go
+++ b/pkg/index/index.go
@@ -300,7 +300,7 @@ type Document struct {
        Timestamp    int64
        DocID        uint64
        Version      int64
-       Deleted      bool // for logical deletion
+       DeletedTime  int64 // for logical deletion
 }
 
 // Documents is a collection of documents.
diff --git a/pkg/index/inverted/inverted.go b/pkg/index/inverted/inverted.go
index 8450e647..335b1099 100644
--- a/pkg/index/inverted/inverted.go
+++ b/pkg/index/inverted/inverted.go
@@ -25,12 +25,14 @@ import (
        "strconv"
        "time"
 
+       roaringpkg "github.com/RoaringBitmap/roaring"
        "github.com/blugelabs/bluge"
        "github.com/blugelabs/bluge/analysis"
        "github.com/blugelabs/bluge/analysis/analyzer"
        blugeIndex "github.com/blugelabs/bluge/index"
        "github.com/blugelabs/bluge/numeric"
        "github.com/blugelabs/bluge/search"
+       segment "github.com/blugelabs/bluge_segment_api"
        "github.com/pkg/errors"
        "go.uber.org/multierr"
 
@@ -76,8 +78,10 @@ var _ index.Store = (*store)(nil)
 
 // StoreOpts wraps options to create an inverted index repository.
 type StoreOpts struct {
-       Logger        *logger.Logger
-       Metrics       *Metrics
+       Logger               *logger.Logger
+       Metrics              *Metrics
+       PrepareMergeCallback func(src []*roaringpkg.Bitmap, segments 
[]segment.Segment, id uint64) (dest []*roaringpkg.Bitmap, err error)
+
        Path          string
        BatchWaitSec  int64
        CacheMaxBytes int
@@ -142,8 +146,8 @@ func (s *store) Batch(batch index.Batch) error {
                if d.Timestamp > 0 {
                        doc.AddField(bluge.NewDateTimeField(timestampField, 
time.Unix(0, d.Timestamp)).StoreValue())
                }
-               if d.Deleted {
-                       doc.AddField(bluge.NewStoredOnlyField(deletedField, 
convert.BoolToBytes(true)).StoreValue())
+               if d.DeletedTime > 0 {
+                       doc.AddField(bluge.NewStoredOnlyField(deletedField, 
convert.Int64ToBytes(d.DeletedTime)).StoreValue())
                }
                b.Insert(doc)
        }
@@ -164,6 +168,7 @@ func NewStore(opts StoreOpts) (index.SeriesStore, error) {
        config := bluge.DefaultConfigWithIndexConfig(indexConfig)
        config.DefaultSearchAnalyzer = Analyzers[index.AnalyzerKeyword]
        config.Logger = log.New(opts.Logger, opts.Logger.Module(), 0)
+       config = config.WithPrepareMergeCallback(opts.PrepareMergeCallback)
        w, err := bluge.OpenWriter(config)
        if err != nil {
                return nil, err
diff --git a/pkg/index/inverted/inverted_series.go 
b/pkg/index/inverted/inverted_series.go
index c99e5aa5..7dd0d998 100644
--- a/pkg/index/inverted/inverted_series.go
+++ b/pkg/index/inverted/inverted_series.go
@@ -127,8 +127,8 @@ func toDoc(d index.Document, toParseFieldNames bool) 
(*bluge.Document, []string)
                vf := bluge.NewStoredOnlyField(versionField, 
convert.Int64ToBytes(d.Version))
                doc.AddField(vf)
        }
-       if d.Deleted {
-               doc.AddField(bluge.NewStoredOnlyField(deletedField, 
convert.BoolToBytes(d.Deleted)).StoreValue())
+       if d.DeletedTime > 0 {
+               doc.AddField(bluge.NewStoredOnlyField(deletedField, 
convert.Int64ToBytes(d.DeletedTime)).StoreValue())
        }
        return doc, fieldNames
 }

Reply via email to