This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push: new c3667f77 Physics delete property when merging segment (#684) c3667f77 is described below commit c3667f774c4f92e21ee9eac8b78321a3808c57a9 Author: mrproliu <741550...@qq.com> AuthorDate: Fri Jun 20 13:20:09 2025 +0800 Physics delete property when merging segment (#684) --- api/proto/banyandb/property/v1/rpc.proto | 5 +- banyand/liaison/grpc/discovery.go | 6 +- banyand/liaison/grpc/property.go | 20 ++-- banyand/property/db.go | 18 +++- banyand/property/listener.go | 2 +- banyand/property/service.go | 4 +- banyand/property/shard.go | 72 ++++++++++++--- banyand/property/shard_test.go | 153 +++++++++++++++++++++++++++++++ dist/LICENSE | 4 +- docs/api-reference.md | 2 +- go.mod | 4 +- go.sum | 8 +- pkg/index/index.go | 2 +- pkg/index/inverted/inverted.go | 13 ++- pkg/index/inverted/inverted_series.go | 4 +- 15 files changed, 267 insertions(+), 50 deletions(-) diff --git a/api/proto/banyandb/property/v1/rpc.proto b/api/proto/banyandb/property/v1/rpc.proto index 4ea60cef..ba826b2c 100644 --- a/api/proto/banyandb/property/v1/rpc.proto +++ b/api/proto/banyandb/property/v1/rpc.proto @@ -122,7 +122,8 @@ message InternalDeleteRequest { message InternalQueryResponse { repeated bytes sources = 1; common.v1.Trace trace = 2; - // deletes indicates the property is deleted or not + // deletes indicates the property is deleted timestamps, // it's mapping to the sources in the same order - repeated bool deletes = 3; + // if the value is 0, it means the property is not deleted + repeated int64 deletes = 3; } diff --git a/banyand/liaison/grpc/discovery.go b/banyand/liaison/grpc/discovery.go index eed1807b..f9542a70 100644 --- a/banyand/liaison/grpc/discovery.go +++ b/banyand/liaison/grpc/discovery.go @@ -146,7 +146,7 @@ func (s *groupRepo) OnDelete(schemaMetadata schema.Metadata) { return } if le := s.log.Debug(); le.Enabled() { - le.Stringer("id", group.Metadata).Msg("shard deleted") + le.Stringer("id", group.Metadata).Msg("shard deletedTime") } s.RWMutex.Lock() defer s.RWMutex.Unlock() @@ -263,7 +263,7 @@ func (e *entityRepo) OnDelete(schemaMetadata schema.Metadata) { Str("action", "delete"). Stringer("subject", id). Str("kind", kind). - Msg("entity deleted") + Msg("entity deletedTime") } e.RWMutex.Lock() defer e.RWMutex.Unlock() @@ -339,7 +339,7 @@ func (s *shardingKeyRepo) OnDelete(schemaMetadata schema.Metadata) { Str("action", "delete"). Stringer("subject", id). Str("kind", "measure"). - Msg("sharding key deleted") + Msg("sharding key deletedTime") } s.RWMutex.Lock() defer s.RWMutex.Unlock() diff --git a/banyand/liaison/grpc/property.go b/banyand/liaison/grpc/property.go index 61c4808a..d65c3728 100644 --- a/banyand/liaison/grpc/property.go +++ b/banyand/liaison/grpc/property.go @@ -168,7 +168,7 @@ func (ps *propertyServer) Apply(ctx context.Context, req *propertyv1.ApplyReques nodes = append(nodes, nodeID) } var prev *propertyv1.Property - if prevPropertyWithMetadata != nil && !prevPropertyWithMetadata.deleted { + if prevPropertyWithMetadata != nil && prevPropertyWithMetadata.deletedTime <= 0 { prev = prevPropertyWithMetadata.Property } defer func() { @@ -195,7 +195,7 @@ func (ps *propertyServer) findPrevAndOlderProperties(nodeProperties [][]*propert for _, properties := range nodeProperties { for _, p := range properties { // if the property is not deleted, then added to the older properties list to delete after apply success - if !p.deleted { + if p.deletedTime <= 0 { olderProperties = append(olderProperties, p) } // update the prov property @@ -328,7 +328,7 @@ func (ps *propertyServer) Delete(ctx context.Context, req *propertyv1.DeleteRequ for _, properties := range nodeProperties { for _, p := range properties { // if the property already delete, then ignore execute twice - if p.deleted { + if p.deletedTime > 0 { continue } ids = append(ids, propertypkg.GetPropertyID(p.Property)) @@ -381,8 +381,8 @@ func (ps *propertyServer) Query(ctx context.Context, req *propertyv1.QueryReques } properties := make([]*propertyv1.Property, 0, len(res)) for _, p := range res { - // ignore deleted property - if p.deleted { + // ignore deletedTime property + if p.deletedTime > 0 { continue } if len(req.TagProjection) > 0 { @@ -453,17 +453,17 @@ func (ps *propertyServer) queryProperties( case *propertyv1.InternalQueryResponse: for i, s := range v.Sources { var p propertyv1.Property - var deleted bool + var deleteTime int64 err = protojson.Unmarshal(s, &p) if err != nil { return nil, trace, err } if i < len(v.Deletes) { - deleted = v.Deletes[i] + deleteTime = v.Deletes[i] } property := &propertyWithMetadata{ - Property: &p, - deleted: deleted, + Property: &p, + deletedTime: deleteTime, } nodeWithProperties = append(nodeWithProperties, property) } @@ -502,5 +502,5 @@ func (ps *propertyServer) remove(ids [][]byte) error { type propertyWithMetadata struct { *propertyv1.Property - deleted bool + deletedTime int64 } diff --git a/banyand/property/db.go b/banyand/property/db.go index 34c552e1..1305648a 100644 --- a/banyand/property/db.go +++ b/banyand/property/db.go @@ -55,11 +55,19 @@ type database struct { sLst atomic.Pointer[[]*shard] location string flushInterval time.Duration + expireDelete time.Duration closed atomic.Bool mu sync.RWMutex } -func openDB(ctx context.Context, location string, flushInterval time.Duration, omr observability.MetricsRegistry, lfs fs.FileSystem) (*database, error) { +func openDB( + ctx context.Context, + location string, + flushInterval time.Duration, + expireToDeleteDuration time.Duration, + omr observability.MetricsRegistry, + lfs fs.FileSystem, +) (*database, error) { loc := filepath.Clean(location) lfs.MkdirIfNotExist(loc, storage.DirPerm) l := logger.GetLogger("property") @@ -69,6 +77,7 @@ func openDB(ctx context.Context, location string, flushInterval time.Duration, o logger: l, omr: omr, flushInterval: flushInterval, + expireDelete: expireToDeleteDuration, lfs: lfs, } if err := db.load(ctx); err != nil { @@ -155,7 +164,8 @@ func (db *database) loadShard(ctx context.Context, id common.ShardID) (*shard, e if s, ok := db.getShard(id); ok { return s, nil } - sd, err := db.newShard(context.WithValue(ctx, logger.ContextKey, db.logger), id, int64(db.flushInterval.Seconds())) + sd, err := db.newShard(context.WithValue(ctx, logger.ContextKey, db.logger), id, int64(db.flushInterval.Seconds()), + int64(db.expireDelete.Seconds())) if err != nil { return nil, err } @@ -226,6 +236,6 @@ func walkDir(root, prefix string, wf walkFn) error { } type queryProperty struct { - source []byte - deleted bool + source []byte + deleteTime int64 } diff --git a/banyand/property/listener.go b/banyand/property/listener.go index f204816f..1c35f676 100644 --- a/banyand/property/listener.go +++ b/banyand/property/listener.go @@ -195,7 +195,7 @@ func (h *queryListener) Rev(ctx context.Context, message bus.Message) (resp bus. qResp := &propertyv1.InternalQueryResponse{} for _, p := range properties { qResp.Sources = append(qResp.Sources, p.source) - qResp.Deletes = append(qResp.Deletes, p.deleted) + qResp.Deletes = append(qResp.Deletes, p.deleteTime) } if tracer != nil { qResp.Trace = tracer.ToProto() diff --git a/banyand/property/service.go b/banyand/property/service.go index be3f1fbe..4c2ad948 100644 --- a/banyand/property/service.go +++ b/banyand/property/service.go @@ -61,6 +61,7 @@ type service struct { nodeID string snapshotDir string flushTimeout time.Duration + expireTimeout time.Duration maxDiskUsagePercent int maxFileSnapshotNum int } @@ -71,6 +72,7 @@ func (s *service) FlagSet() *run.FlagSet { flagS.DurationVar(&s.flushTimeout, "property-flush-timeout", defaultFlushTimeout, "the memory data timeout of measure") flagS.IntVar(&s.maxDiskUsagePercent, "property-max-disk-usage-percent", 95, "the maximum disk usage percentage allowed") flagS.IntVar(&s.maxFileSnapshotNum, "property-max-file-snapshot-num", 2, "the maximum number of file snapshots allowed") + flagS.DurationVar(&s.expireTimeout, "property-expire-delete-timeout", time.Hour*24*7, "the duration of the expired data needs to be deleted") return flagS } @@ -109,7 +111,7 @@ func (s *service) PreRun(ctx context.Context) error { s.nodeID = node.NodeID var err error - s.db, err = openDB(ctx, filepath.Join(path, storage.DataDir), s.flushTimeout, s.omr, s.lfs) + s.db, err = openDB(ctx, filepath.Join(path, storage.DataDir), s.flushTimeout, s.expireTimeout, s.omr, s.lfs) if err != nil { return err } diff --git a/banyand/property/shard.go b/banyand/property/shard.go index 7a4bef78..b0a36eda 100644 --- a/banyand/property/shard.go +++ b/banyand/property/shard.go @@ -23,7 +23,10 @@ import ( "path" "strconv" "sync" + "time" + "github.com/RoaringBitmap/roaring" + segment "github.com/blugelabs/bluge_segment_api" "google.golang.org/protobuf/encoding/protojson" "github.com/apache/skywalking-banyandb/api/common" @@ -60,6 +63,8 @@ type shard struct { l *logger.Logger location string id common.ShardID + + expireToDeleteSec int64 } func (s *shard) close() error { @@ -69,19 +74,21 @@ func (s *shard) close() error { return nil } -func (db *database) newShard(ctx context.Context, id common.ShardID, _ int64) (*shard, error) { +func (db *database) newShard(ctx context.Context, id common.ShardID, _ int64, deleteExpireSec int64) (*shard, error) { location := path.Join(db.location, fmt.Sprintf(shardTemplate, int(id))) sName := "shard" + strconv.Itoa(int(id)) si := &shard{ - id: id, - l: logger.Fetch(ctx, sName), - location: location, + id: id, + l: logger.Fetch(ctx, sName), + location: location, + expireToDeleteSec: deleteExpireSec, } opts := inverted.StoreOpts{ - Path: location, - Logger: si.l, - Metrics: inverted.NewMetrics(db.omr.With(propertyScope.ConstLabels(meter.LabelPairs{"shard": sName}))), - BatchWaitSec: 0, + Path: location, + Logger: si.l, + Metrics: inverted.NewMetrics(db.omr.With(propertyScope.ConstLabels(meter.LabelPairs{"shard": sName}))), + BatchWaitSec: 0, + PrepareMergeCallback: si.prepareForMerge, } var err error if si.store, err = inverted.NewStore(opts); err != nil { @@ -131,6 +138,10 @@ func (s *shard) buildUpdateDocument(id []byte, property *propertyv1.Property) (* } func (s *shard) delete(ctx context.Context, docID [][]byte) error { + return s.deleteFromTime(ctx, docID, time.Now().Unix()) +} + +func (s *shard) deleteFromTime(ctx context.Context, docID [][]byte, deleteTime int64) error { // search the original documents by docID seriesMatchers := make([]index.SeriesMatcher, 0, len(docID)) for _, id := range docID { @@ -153,13 +164,13 @@ func (s *shard) delete(ctx context.Context, docID [][]byte) error { if err := protojson.Unmarshal(property.source, p); err != nil { return fmt.Errorf("unmarshal property failure: %w", err) } - // update the property to mark it as deleted + // update the property to mark it as delete document, err := s.buildUpdateDocument(GetPropertyID(p), p) if err != nil { return fmt.Errorf("build delete document failure: %w", err) } // mark the document as deleted - document.Deleted = true + document.DeletedTime = deleteTime removeDocList = append(removeDocList, *document) } return s.updateDocuments(removeDocList) @@ -217,11 +228,46 @@ func (s *shard) search(ctx context.Context, indexQuery index.Query, limit int, data = make([]*queryProperty, 0, len(ss)) for _, s := range ss { bytes := s.Fields[sourceField] - deleted := convert.BytesToBool(s.Fields[deleteField]) + var deleteTime int64 + if s.Fields[deleteField] != nil { + deleteTime = convert.BytesToInt64(s.Fields[deleteField]) + } data = append(data, &queryProperty{ - source: bytes, - deleted: deleted, + source: bytes, + deleteTime: deleteTime, }) } return data, nil } + +func (s *shard) prepareForMerge(src []*roaring.Bitmap, segments []segment.Segment, _ uint64) (dest []*roaring.Bitmap, err error) { + if len(segments) == 0 || len(src) == 0 || len(segments) != len(src) { + return src, nil + } + for segID, seg := range segments { + var docID uint64 + for ; docID < seg.Count(); docID++ { + var deleteTime int64 + err = seg.VisitStoredFields(docID, func(field string, value []byte) bool { + if field == deleteField { + deleteTime = convert.BytesToInt64(value) + } + return true + }) + if err != nil { + return src, fmt.Errorf("visit stored field failure: %w", err) + } + + if deleteTime <= 0 || int64(time.Since(time.Unix(deleteTime, 0)).Seconds()) < s.expireToDeleteSec { + continue + } + + if src[segID] == nil { + src[segID] = roaring.New() + } + + src[segID].Add(uint32(docID)) + } + } + return src, nil +} diff --git a/banyand/property/shard_test.go b/banyand/property/shard_test.go new file mode 100644 index 00000000..7f33ebad --- /dev/null +++ b/banyand/property/shard_test.go @@ -0,0 +1,153 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package property + +import ( + "context" + "fmt" + "testing" + "time" + + commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" + modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" + propertyv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1" + "github.com/apache/skywalking-banyandb/banyand/observability" + "github.com/apache/skywalking-banyandb/pkg/fs" + "github.com/apache/skywalking-banyandb/pkg/test" +) + +func TestMergeDeleted(t *testing.T) { + propertyCount := 6 + tests := []struct { + verify func(t *testing.T, queriedProperties []*queryProperty) + name string + expireDeletionTime time.Duration + deleteTime int64 + }{ + { + name: "delete expired properties from db", + expireDeletionTime: 1 * time.Second, + deleteTime: time.Now().Add(-3 * time.Second).Unix(), + verify: func(t *testing.T, queriedProperties []*queryProperty) { + // the count of properties in shard should be less than the total properties count + if len(queriedProperties) >= propertyCount { + t.Fatal(fmt.Errorf("expect only %d results, got %d", propertyCount, len(queriedProperties))) + } + for _, p := range queriedProperties { + // and the property should be marked as deleteTime + if p.deleteTime <= 0 { + t.Fatal(fmt.Errorf("expect all results to be deleted")) + } + } + }, + }, + { + name: "deleted properties still exist in db", + expireDeletionTime: time.Hour, + deleteTime: time.Now().Unix(), + verify: func(t *testing.T, queriedProperties []*queryProperty) { + if len(queriedProperties) != propertyCount { + t.Fatal(fmt.Errorf("expect %d results, got %d", propertyCount, len(queriedProperties))) + } + for _, p := range queriedProperties { + // and the property should be marked as deleteTime + if p.deleteTime <= 0 { + t.Fatal(fmt.Errorf("expect all results to be deleted")) + } + } + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var defers []func() + defer func() { + for _, f := range defers { + f() + } + }() + + dir, deferFunc, err := test.NewSpace() + if err != nil { + t.Fatal(err) + } + defers = append(defers, deferFunc) + db, err := openDB(context.Background(), dir, 3*time.Second, tt.expireDeletionTime, observability.BypassRegistry, fs.NewLocalFileSystem()) + if err != nil { + t.Fatal(err) + } + defers = append(defers, func() { + _ = db.close() + }) + + newShard, err := db.loadShard(context.Background(), 0) + if err != nil { + t.Fatal(err) + } + + properties := make([]*propertyv1.Property, 0, propertyCount) + unix := time.Now().Unix() + unix -= 10 + for i := 0; i < propertyCount; i++ { + property := &propertyv1.Property{ + Metadata: &commonv1.Metadata{ + Group: "test-group", + Name: "test-name", + ModRevision: unix, + }, + Id: fmt.Sprintf("test-id%d", i), + Tags: []*modelv1.Tag{ + {Key: "tag1", Value: &modelv1.TagValue{Value: &modelv1.TagValue_Int{Int: &modelv1.Int{Value: int64(i)}}}}, + }, + } + properties = append(properties, property) + } + // apply the property + for _, p := range properties { + if err = newShard.update(GetPropertyID(p), p); err != nil { + t.Fatal(err) + } + } + + resp, err := db.query(context.Background(), &propertyv1.QueryRequest{Groups: []string{"test-group"}}) + if err != nil { + t.Fatal(err) + } + if len(resp) != propertyCount { + t.Fatal(fmt.Errorf("expect %d results before delete, got %d", propertyCount, len(resp))) + } + + // delete current property + for _, p := range properties { + if err = newShard.deleteFromTime(context.Background(), [][]byte{GetPropertyID(p)}, tt.deleteTime); err != nil { + t.Fatal(err) + } + } + + // waiting for the merge phase to complete + time.Sleep(time.Second * 1) + + // check if the property is deleteTime from shard including deleteTime, should be no document (delete by merge phase) + resp, err = db.query(context.Background(), &propertyv1.QueryRequest{Groups: []string{"test-group"}}) + if err != nil { + t.Fatal(err) + } + tt.verify(t, resp) + }) + } +} diff --git a/dist/LICENSE b/dist/LICENSE index 60e91a0e..8e318023 100644 --- a/dist/LICENSE +++ b/dist/LICENSE @@ -178,8 +178,8 @@ Apache-2.0 licenses ======================================================================== - github.com/SkyAPM/bluge v0.0.0-20241225104157-e54f64be56e8 Apache-2.0 - github.com/SkyAPM/ice v0.0.0-20241108011032-c3d8eea75118 Apache-2.0 + github.com/SkyAPM/bluge v0.0.0-20250619030236-3750bbbf63b9 Apache-2.0 + github.com/SkyAPM/ice v0.0.0-20250619023539-b5173603b0b3 Apache-2.0 github.com/apache/skywalking-cli v0.0.0-20240227151024-ee371a210afe Apache-2.0 github.com/aws/aws-sdk-go-v2 v1.36.3 Apache-2.0 github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.10 Apache-2.0 diff --git a/docs/api-reference.md b/docs/api-reference.md index 72185c0d..3d3016c8 100644 --- a/docs/api-reference.md +++ b/docs/api-reference.md @@ -3401,7 +3401,7 @@ Property stores the user defined data | ----- | ---- | ----- | ----------- | | sources | [bytes](#bytes) | repeated | | | trace | [banyandb.common.v1.Trace](#banyandb-common-v1-Trace) | | | -| deletes | [bool](#bool) | repeated | deletes indicates the property is deleted or not it's mapping to the sources in the same order | +| deletes | [int64](#int64) | repeated | deletes indicates the property is deleted timestamps, it's mapping to the sources in the same order if the value is 0, it means the property is not deleted | diff --git a/go.mod b/go.mod index bff48b0c..81303bdc 100644 --- a/go.mod +++ b/go.mod @@ -197,7 +197,7 @@ require ( replace ( github.com/benbjohnson/clock v1.3.0 => github.com/SkyAPM/clock v1.3.1-0.20220809233656-dc7607c94a97 - github.com/blugelabs/bluge => github.com/SkyAPM/bluge v0.0.0-20241225104157-e54f64be56e8 + github.com/blugelabs/bluge => github.com/SkyAPM/bluge v0.0.0-20250619030236-3750bbbf63b9 github.com/blugelabs/bluge_segment_api => github.com/zinclabs/bluge_segment_api v1.0.0 - github.com/blugelabs/ice => github.com/SkyAPM/ice v0.0.0-20241108011032-c3d8eea75118 + github.com/blugelabs/ice => github.com/SkyAPM/ice v0.0.0-20250619023539-b5173603b0b3 ) diff --git a/go.sum b/go.sum index 4662c1e1..27a61d91 100644 --- a/go.sum +++ b/go.sum @@ -15,12 +15,12 @@ github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAE github.com/RoaringBitmap/roaring v0.9.4/go.mod h1:icnadbWcNyfEHlYdr+tDlOTih1Bf/h+rzPpv4sbomAA= github.com/RoaringBitmap/roaring v1.9.4 h1:yhEIoH4YezLYT04s1nHehNO64EKFTop/wBhxv2QzDdQ= github.com/RoaringBitmap/roaring v1.9.4/go.mod h1:6AXUsoIEzDTFFQCe1RbGA6uFONMhvejWj5rqITANK90= -github.com/SkyAPM/bluge v0.0.0-20241225104157-e54f64be56e8 h1:bWJ8yNeaph7oe4S/kvoAM0dNLNamfsOzxaajhLvTAks= -github.com/SkyAPM/bluge v0.0.0-20241225104157-e54f64be56e8/go.mod h1:rriyHHsTidJ4UYFiVDIZyXCFJxQYs5FGpZmIfuvSqPA= +github.com/SkyAPM/bluge v0.0.0-20250619030236-3750bbbf63b9 h1:OpK6hoXecSlCCeOENHs6m84Gs0knLKt7AHoXCgmABk4= +github.com/SkyAPM/bluge v0.0.0-20250619030236-3750bbbf63b9/go.mod h1:rriyHHsTidJ4UYFiVDIZyXCFJxQYs5FGpZmIfuvSqPA= github.com/SkyAPM/clock v1.3.1-0.20220809233656-dc7607c94a97 h1:FKuhJ+6n/DHspGeLleeNbziWnKr9gHKYN4q7NcoCp4s= github.com/SkyAPM/clock v1.3.1-0.20220809233656-dc7607c94a97/go.mod h1:2xGRl9H1pllhxTbEGO1W3gDkip8P9GQaHPni/wpdR44= -github.com/SkyAPM/ice v0.0.0-20241108011032-c3d8eea75118 h1:Ja62sgOCp2qPTd8Xmldv1U83v11IRIsh6KlB7UaFLj4= -github.com/SkyAPM/ice v0.0.0-20241108011032-c3d8eea75118/go.mod h1:DoQeb0Ee86LyruZSL77Ddscfk/THJ38x453CRCnGEPI= +github.com/SkyAPM/ice v0.0.0-20250619023539-b5173603b0b3 h1:bZvLPihpC1q1AJkmR6ere/aJOr3ev+JCytvjKuL+gE8= +github.com/SkyAPM/ice v0.0.0-20250619023539-b5173603b0b3/go.mod h1:DoQeb0Ee86LyruZSL77Ddscfk/THJ38x453CRCnGEPI= github.com/VictoriaMetrics/fastcache v1.12.2 h1:N0y9ASrJ0F6h0QaC3o6uJb3NIZ9VKLjCM7NQbSmF7WI= github.com/VictoriaMetrics/fastcache v1.12.2/go.mod h1:AmC+Nzz1+3G2eCPapF6UcsnkThDcMsQicp4xDukwJYI= github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156 h1:eMwmnE/GDgah4HI848JfFxHt+iPb26b4zyfspmqY0/8= diff --git a/pkg/index/index.go b/pkg/index/index.go index 9ec98899..629684c5 100644 --- a/pkg/index/index.go +++ b/pkg/index/index.go @@ -300,7 +300,7 @@ type Document struct { Timestamp int64 DocID uint64 Version int64 - Deleted bool // for logical deletion + DeletedTime int64 // for logical deletion } // Documents is a collection of documents. diff --git a/pkg/index/inverted/inverted.go b/pkg/index/inverted/inverted.go index 8450e647..335b1099 100644 --- a/pkg/index/inverted/inverted.go +++ b/pkg/index/inverted/inverted.go @@ -25,12 +25,14 @@ import ( "strconv" "time" + roaringpkg "github.com/RoaringBitmap/roaring" "github.com/blugelabs/bluge" "github.com/blugelabs/bluge/analysis" "github.com/blugelabs/bluge/analysis/analyzer" blugeIndex "github.com/blugelabs/bluge/index" "github.com/blugelabs/bluge/numeric" "github.com/blugelabs/bluge/search" + segment "github.com/blugelabs/bluge_segment_api" "github.com/pkg/errors" "go.uber.org/multierr" @@ -76,8 +78,10 @@ var _ index.Store = (*store)(nil) // StoreOpts wraps options to create an inverted index repository. type StoreOpts struct { - Logger *logger.Logger - Metrics *Metrics + Logger *logger.Logger + Metrics *Metrics + PrepareMergeCallback func(src []*roaringpkg.Bitmap, segments []segment.Segment, id uint64) (dest []*roaringpkg.Bitmap, err error) + Path string BatchWaitSec int64 CacheMaxBytes int @@ -142,8 +146,8 @@ func (s *store) Batch(batch index.Batch) error { if d.Timestamp > 0 { doc.AddField(bluge.NewDateTimeField(timestampField, time.Unix(0, d.Timestamp)).StoreValue()) } - if d.Deleted { - doc.AddField(bluge.NewStoredOnlyField(deletedField, convert.BoolToBytes(true)).StoreValue()) + if d.DeletedTime > 0 { + doc.AddField(bluge.NewStoredOnlyField(deletedField, convert.Int64ToBytes(d.DeletedTime)).StoreValue()) } b.Insert(doc) } @@ -164,6 +168,7 @@ func NewStore(opts StoreOpts) (index.SeriesStore, error) { config := bluge.DefaultConfigWithIndexConfig(indexConfig) config.DefaultSearchAnalyzer = Analyzers[index.AnalyzerKeyword] config.Logger = log.New(opts.Logger, opts.Logger.Module(), 0) + config = config.WithPrepareMergeCallback(opts.PrepareMergeCallback) w, err := bluge.OpenWriter(config) if err != nil { return nil, err diff --git a/pkg/index/inverted/inverted_series.go b/pkg/index/inverted/inverted_series.go index c99e5aa5..7dd0d998 100644 --- a/pkg/index/inverted/inverted_series.go +++ b/pkg/index/inverted/inverted_series.go @@ -127,8 +127,8 @@ func toDoc(d index.Document, toParseFieldNames bool) (*bluge.Document, []string) vf := bluge.NewStoredOnlyField(versionField, convert.Int64ToBytes(d.Version)) doc.AddField(vf) } - if d.Deleted { - doc.AddField(bluge.NewStoredOnlyField(deletedField, convert.BoolToBytes(d.Deleted)).StoreValue()) + if d.DeletedTime > 0 { + doc.AddField(bluge.NewStoredOnlyField(deletedField, convert.Int64ToBytes(d.DeletedTime)).StoreValue()) } return doc, fieldNames }