This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new bc60047d Implement versioning properties and replace physical deletion 
with the tombstone mechanism for the property database (#681)
bc60047d is described below

commit bc60047db2f2d8acd9ca80d3a633e93612046b16
Author: mrproliu <741550...@qq.com>
AuthorDate: Tue Jun 17 10:33:13 2025 +0800

    Implement versioning properties and replace physical deletion with the 
tombstone mechanism for the property database (#681)
---
 CHANGES.md                               |   1 +
 api/proto/banyandb/property/v1/rpc.proto |   3 +
 banyand/liaison/grpc/property.go         | 261 ++++++++++++++++--------
 banyand/liaison/grpc/server.go           |   9 +-
 banyand/property/db.go                   |  13 +-
 banyand/property/listener.go             |  12 +-
 banyand/property/property.go             |  19 +-
 banyand/property/shard.go                | 105 ++++++++--
 bydbctl/internal/cmd/property_test.go    | 327 ++++++++++++++++++++++---------
 docs/api-reference.md                    |   1 +
 pkg/convert/number.go                    |  16 ++
 pkg/convert/number_test.go               |  19 ++
 pkg/index/index.go                       |   4 +-
 pkg/index/inverted/inverted.go           |   4 +
 pkg/index/inverted/inverted_series.go    |   9 +
 pkg/test/setup/setup.go                  |  47 +++--
 ui/README.md                             |   2 +-
 17 files changed, 627 insertions(+), 225 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index e819a022..43cc1e2b 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -14,6 +14,7 @@ Release Notes.
 - Replica: Support configurable replica count on Group.
 - Replica: Move the TopN pre-calculation flow from the Data Node to the 
Liaison Node.
 - Add a wait and retry to write handlers to avoid the local metadata cache 
being loaded.
+- Implement versioning properties and replace physical deletion with the 
tombstone mechanism for the property database.
 
 ### Bug Fixes
 
diff --git a/api/proto/banyandb/property/v1/rpc.proto 
b/api/proto/banyandb/property/v1/rpc.proto
index 8f70291b..4ea60cef 100644
--- a/api/proto/banyandb/property/v1/rpc.proto
+++ b/api/proto/banyandb/property/v1/rpc.proto
@@ -122,4 +122,7 @@ message InternalDeleteRequest {
 message InternalQueryResponse {
   repeated bytes sources = 1;
   common.v1.Trace trace = 2;
+  // deletes indicates whether each property is deleted or not;
+  // its entries map to the sources in the same order
+  repeated bool deletes = 3;
 }
diff --git a/banyand/liaison/grpc/property.go b/banyand/liaison/grpc/property.go
index b26569be..61c4808a 100644
--- a/banyand/liaison/grpc/property.go
+++ b/banyand/liaison/grpc/property.go
@@ -37,8 +37,6 @@ package grpc
 import (
        "context"
        "math"
-       "strconv"
-       "strings"
        "time"
 
        "github.com/pkg/errors"
@@ -54,6 +52,7 @@ import (
        propertyv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
        "github.com/apache/skywalking-banyandb/banyand/metadata"
        "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+       propertypkg "github.com/apache/skywalking-banyandb/banyand/property"
        "github.com/apache/skywalking-banyandb/banyand/queue"
        "github.com/apache/skywalking-banyandb/pkg/bus"
        "github.com/apache/skywalking-banyandb/pkg/convert"
@@ -67,6 +66,7 @@ const defaultQueryTimeout = 10 * time.Second
 
 type propertyServer struct {
        propertyv1.UnimplementedPropertyServiceServer
+       *discoveryService
        schemaRegistry metadata.Repo
        pipeline       queue.Client
        nodeRegistry   NodeRegistry
@@ -139,7 +139,7 @@ func (ps *propertyServer) Apply(ctx context.Context, req 
*propertyv1.ApplyReques
                        return nil, errors.Errorf("property %s tag %s not 
found", property.Metadata.Name, tag.Key)
                }
        }
-       qResp, err := ps.Query(ctx, &propertyv1.QueryRequest{
+       nodeProperties, _, err := ps.queryProperties(ctx, 
&propertyv1.QueryRequest{
                Groups: []string{g},
                Name:   property.Metadata.Name,
                Ids:    []string{property.Id},
@@ -147,41 +147,71 @@ func (ps *propertyServer) Apply(ctx context.Context, req 
*propertyv1.ApplyReques
        if err != nil {
                return nil, err
        }
-       var prev *propertyv1.Property
-       if len(qResp.Properties) > 0 {
-               prev = qResp.Properties[0]
-               defer func() {
-                       if err == nil {
-                               var ids [][]byte
-                               for _, p := range qResp.Properties {
-                                       ids = append(ids, getPropertyID(p))
-                               }
-                               if err = ps.remove(ids); err != nil {
-                                       err = multierr.Append(err, 
errors.New("fail to remove old properties"))
-                               }
-                       }
-               }()
-       }
-       entity := getEntity(property)
+       prevPropertyWithMetadata, olderProperties := 
ps.findPrevAndOlderProperties(nodeProperties)
+       entity := propertypkg.GetEntity(property)
        id, err := partition.ShardID(convert.StringToBytes(entity), 
group.ResourceOpts.ShardNum)
        if err != nil {
                return nil, err
        }
-       node, err := ps.nodeRegistry.Locate(g, entity, uint32(id), 0)
-       if err != nil {
-               return nil, err
+       copies, ok := ps.groupRepo.copies(property.Metadata.GetGroup())
+       if !ok {
+               return nil, errors.New("failed to get group copies")
+       }
+
+       nodes := make([]string, 0, copies)
+       var nodeID string
+       for i := range copies {
+               nodeID, err = 
ps.nodeRegistry.Locate(property.GetMetadata().GetGroup(), 
property.GetMetadata().GetName(), uint32(id), i)
+               if err != nil {
+                       return nil, err
+               }
+               nodes = append(nodes, nodeID)
        }
+       var prev *propertyv1.Property
+       if prevPropertyWithMetadata != nil && !prevPropertyWithMetadata.deleted 
{
+               prev = prevPropertyWithMetadata.Property
+       }
+       defer func() {
+               // if there are no older properties, or applying the new
+               // property failed, then skip cleaning up the older properties
+               if len(olderProperties) == 0 || err != nil {
+                       return
+               }
+               var ids [][]byte
+               for _, p := range olderProperties {
+                       ids = append(ids, propertypkg.GetPropertyID(p.Property))
+               }
+               _ = ps.remove(ids)
+       }()
        if req.Strategy == propertyv1.ApplyRequest_STRATEGY_REPLACE {
-               return ps.replaceProperty(ctx, start, uint64(id), node, prev, 
property)
+               return ps.replaceProperty(ctx, start, uint64(id), nodes, prev, 
property)
+       }
+       return ps.mergeProperty(ctx, start, uint64(id), nodes, prev, property)
+}
+
+func (ps *propertyServer) findPrevAndOlderProperties(nodeProperties 
[][]*propertyWithMetadata) (*propertyWithMetadata, []*propertyWithMetadata) {
+       var prevPropertyWithMetadata *propertyWithMetadata
+       var olderProperties []*propertyWithMetadata
+       for _, properties := range nodeProperties {
+               for _, p := range properties {
+                       // if the property is not deleted, add it to the 
older properties list so it is deleted after the apply succeeds
+                       if !p.deleted {
+                               olderProperties = append(olderProperties, p)
+                       }
+                       // update the prev property
+                       if prevPropertyWithMetadata == nil || 
p.Metadata.ModRevision > prevPropertyWithMetadata.Metadata.ModRevision {
+                               prevPropertyWithMetadata = p
+                       }
+               }
        }
-       return ps.mergeProperty(ctx, start, uint64(id), node, prev, property)
+       return prevPropertyWithMetadata, olderProperties
 }
 
-func (ps *propertyServer) mergeProperty(ctx context.Context, now time.Time, 
shardID uint64, node string,
+func (ps *propertyServer) mergeProperty(ctx context.Context, now time.Time, 
shardID uint64, nodes []string,
        prev, cur *propertyv1.Property,
 ) (*propertyv1.ApplyResponse, error) {
        if prev == nil {
-               return ps.replaceProperty(ctx, now, shardID, node, prev, cur)
+               return ps.replaceProperty(ctx, now, shardID, nodes, prev, cur)
        }
        tagCount, err := tagLen(prev)
        if err != nil {
@@ -202,7 +232,7 @@ func (ps *propertyServer) mergeProperty(ctx 
context.Context, now time.Time, shar
                }
        }
        cur.Tags = append(cur.Tags, tags...)
-       return ps.replaceProperty(ctx, now, shardID, node, prev, cur)
+       return ps.replaceProperty(ctx, now, shardID, nodes, prev, cur)
 }
 
 func tagLen(property *propertyv1.Property) (uint32, error) {
@@ -214,7 +244,7 @@ func tagLen(property *propertyv1.Property) (uint32, error) {
        return tagsNum, nil
 }
 
-func (ps *propertyServer) replaceProperty(ctx context.Context, now time.Time, 
shardID uint64, node string,
+func (ps *propertyServer) replaceProperty(ctx context.Context, now time.Time, 
shardID uint64, nodes []string,
        prev, cur *propertyv1.Property,
 ) (*propertyv1.ApplyResponse, error) {
        ns := now.UnixNano()
@@ -225,17 +255,38 @@ func (ps *propertyServer) replaceProperty(ctx 
context.Context, now time.Time, sh
        }
        cur.Metadata.ModRevision = ns
        cur.UpdatedAt = timestamppb.New(now)
-       f, err := ps.pipeline.Publish(ctx, data.TopicPropertyUpdate, 
bus.NewMessageWithNode(bus.MessageID(time.Now().Unix()), node, 
&propertyv1.InternalUpdateRequest{
+       req := &propertyv1.InternalUpdateRequest{
                ShardId:  shardID,
-               Id:       getPropertyID(cur),
+               Id:       propertypkg.GetPropertyID(cur),
                Property: cur,
-       }))
-       if err != nil {
-               return nil, err
        }
-       if _, err := f.Get(); err != nil {
-               return nil, err
+       futures := make([]bus.Future, 0, len(nodes))
+       for _, node := range nodes {
+               f, err := ps.pipeline.Publish(ctx, data.TopicPropertyUpdate,
+                       
bus.NewMessageWithNode(bus.MessageID(time.Now().Unix()), node, req))
+               if err != nil {
+                       return nil, errors.Wrapf(err, "failed to publish 
property update to node %s", node)
+               }
+               futures = append(futures, f)
+       }
+       // Wait for all futures to complete; at least one of them must succeed
+       haveSuccess := false
+       var lastestError error
+       for _, f := range futures {
+               _, err := f.Get()
+               if err == nil {
+                       haveSuccess = true
+               } else {
+                       lastestError = multierr.Append(lastestError, err)
+               }
+       }
+       if !haveSuccess {
+               if lastestError != nil {
+                       return nil, lastestError
+               }
+               return nil, errors.New("failed to apply property, no replicas 
success")
        }
+
        return &propertyv1.ApplyResponse{
                Created: prev == nil,
                TagsNum: uint32(len(cur.Tags)),
@@ -266,16 +317,22 @@ func (ps *propertyServer) Delete(ctx context.Context, req 
*propertyv1.DeleteRequ
        if len(req.Id) > 0 {
                qReq.Ids = []string{req.Id}
        }
-       qResp, err := ps.Query(ctx, qReq)
+       nodeProperties, _, err := ps.queryProperties(ctx, qReq)
        if err != nil {
                return nil, err
        }
-       if len(qResp.Properties) == 0 {
+       if len(nodeProperties) == 0 {
                return &propertyv1.DeleteResponse{Deleted: false}, nil
        }
        var ids [][]byte
-       for _, p := range qResp.Properties {
-               ids = append(ids, getPropertyID(p))
+       for _, properties := range nodeProperties {
+               for _, p := range properties {
+                       // skip properties that are already deleted
+                       if p.deleted {
+                               continue
+                       }
+                       ids = append(ids, propertypkg.GetPropertyID(p.Property))
+               }
        }
        if err := ps.remove(ids); err != nil {
                return nil, err
@@ -299,6 +356,64 @@ func (ps *propertyServer) Query(ctx context.Context, req 
*propertyv1.QueryReques
        if req.Limit == 0 {
                req.Limit = 100
        }
+
+       nodeProperties, trace, err := ps.queryProperties(ctx, req)
+       if err != nil {
+               return nil, err
+       }
+       res := make(map[string]*propertyWithMetadata)
+       for _, nodeWithProperties := range nodeProperties {
+               for _, propertyMetadata := range nodeWithProperties {
+                       entity := 
propertypkg.GetEntity(propertyMetadata.Property)
+                       cur, ok := res[entity]
+                       if !ok {
+                               res[entity] = propertyMetadata
+                               continue
+                       }
+                       if cur.Metadata.ModRevision < 
propertyMetadata.Metadata.ModRevision {
+                               res[entity] = propertyMetadata
+                               // TODO(mrproliu) handle the case where 
multiple versions of the property are detected
+                       }
+               }
+       }
+       if len(res) == 0 {
+               return &propertyv1.QueryResponse{Properties: nil, Trace: 
trace}, nil
+       }
+       properties := make([]*propertyv1.Property, 0, len(res))
+       for _, p := range res {
+               // ignore deleted property
+               if p.deleted {
+                       continue
+               }
+               if len(req.TagProjection) > 0 {
+                       var tags []*modelv1.Tag
+                       for _, tag := range p.Tags {
+                               for _, tp := range req.TagProjection {
+                                       if tp == tag.Key {
+                                               tags = append(tags, tag)
+                                               break
+                                       }
+                               }
+                       }
+                       p.Tags = tags
+               }
+               properties = append(properties, p.Property)
+               if len(properties) >= int(req.Limit) {
+                       break
+               }
+       }
+       return &propertyv1.QueryResponse{Properties: properties, Trace: trace}, 
nil
+}
+
+// queryProperties runs the internal properties query and returns all 
properties with related nodes.
+func (ps *propertyServer) queryProperties(
+       ctx context.Context,
+       req *propertyv1.QueryRequest,
+) (nodeProperties [][]*propertyWithMetadata, trace *commonv1.Trace, err error) 
{
+       start := time.Now()
+       if req.Limit == 0 {
+               req.Limit = 100
+       }
        var span *query.Span
        if req.Trace {
                tracer, _ := query.NewTracer(ctx, 
start.Format(time.RFC3339Nano))
@@ -308,23 +423,23 @@ func (ps *propertyServer) Query(ctx context.Context, req 
*propertyv1.QueryReques
                        if err != nil {
                                span.Error(err)
                        } else {
-                               resp.Trace = tracer.ToProto()
+                               trace = tracer.ToProto()
                        }
                        span.Stop()
                }()
        }
        for _, gn := range req.Groups {
                if g, getGroupErr := 
ps.schemaRegistry.GroupRegistry().GetGroup(ctx, gn); getGroupErr != nil {
-                       return nil, errors.Errorf("group %s not found", gn)
+                       return nil, trace, errors.Errorf("group %s not found", 
gn)
                } else if g.Catalog != commonv1.Catalog_CATALOG_PROPERTY {
-                       return nil, errors.Errorf("group %s is not allowed to 
have properties", gn)
+                       return nil, trace, errors.Errorf("group %s is not 
allowed to have properties", gn)
                }
        }
        ff, err := ps.pipeline.Broadcast(defaultQueryTimeout, 
data.TopicPropertyQuery, bus.NewMessage(bus.MessageID(start.Unix()), req))
        if err != nil {
-               return nil, err
+               return nil, trace, err
        }
-       res := make(map[string]*propertyv1.Property)
+       res := make([][]*propertyWithMetadata, 0)
        for _, f := range ff {
                if m, getErr := f.Get(); getErr != nil {
                        err = multierr.Append(err, getErr)
@@ -333,62 +448,41 @@ func (ps *propertyServer) Query(ctx context.Context, req 
*propertyv1.QueryReques
                        if d == nil {
                                continue
                        }
+                       nodeWithProperties := make([]*propertyWithMetadata, 0)
                        switch v := d.(type) {
                        case *propertyv1.InternalQueryResponse:
-                               for _, s := range v.Sources {
+                               for i, s := range v.Sources {
                                        var p propertyv1.Property
+                                       var deleted bool
                                        err = protojson.Unmarshal(s, &p)
                                        if err != nil {
-                                               return nil, err
+                                               return nil, trace, err
                                        }
-                                       entity := getEntity(&p)
-                                       cur, ok := res[entity]
-                                       if !ok {
-                                               res[entity] = &p
-                                               continue
+                                       if i < len(v.Deletes) {
+                                               deleted = v.Deletes[i]
                                        }
-                                       if cur.Metadata.ModRevision < 
p.Metadata.ModRevision {
-                                               res[entity] = &p
-                                               err = 
ps.remove([][]byte{getPropertyID(cur)})
-                                               if err != nil {
-                                                       return nil, err
-                                               }
+                                       property := &propertyWithMetadata{
+                                               Property: &p,
+                                               deleted:  deleted,
                                        }
+                                       nodeWithProperties = 
append(nodeWithProperties, property)
                                }
                                if span != nil {
                                        span.AddSubTrace(v.Trace)
                                }
+                               res = append(res, nodeWithProperties)
                        case *common.Error:
                                err = multierr.Append(err, 
errors.New(v.Error()))
                        }
                }
        }
        if err != nil {
-               return nil, err
+               return nil, trace, err
        }
        if len(res) == 0 {
-               return &propertyv1.QueryResponse{Properties: nil}, nil
-       }
-       properties := make([]*propertyv1.Property, 0, len(res))
-       for _, p := range res {
-               if len(req.TagProjection) > 0 {
-                       var tags []*modelv1.Tag
-                       for _, tag := range p.Tags {
-                               for _, tp := range req.TagProjection {
-                                       if tp == tag.Key {
-                                               tags = append(tags, tag)
-                                               break
-                                       }
-                               }
-                       }
-                       p.Tags = tags
-               }
-               properties = append(properties, p)
-               if len(properties) >= int(req.Limit) {
-                       break
-               }
+               return res, trace, nil
        }
-       return &propertyv1.QueryResponse{Properties: properties}, nil
+       return res, trace, nil
 }
 
 func (ps *propertyServer) remove(ids [][]byte) error {
@@ -406,10 +500,7 @@ func (ps *propertyServer) remove(ids [][]byte) error {
        return nil
 }
 
-func getPropertyID(prop *propertyv1.Property) []byte {
-       return convert.StringToBytes(getEntity(prop) + "/" + 
strconv.FormatInt(prop.Metadata.ModRevision, 10))
-}
-
-func getEntity(prop *propertyv1.Property) string {
-       return strings.Join([]string{prop.Metadata.Group, prop.Metadata.Name, 
prop.Id}, "/")
+type propertyWithMetadata struct {
+       *propertyv1.Property
+       deleted bool
 }
diff --git a/banyand/liaison/grpc/server.go b/banyand/liaison/grpc/server.go
index 00c384c4..b12d1839 100644
--- a/banyand/liaison/grpc/server.go
+++ b/banyand/liaison/grpc/server.go
@@ -147,9 +147,10 @@ func NewServer(_ context.Context, pipeline, broadcaster 
queue.Client, topNPipeli
                        schemaRegistry: schemaRegistry,
                },
                propertyServer: &propertyServer{
-                       schemaRegistry: schemaRegistry,
-                       pipeline:       pipeline,
-                       nodeRegistry:   nr.PropertyNodeRegistry,
+                       schemaRegistry:   schemaRegistry,
+                       pipeline:         pipeline,
+                       nodeRegistry:     nr.PropertyNodeRegistry,
+                       discoveryService: 
newDiscoveryService(schema.KindProperty, schemaRegistry, 
nr.MeasureNodeRegistry),
                },
                propertyRegistryServer: &propertyRegistryServer{
                        schemaRegistry: schemaRegistry,
@@ -164,9 +165,11 @@ func (s *server) PreRun(_ context.Context) error {
        s.log = logger.GetLogger("liaison-grpc")
        s.streamSVC.setLogger(s.log)
        s.measureSVC.setLogger(s.log)
+       s.propertyServer.SetLogger(s.log)
        components := []*discoveryService{
                s.streamSVC.discoveryService,
                s.measureSVC.discoveryService,
+               s.propertyServer.discoveryService,
        }
        for _, c := range components {
                c.SetLogger(s.log)
diff --git a/banyand/property/db.go b/banyand/property/db.go
index 8cc28629..34c552e1 100644
--- a/banyand/property/db.go
+++ b/banyand/property/db.go
@@ -111,19 +111,19 @@ func (db *database) update(ctx context.Context, shardID 
common.ShardID, id []byt
        return nil
 }
 
-func (db *database) delete(docIDs [][]byte) error {
+func (db *database) delete(ctx context.Context, docIDs [][]byte) error {
        sLst := db.sLst.Load()
        if sLst == nil {
                return nil
        }
        var err error
        for _, s := range *sLst {
-               multierr.AppendInto(&err, s.delete(docIDs))
+               multierr.AppendInto(&err, s.delete(ctx, docIDs))
        }
        return err
 }
 
-func (db *database) query(ctx context.Context, req *propertyv1.QueryRequest) 
([][]byte, error) {
+func (db *database) query(ctx context.Context, req *propertyv1.QueryRequest) 
([]*queryProperty, error) {
        iq, err := inverted.BuildPropertyQuery(req, groupField, entityID)
        if err != nil {
                return nil, err
@@ -132,7 +132,7 @@ func (db *database) query(ctx context.Context, req 
*propertyv1.QueryRequest) ([]
        if sLst == nil {
                return nil, nil
        }
-       var res [][]byte
+       var res []*queryProperty
        for _, s := range *sLst {
                r, err := s.search(ctx, iq, int(req.Limit))
                if err != nil {
@@ -224,3 +224,8 @@ func walkDir(root, prefix string, wf walkFn) error {
        }
        return nil
 }
+
+type queryProperty struct {
+       source  []byte
+       deleted bool
+}
diff --git a/banyand/property/listener.go b/banyand/property/listener.go
index f02b7ac2..f204816f 100644
--- a/banyand/property/listener.go
+++ b/banyand/property/listener.go
@@ -110,7 +110,7 @@ type deleteListener struct {
        s *service
 }
 
-func (h *deleteListener) Rev(_ context.Context, message bus.Message) (resp 
bus.Message) {
+func (h *deleteListener) Rev(ctx context.Context, message bus.Message) (resp 
bus.Message) {
        n := time.Now()
        now := n.UnixNano()
        var protoReq proto.Message
@@ -130,7 +130,7 @@ func (h *deleteListener) Rev(_ context.Context, message 
bus.Message) (resp bus.M
                resp = bus.NewMessage(bus.MessageID(now), common.NewError("id 
is empty"))
                return
        }
-       err := h.s.db.delete(d.Ids)
+       err := h.s.db.delete(ctx, d.Ids)
        if err != nil {
                resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail 
to delete property: %v", err))
                return
@@ -180,7 +180,7 @@ func (h *queryListener) Rev(ctx context.Context, message 
bus.Message) (resp bus.
                        span.Stop()
                }()
        }
-       sources, err := h.s.db.query(ctx, d)
+       properties, err := h.s.db.query(ctx, d)
        if err != nil {
                if tracer != nil {
                        span.Error(err)
@@ -192,8 +192,10 @@ func (h *queryListener) Rev(ctx context.Context, message 
bus.Message) (resp bus.
                resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail 
to query property: %v", err))
                return
        }
-       qResp := &propertyv1.InternalQueryResponse{
-               Sources: sources,
+       qResp := &propertyv1.InternalQueryResponse{}
+       for _, p := range properties {
+               qResp.Sources = append(qResp.Sources, p.source)
+               qResp.Deletes = append(qResp.Deletes, p.deleted)
        }
        if tracer != nil {
                qResp.Trace = tracer.ToProto()
diff --git a/banyand/property/property.go b/banyand/property/property.go
index 59602ff4..98f33304 100644
--- a/banyand/property/property.go
+++ b/banyand/property/property.go
@@ -18,7 +18,14 @@
 // Package property provides the property service interface.
 package property
 
-import "github.com/apache/skywalking-banyandb/pkg/run"
+import (
+       "strconv"
+       "strings"
+
+       propertyv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
+       "github.com/apache/skywalking-banyandb/pkg/convert"
+       "github.com/apache/skywalking-banyandb/pkg/run"
+)
 
 // Service is the interface for property service.
 type Service interface {
@@ -26,3 +33,13 @@ type Service interface {
        run.Config
        run.Service
 }
+
+// GetPropertyID returns the property ID based on the property metadata and 
revision.
+func GetPropertyID(prop *propertyv1.Property) []byte {
+       return convert.StringToBytes(GetEntity(prop) + "/" + 
strconv.FormatInt(prop.Metadata.ModRevision, 10))
+}
+
+// GetEntity returns the entity string for the property.
+func GetEntity(prop *propertyv1.Property) string {
+       return strings.Join([]string{prop.Metadata.Group, prop.Metadata.Name, 
prop.Id}, "/")
+}
diff --git a/banyand/property/shard.go b/banyand/property/shard.go
index 3fd8d3bd..7a4bef78 100644
--- a/banyand/property/shard.go
+++ b/banyand/property/shard.go
@@ -22,6 +22,7 @@ import (
        "fmt"
        "path"
        "strconv"
+       "sync"
 
        "google.golang.org/protobuf/encoding/protojson"
 
@@ -42,14 +43,16 @@ const (
        groupField    = "_group"
        nameField     = index.IndexModeName
        entityID      = "_entity_id"
+       deleteField   = "_deleted"
 )
 
 var (
-       sourceFieldKey = index.FieldKey{TagName: sourceField}
-       entityFieldKey = index.FieldKey{TagName: entityID}
-       groupFieldKey  = index.FieldKey{TagName: groupField}
-       nameFieldKey   = index.FieldKey{TagName: nameField}
-       projection     = []index.FieldKey{sourceFieldKey}
+       sourceFieldKey  = index.FieldKey{TagName: sourceField}
+       entityFieldKey  = index.FieldKey{TagName: entityID}
+       groupFieldKey   = index.FieldKey{TagName: groupField}
+       nameFieldKey    = index.FieldKey{TagName: nameField}
+       deletedFieldKey = index.FieldKey{TagName: deleteField}
+       projection      = []index.FieldKey{sourceFieldKey, deletedFieldKey}
 )
 
 type shard struct {
@@ -66,8 +69,7 @@ func (s *shard) close() error {
        return nil
 }
 
-func (db *database) newShard(ctx context.Context, id common.ShardID, 
flushTimeoutSeconds int64,
-) (*shard, error) {
+func (db *database) newShard(ctx context.Context, id common.ShardID, _ int64) 
(*shard, error) {
        location := path.Join(db.location, fmt.Sprintf(shardTemplate, int(id)))
        sName := "shard" + strconv.Itoa(int(id))
        si := &shard{
@@ -79,7 +81,7 @@ func (db *database) newShard(ctx context.Context, id 
common.ShardID, flushTimeou
                Path:         location,
                Logger:       si.l,
                Metrics:      
inverted.NewMetrics(db.omr.With(propertyScope.ConstLabels(meter.LabelPairs{"shard":
 sName}))),
-               BatchWaitSec: flushTimeoutSeconds,
+               BatchWaitSec: 0,
        }
        var err error
        if si.store, err = inverted.NewStore(opts); err != nil {
@@ -89,9 +91,17 @@ func (db *database) newShard(ctx context.Context, id 
common.ShardID, flushTimeou
 }
 
 func (s *shard) update(id []byte, property *propertyv1.Property) error {
+       document, err := s.buildUpdateDocument(id, property)
+       if err != nil {
+               return fmt.Errorf("build update document failure: %w", err)
+       }
+       return s.updateDocuments(index.Documents{*document})
+}
+
+func (s *shard) buildUpdateDocument(id []byte, property *propertyv1.Property) 
(*index.Document, error) {
        pj, err := protojson.Marshal(property)
        if err != nil {
-               return err
+               return nil, err
        }
        sourceField := index.NewBytesField(sourceFieldKey, pj)
        sourceField.NoSort = true
@@ -110,24 +120,78 @@ func (s *shard) update(id []byte, property 
*propertyv1.Property) error {
        for _, t := range property.Tags {
                tv, err := pbv1.MarshalTagValue(t.Value)
                if err != nil {
-                       return err
+                       return nil, err
                }
                tagField := index.NewBytesField(index.FieldKey{IndexRuleID: 
uint32(convert.HashStr(t.Key))}, tv)
                tagField.Index = true
                tagField.NoSort = true
                doc.Fields = append(doc.Fields, tagField)
        }
-       return s.store.UpdateSeriesBatch(index.Batch{
-               Documents: index.Documents{doc},
-       })
+       return &doc, nil
 }
 
-func (s *shard) delete(docID [][]byte) error {
-       return s.store.Delete(docID)
+func (s *shard) delete(ctx context.Context, docID [][]byte) error {
+       // search the original documents by docID
+       seriesMatchers := make([]index.SeriesMatcher, 0, len(docID))
+       for _, id := range docID {
+               seriesMatchers = append(seriesMatchers, index.SeriesMatcher{
+                       Match: id,
+                       Type:  index.SeriesMatcherTypeExact,
+               })
+       }
+       iq, err := s.store.BuildQuery(seriesMatchers, nil, nil)
+       if err != nil {
+               return fmt.Errorf("build property query failure: %w", err)
+       }
+       exisingDocList, err := s.search(ctx, iq, len(docID))
+       if err != nil {
+               return fmt.Errorf("search existing documents failure: %w", err)
+       }
+       removeDocList := make(index.Documents, 0, len(exisingDocList))
+       for _, property := range exisingDocList {
+               p := &propertyv1.Property{}
+               if err := protojson.Unmarshal(property.source, p); err != nil {
+                       return fmt.Errorf("unmarshal property failure: %w", err)
+               }
+               // update the property to mark it as deleted
+               document, err := s.buildUpdateDocument(GetPropertyID(p), p)
+               if err != nil {
+                       return fmt.Errorf("build delete document failure: %w", 
err)
+               }
+               // mark the document as deleted
+               document.Deleted = true
+               removeDocList = append(removeDocList, *document)
+       }
+       return s.updateDocuments(removeDocList)
+}
+
+func (s *shard) updateDocuments(docs index.Documents) error {
+       if len(docs) == 0 {
+               return nil
+       }
+       var updateErr, persistentError error
+       wg := sync.WaitGroup{}
+       wg.Add(1)
+
+       updateErr = s.store.UpdateSeriesBatch(index.Batch{
+               Documents: docs,
+               PersistentCallback: func(err error) {
+                       persistentError = err
+                       wg.Done()
+               },
+       })
+       if updateErr != nil {
+               return updateErr
+       }
+       wg.Wait()
+       if persistentError != nil {
+               return fmt.Errorf("persistent failure: %w", persistentError)
+       }
+       return nil
 }
 
 func (s *shard) search(ctx context.Context, indexQuery index.Query, limit int,
-) (data [][]byte, err error) {
+) (data []*queryProperty, err error) {
        tracer := query.GetTracer(ctx)
        if tracer != nil {
                span, _ := tracer.StartSpan(ctx, "property.search")
@@ -150,9 +214,14 @@ func (s *shard) search(ctx context.Context, indexQuery 
index.Query, limit int,
        if len(ss) == 0 {
                return nil, nil
        }
-       data = make([][]byte, 0, len(ss))
+       data = make([]*queryProperty, 0, len(ss))
        for _, s := range ss {
-               data = append(data, s.Fields[sourceField])
+               bytes := s.Fields[sourceField]
+               deleted := convert.BytesToBool(s.Fields[deleteField])
+               data = append(data, &queryProperty{
+                       source:  bytes,
+                       deleted: deleted,
+               })
        }
        return data, nil
 }
diff --git a/bydbctl/internal/cmd/property_test.go 
b/bydbctl/internal/cmd/property_test.go
index 9ef1d04b..b4813cf3 100644
--- a/bydbctl/internal/cmd/property_test.go
+++ b/bydbctl/internal/cmd/property_test.go
@@ -18,6 +18,8 @@
 package cmd_test
 
 import (
+       "errors"
+       "fmt"
        "strings"
 
        . "github.com/onsi/ginkgo/v2"
@@ -27,46 +29,55 @@ import (
 
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
        propertyv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
+       "github.com/apache/skywalking-banyandb/banyand/metadata/embeddedetcd"
        "github.com/apache/skywalking-banyandb/bydbctl/internal/cmd"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/test"
        "github.com/apache/skywalking-banyandb/pkg/test/flags"
        "github.com/apache/skywalking-banyandb/pkg/test/helpers"
        "github.com/apache/skywalking-banyandb/pkg/test/setup"
 )
 
-var _ = Describe("Property Operation", func() {
-       var addr string
-       var deferFunc func()
-       var rootCmd *cobra.Command
-       p1YAML := `
+var (
+       propertyGroup    = "ui-template"
+       propertyTagCount = 2
+       p1YAML           = fmt.Sprintf(`
 metadata:
-  group: ui-template 
+  group: %s
   name: service
 id: kubernetes
 tags:
   - key: content
     value:
       str:
-        value: foo 
+        value: foo111
   - key: state
     value:
       int:
         value: 1
-`
-       p2YAML := `
+`, propertyGroup)
+
+       p2YAML = fmt.Sprintf(`
 metadata:
-  group: ui-template 
+  group: %s
   name: service
 id: kubernetes
 tags:
   - key: content
     value:
       str:
-        value: foo 
+        value: foo333
   - key: state
     value:
       int:
         value: 3
-`
+`, propertyGroup)
+)
+
+var _ = Describe("Property Operation", func() {
+       var addr string
+       var deferFunc func()
+       var rootCmd *cobra.Command
 
        p1Proto := new(propertyv1.Property)
        helpers.UnmarshalYAML([]byte(p1YAML), p1Proto)
@@ -78,72 +89,35 @@ tags:
                // extracting the operation of creating property schema
                rootCmd = &cobra.Command{Use: "root"}
                cmd.RootCmdFlags(rootCmd)
-               rootCmd.SetArgs([]string{"group", "create", "-a", addr, "-f", 
"-"})
-               createGroup := func() string {
-                       rootCmd.SetIn(strings.NewReader(`
-metadata:
-  name: ui-template
-catalog: CATALOG_PROPERTY
-resource_opts:
-  shard_num: 2
-`))
-                       return capturer.CaptureStdout(func() {
-                               err := rootCmd.Execute()
-                               if err != nil {
-                                       GinkgoWriter.Printf("execution 
fails:%v", err)
-                               }
-                       })
-               }
-               Eventually(createGroup, 
flags.EventuallyTimeout).Should(ContainSubstring("group ui-template is 
created"))
-               rootCmd.SetArgs([]string{"property", "schema", "create", "-a", 
addr, "-f", "-"})
-               createPropertySchema := func() string {
-                       rootCmd.SetIn(strings.NewReader(`
-metadata:
-  name: service
-  group: ui-template
-tags:
-  - name: content
-    type: TAG_TYPE_STRING
-  - name: state
-    type: TAG_TYPE_INT
-`))
-                       return capturer.CaptureStdout(func() {
-                               err := rootCmd.Execute()
-                               if err != nil {
-                                       GinkgoWriter.Printf("execution 
fails:%v", err)
-                               }
-                       })
-               }
-               Eventually(createPropertySchema, 
flags.EventuallyTimeout).Should(ContainSubstring("property schema 
ui-template.service is created"))
-               rootCmd.SetArgs([]string{"property", "data", "apply", "-a", 
addr, "-f", "-"})
-               rootCmd.SetIn(strings.NewReader(p1YAML))
+               defUITemplateWithSchema(rootCmd, addr, 2)
+               applyData(rootCmd, addr, p1YAML, true, propertyTagCount)
+       })
+
+       It("update property", func() {
+               // update the property
+               applyData(rootCmd, addr, p2YAML, false, propertyTagCount)
+
+               // check the property
+               queryData(rootCmd, addr, propertyGroup, 1, func(data string, 
resp *propertyv1.QueryResponse) {
+                       Expect(data).To(ContainSubstring("foo333"))
+               })
+       })
+
+       It("apply same property after delete", func() {
+               // delete
+               rootCmd.SetArgs([]string{"property", "data", "delete", "-g", 
"ui-template", "-n", "service", "-i", "kubernetes"})
                out := capturer.CaptureStdout(func() {
                        err := rootCmd.Execute()
                        Expect(err).NotTo(HaveOccurred())
                })
-               GinkgoWriter.Println(out)
-               Expect(out).To(ContainSubstring("created: true"))
-               Expect(out).To(ContainSubstring("tagsNum: 2"))
+               Expect(out).To(ContainSubstring("deleted: true"))
+
+               // apply property(created should be true)
+               applyData(rootCmd, addr, p2YAML, true, propertyTagCount)
        })
 
        It("query all properties", func() {
-               rootCmd.SetArgs([]string{"property", "data", "query", "-a", 
addr, "-f", "-"})
-               issue := func() string {
-                       rootCmd.SetIn(strings.NewReader(`
-groups: ["ui-template"]
-`))
-                       return capturer.CaptureStdout(func() {
-                               err := rootCmd.Execute()
-                               Expect(err).NotTo(HaveOccurred())
-                       })
-               }
-               Eventually(func() int {
-                       out := issue()
-                       resp := new(propertyv1.QueryResponse)
-                       helpers.UnmarshalYAML([]byte(out), resp)
-                       GinkgoWriter.Println(resp)
-                       return len(resp.Properties)
-               }, flags.EventuallyTimeout).Should(Equal(1))
+               queryData(rootCmd, addr, propertyGroup, 1, nil)
        })
 
        It("delete property", func() {
@@ -154,23 +128,8 @@ groups: ["ui-template"]
                        Expect(err).NotTo(HaveOccurred())
                })
                Expect(out).To(ContainSubstring("deleted: true"))
-               rootCmd.SetArgs([]string{"property", "data", "query", "-a", 
addr, "-f", "-"})
-               issue := func() string {
-                       rootCmd.SetIn(strings.NewReader(`
-groups: ["ui-template"]
-`))
-                       return capturer.CaptureStdout(func() {
-                               err := rootCmd.Execute()
-                               Expect(err).NotTo(HaveOccurred())
-                       })
-               }
-               Eventually(func() int {
-                       out := issue()
-                       resp := new(propertyv1.QueryResponse)
-                       helpers.UnmarshalYAML([]byte(out), resp)
-                       GinkgoWriter.Println(resp)
-                       return len(resp.Properties)
-               }, flags.EventuallyTimeout).Should(Equal(0))
+
+               queryData(rootCmd, addr, propertyGroup, 0, nil)
        })
 
        AfterEach(func() {
@@ -368,8 +327,8 @@ metadata:
   name: endpoint
   group: ui-template
 tags:
-  - name: content
-    type: TAG_TYPE_STRING
+- name: content
+  type: TAG_TYPE_STRING
 `))
                out = capturer.CaptureStdout(func() {
                        err := rootCmd.Execute()
@@ -395,3 +354,191 @@ tags:
                deferFunc()
        })
 })
+
+var _ = Describe("Property Cluster Operation", func() {
+       Expect(logger.Init(logger.Logging{
+               Env:   "dev",
+               Level: flags.LogLevel,
+       })).To(Succeed())
+
+       var addr string
+       var deferFunc func()
+       var rootCmd *cobra.Command
+       BeforeEach(func() {
+               rootCmd = &cobra.Command{Use: "root"}
+               cmd.RootCmdFlags(rootCmd)
+               var ports []int
+
+               // first, create two data directories that each hold a different version of the property
+               By("Starting node1 with data")
+               node1Dir, spaceDef1, err := test.NewSpace()
+               Expect(err).NotTo(HaveOccurred())
+               ports, err = test.AllocateFreePorts(4)
+               Expect(err).NotTo(HaveOccurred())
+               _, node1Addr, node1Defer := setup.ClosableStandalone(node1Dir, 
ports)
+               node1Addr = httpSchema + node1Addr
+               defUITemplateWithSchema(rootCmd, node1Addr, 1)
+               applyData(rootCmd, node1Addr, p1YAML, true, propertyTagCount)
+               queryData(rootCmd, node1Addr, propertyGroup, 1, func(data 
string, resp *propertyv1.QueryResponse) {
+                       Expect(data).To(ContainSubstring("foo111"))
+               })
+               node1Defer()
+
+               By("Starting node2 with data")
+               node2Dir, spaceDef2, err := test.NewSpace()
+               Expect(err).NotTo(HaveOccurred())
+               ports, err = test.AllocateFreePorts(4)
+               Expect(err).NotTo(HaveOccurred())
+               _, node2Addr, node2Defer := setup.ClosableStandalone(node2Dir, 
ports)
+               node2Addr = httpSchema + node2Addr
+               defUITemplateWithSchema(rootCmd, node2Addr, 1)
+               applyData(rootCmd, node2Addr, p2YAML, true, propertyTagCount)
+               queryData(rootCmd, node2Addr, propertyGroup, 1, func(data 
string, resp *propertyv1.QueryResponse) {
+                       Expect(data).To(ContainSubstring("foo333"))
+               })
+               node2Defer()
+
+               // setup cluster with two data nodes
+               By("Starting etcd server")
+               ports, err = test.AllocateFreePorts(2)
+               Expect(err).NotTo(HaveOccurred())
+               dir, spaceDef, err := test.NewSpace()
+               Expect(err).NotTo(HaveOccurred())
+               ep := fmt.Sprintf("http://127.0.0.1:%d", ports[0])
+               server, err := embeddedetcd.NewServer(
+                       embeddedetcd.ConfigureListener([]string{ep}, 
[]string{fmt.Sprintf("http://127.0.0.1:%d", ports[1])}),
+                       embeddedetcd.RootDir(dir),
+               )
+               Expect(err).ShouldNot(HaveOccurred())
+               <-server.ReadyNotify()
+               By("Starting data node 0")
+               closeDataNode0 := setup.DataNodeFromDataDir(ep, node1Dir)
+               By("Starting data node 1")
+               closeDataNode1 := setup.DataNodeFromDataDir(ep, node2Dir)
+               By("Starting liaison node")
+               _, liaisonHTTPAddr, closerLiaisonNode := 
setup.LiaisonNodeWithHTTP(ep)
+               By("Initializing test cases")
+
+               deferFunc = func() {
+                       closerLiaisonNode()
+                       closeDataNode0()
+                       closeDataNode1()
+                       _ = server.Close()
+                       <-server.StopNotify()
+                       spaceDef()
+                       spaceDef1()
+                       spaceDef2()
+               }
+               addr = httpSchema + liaisonHTTPAddr
+               // creating schema
+               defUITemplateWithSchema(rootCmd, addr, 1)
+       })
+
+       AfterEach(func() {
+               deferFunc()
+       })
+
+       It("query from difference version", func() {
+               queryData(rootCmd, addr, propertyGroup, 1, func(data string, 
resp *propertyv1.QueryResponse) {
+                       Expect(data).Should(ContainSubstring("foo333"))
+               })
+       })
+
+       It("delete property", func() {
+               // delete properties
+               rootCmd.SetArgs([]string{"property", "data", "delete", "-g", 
"ui-template", "-n", "service", "-i", "kubernetes"})
+               out := capturer.CaptureStdout(func() {
+                       err := rootCmd.Execute()
+                       Expect(err).NotTo(HaveOccurred())
+               })
+               Expect(out).To(ContainSubstring("deleted: true"))
+
+               // there should be no properties after deletion
+               queryData(rootCmd, addr, propertyGroup, 0, nil)
+
+               // apply the property again; the response should report created: true
+               applyData(rootCmd, addr, p1YAML, true, propertyTagCount)
+       })
+})
+
+func defUITemplateWithSchema(rootCmd *cobra.Command, addr string, shardCount 
int) {
+       rootCmd.SetArgs([]string{"group", "create", "-a", addr, "-f", "-"})
+       createGroup := func() string {
+               rootCmd.SetIn(strings.NewReader(fmt.Sprintf(`
+metadata:
+  name: ui-template
+catalog: CATALOG_PROPERTY
+resource_opts:
+  shard_num: %d
+`, shardCount)))
+               return capturer.CaptureStdout(func() {
+                       err := rootCmd.Execute()
+                       if err != nil {
+                               GinkgoWriter.Printf("execution fails:%v", err)
+                       }
+               })
+       }
+       Eventually(createGroup, 
flags.EventuallyTimeout).Should(ContainSubstring("group ui-template is 
created"))
+
+       rootCmd.SetArgs([]string{"property", "schema", "create", "-a", addr, 
"-f", "-"})
+       createPropertySchema := func() string {
+               rootCmd.SetIn(strings.NewReader(`
+metadata:
+  name: service
+  group: ui-template
+tags:
+  - name: content
+    type: TAG_TYPE_STRING
+  - name: state
+    type: TAG_TYPE_INT
+`))
+               return capturer.CaptureStdout(func() {
+                       err := rootCmd.Execute()
+                       if err != nil {
+                               GinkgoWriter.Printf("execution fails:%v", err)
+                       }
+               })
+       }
+       Eventually(createPropertySchema, 
flags.EventuallyTimeout).Should(ContainSubstring("property schema 
ui-template.service is created"))
+}
+
+func applyData(rootCmd *cobra.Command, addr, data string, created bool, 
tagsNum int) {
+       rootCmd.SetArgs([]string{"property", "data", "apply", "-a", addr, "-f", 
"-"})
+       rootCmd.SetIn(strings.NewReader(data))
+       out := capturer.CaptureStdout(func() {
+               err := rootCmd.Execute()
+               Expect(err).NotTo(HaveOccurred())
+       })
+       GinkgoWriter.Println(out)
+       Expect(out).To(ContainSubstring(fmt.Sprintf("created: %t", created)))
+       Expect(out).To(ContainSubstring(fmt.Sprintf("tagsNum: %d", tagsNum)))
+}
+
+func queryData(rootCmd *cobra.Command, addr, group string, dataCount int, 
verify func(data string, resp *propertyv1.QueryResponse)) {
+       rootCmd.SetArgs([]string{"property", "data", "query", "-a", addr, "-f", 
"-"})
+       issue := func() string {
+               rootCmd.SetIn(strings.NewReader(fmt.Sprintf(`groups: ["%s"]`, 
group)))
+               return capturer.CaptureStdout(func() {
+                       err := rootCmd.Execute()
+                       Expect(err).NotTo(HaveOccurred())
+               })
+       }
+       Eventually(func() error {
+               out := issue()
+               resp := new(propertyv1.QueryResponse)
+               helpers.UnmarshalYAML([]byte(out), resp)
+               GinkgoWriter.Println(resp)
+
+               failures := InterceptGomegaFailures(func() {
+                       Expect(len(resp.Properties)).To(Equal(dataCount))
+                       if verify != nil {
+                               verify(out, resp)
+                       }
+               })
+
+               if len(failures) > 0 {
+                       return errors.New(strings.Join(failures, "\n"))
+               }
+               return nil
+       }, flags.EventuallyTimeout).Should(Succeed())
+}
diff --git a/docs/api-reference.md b/docs/api-reference.md
index 40f72f7b..72185c0d 100644
--- a/docs/api-reference.md
+++ b/docs/api-reference.md
@@ -3401,6 +3401,7 @@ Property stores the user defined data
 | ----- | ---- | ----- | ----------- |
 | sources | [bytes](#bytes) | repeated |  |
 | trace | [banyandb.common.v1.Trace](#banyandb-common-v1-Trace) |  |  |
+| deletes | [bool](#bool) | repeated | deletes indicates the property is 
deleted or not it&#39;s mapping to the sources in the same order |
 
 
 
diff --git a/pkg/convert/number.go b/pkg/convert/number.go
index 0074c588..4667b088 100644
--- a/pkg/convert/number.go
+++ b/pkg/convert/number.go
@@ -51,6 +51,14 @@ func Uint32ToBytes(u uint32) []byte {
        return bs
 }
 
+// BoolToBytes converts bool to bytes.
+func BoolToBytes(b bool) []byte {
+       if b {
+               return []byte{1}
+       }
+       return []byte{0}
+}
+
 // BytesToInt64 converts bytes to int64.
 func BytesToInt64(b []byte) int64 {
        u := binary.BigEndian.Uint64(b)
@@ -87,3 +95,11 @@ func Float64ToBytes(f float64) []byte {
 func BytesToFloat64(b []byte) float64 {
        return math.Float64frombits(binary.BigEndian.Uint64(b))
 }
+
+// BytesToBool converts bytes to bool.
+func BytesToBool(b []byte) bool {
+       if len(b) == 0 {
+               return false
+       }
+       return b[0] != 0
+}
diff --git a/pkg/convert/number_test.go b/pkg/convert/number_test.go
index 9fcd8473..1a5b310c 100644
--- a/pkg/convert/number_test.go
+++ b/pkg/convert/number_test.go
@@ -46,3 +46,22 @@ func TestInt64ToBytes(t *testing.T) {
                })
        }
 }
+
+func TestBoolToBytes(t *testing.T) {
+       testCases := []struct {
+               expected []byte
+               input    bool
+       }{
+               {[]byte{1}, true},
+               {[]byte{0}, false},
+       }
+
+       for _, tc := range testCases {
+               t.Run(fmt.Sprintf("BoolToBytes(%t)", tc.input), func(t 
*testing.T) {
+                       result := BoolToBytes(tc.input)
+                       if !bytes.Equal(result, tc.expected) {
+                               t.Errorf("Expected %v, got %v", tc.expected, 
result)
+                       }
+               })
+       }
+}
diff --git a/pkg/index/index.go b/pkg/index/index.go
index e13334b2..9ec98899 100644
--- a/pkg/index/index.go
+++ b/pkg/index/index.go
@@ -300,6 +300,7 @@ type Document struct {
        Timestamp    int64
        DocID        uint64
        Version      int64
+       Deleted      bool // for logical deletion
 }
 
 // Documents is a collection of documents.
@@ -307,7 +308,8 @@ type Documents []Document
 
 // Batch is a collection of documents.
 type Batch struct {
-       Documents Documents
+       PersistentCallback func(error)
+       Documents          Documents
 }
 
 // Writer allows writing fields and docID in a document to an index.
diff --git a/pkg/index/inverted/inverted.go b/pkg/index/inverted/inverted.go
index 9ac005af..8450e647 100644
--- a/pkg/index/inverted/inverted.go
+++ b/pkg/index/inverted/inverted.go
@@ -52,6 +52,7 @@ const (
        timestampField = "_timestamp"
        versionField   = "_version"
        sourceField    = "_source"
+       deletedField   = "_deleted"
 )
 
 var (
@@ -141,6 +142,9 @@ func (s *store) Batch(batch index.Batch) error {
                if d.Timestamp > 0 {
                        doc.AddField(bluge.NewDateTimeField(timestampField, 
time.Unix(0, d.Timestamp)).StoreValue())
                }
+               if d.Deleted {
+                       doc.AddField(bluge.NewStoredOnlyField(deletedField, 
convert.BoolToBytes(true)).StoreValue())
+               }
                b.Insert(doc)
        }
        return s.writer.Batch(b)
diff --git a/pkg/index/inverted/inverted_series.go 
b/pkg/index/inverted/inverted_series.go
index 93c95c88..c99e5aa5 100644
--- a/pkg/index/inverted/inverted_series.go
+++ b/pkg/index/inverted/inverted_series.go
@@ -47,6 +47,9 @@ func (s *store) InsertSeriesBatch(batch index.Batch) error {
        }
        defer s.closer.Done()
        b := generateBatch()
+       if batch.PersistentCallback != nil {
+               b.SetPersistedCallback(batch.PersistentCallback)
+       }
        defer releaseBatch(b)
        for _, d := range batch.Documents {
                doc, ff := toDoc(d, true)
@@ -65,6 +68,9 @@ func (s *store) UpdateSeriesBatch(batch index.Batch) error {
        defer s.closer.Done()
        b := generateBatch()
        defer releaseBatch(b)
+       if batch.PersistentCallback != nil {
+               b.SetPersistedCallback(batch.PersistentCallback)
+       }
        for _, d := range batch.Documents {
                doc, _ := toDoc(d, false)
                b.Update(doc.ID(), doc)
@@ -121,6 +127,9 @@ func toDoc(d index.Document, toParseFieldNames bool) 
(*bluge.Document, []string)
                vf := bluge.NewStoredOnlyField(versionField, 
convert.Int64ToBytes(d.Version))
                doc.AddField(vf)
        }
+       if d.Deleted {
+               doc.AddField(bluge.NewStoredOnlyField(deletedField, 
convert.BoolToBytes(d.Deleted)).StoreValue())
+       }
        return doc, fieldNames
 }
 
diff --git a/pkg/test/setup/setup.go b/pkg/test/setup/setup.go
index 53f87cad..6cd6b7ba 100644
--- a/pkg/test/setup/setup.go
+++ b/pkg/test/setup/setup.go
@@ -204,10 +204,7 @@ func CMD(flags ...string) func() {
        }
 }
 
-func startDataNode(etcdEndpoint string, flags ...string) (string, string, 
func(), func()) {
-       path, deferFn, err := test.NewSpace()
-       gomega.Expect(err).NotTo(gomega.HaveOccurred())
-
+func startDataNode(etcdEndpoint, dataDir string, flags ...string) (string, 
func()) {
        ports, err := test.AllocateFreePorts(1)
        gomega.Expect(err).NotTo(gomega.HaveOccurred())
 
@@ -218,9 +215,9 @@ func startDataNode(etcdEndpoint string, flags ...string) 
(string, string, func()
                "data",
                "--grpc-host="+host,
                fmt.Sprintf("--grpc-port=%d", ports[0]),
-               "--stream-root-path="+path,
-               "--measure-root-path="+path,
-               "--property-root-path="+path,
+               "--stream-root-path="+dataDir,
+               "--measure-root-path="+dataDir,
+               "--property-root-path="+dataDir,
                "--etcd-endpoints", etcdEndpoint,
                "--node-host-provider", "flag",
                "--node-host", nodeHost,
@@ -236,48 +233,64 @@ func startDataNode(etcdEndpoint string, flags ...string) 
(string, string, func()
                return helpers.ListKeys(etcdEndpoint, 
fmt.Sprintf("/%s/nodes/%s:%d", metadata.DefaultNamespace, nodeHost, ports[0]))
        }, testflags.EventuallyTimeout).Should(gomega.HaveLen(1))
 
-       return addr, path, closeFn, deferFn
+       return addr, closeFn
 }
 
 // DataNode runs a data node.
 func DataNode(etcdEndpoint string, flags ...string) func() {
-       _, _, closeFn, deferFn := startDataNode(etcdEndpoint, flags...)
+       path, deferFn, err := test.NewSpace()
+       gomega.Expect(err).NotTo(gomega.HaveOccurred())
+       closeFn := DataNodeFromDataDir(etcdEndpoint, path, flags...)
        return func() {
                closeFn()
                deferFn()
        }
 }
 
+// DataNodeFromDataDir runs a data node with a specific data directory.
+func DataNodeFromDataDir(etcdEndpoint, dataDir string, flags ...string) func() 
{
+       _, closeFn := startDataNode(etcdEndpoint, dataDir, flags...)
+       return closeFn
+}
+
 // DataNodeWithAddrAndDir runs a data node and returns the address and root 
path.
 func DataNodeWithAddrAndDir(etcdEndpoint string, flags ...string) (string, 
string, func()) {
-       addr, dir, closeFn, deferFn := startDataNode(etcdEndpoint, flags...)
-       return addr, dir, func() {
+       path, deferFn, err := test.NewSpace()
+       gomega.Expect(err).NotTo(gomega.HaveOccurred())
+       addr, closeFn := startDataNode(etcdEndpoint, path, flags...)
+       return addr, path, func() {
                closeFn()
                deferFn()
        }
 }
 
 // LiaisonNode runs a liaison node.
-func LiaisonNode(etcdEndpoint string, flags ...string) (string, func()) {
+func LiaisonNode(etcdEndpoint string, flags ...string) (grpcAddr string, 
closeFn func()) {
+       grpcAddr, _, closeFn = LiaisonNodeWithHTTP(etcdEndpoint, flags...)
+       return
+}
+
+// LiaisonNodeWithHTTP runs a liaison node with HTTP enabled and returns the 
gRPC and HTTP addresses.
+func LiaisonNodeWithHTTP(etcdEndpoint string, flags ...string) (grpcAddr, 
httpAddr string, closeFn func()) {
        ports, err := test.AllocateFreePorts(2)
        gomega.Expect(err).NotTo(gomega.HaveOccurred())
-       addr := fmt.Sprintf("%s:%d", host, ports[0])
-       httpAddr := fmt.Sprintf("%s:%d", host, ports[1])
+       grpcAddr = fmt.Sprintf("%s:%d", host, ports[0])
+       httpAddr = fmt.Sprintf("%s:%d", host, ports[1])
        nodeHost := "127.0.0.1"
        flags = append(flags, "liaison",
                "--grpc-host="+host,
                fmt.Sprintf("--grpc-port=%d", ports[0]),
                "--http-host="+host,
                fmt.Sprintf("--http-port=%d", ports[1]),
-               "--http-grpc-addr="+addr,
+               "--http-grpc-addr="+grpcAddr,
                "--etcd-endpoints", etcdEndpoint,
                "--node-host-provider", "flag",
                "--node-host", nodeHost,
        )
-       closeFn := CMD(flags...)
+       closeFn = CMD(flags...)
        gomega.Eventually(helpers.HTTPHealthCheck(httpAddr, ""), 
testflags.EventuallyTimeout).Should(gomega.Succeed())
        gomega.Eventually(func() (map[string]*databasev1.Node, error) {
                return helpers.ListKeys(etcdEndpoint, 
fmt.Sprintf("/%s/nodes/%s:%d", metadata.DefaultNamespace, nodeHost, ports[0]))
        }, testflags.EventuallyTimeout).Should(gomega.HaveLen(1))
-       return addr, closeFn
+       return
 }
diff --git a/ui/README.md b/ui/README.md
index 612ab135..435f9a9c 100644
--- a/ui/README.md
+++ b/ui/README.md
@@ -4,7 +4,7 @@ This template should help get you started developing with Vue 3 
in Vite.
 
 ## Recommended IDE Setup
 
-[VSCode](https://code.visualstudio.com/) + 
[Volar](https://marketplace.visualstudio.com/items?itemName=Vue.volar) (and 
disable Vetur) + [TypeScript Vue Plugin 
(Volar)](https://marketplace.visualstudio.com/items?itemName=Vue.vscode-typescript-vue-plugin).
+VSCode + Volar (and disable Vetur) + TypeScript Vue Plugin (Volar).
 
 ## Setting up Environment Variables
 


Reply via email to