This is an automated email from the ASF dual-hosted git repository.

liuhan pushed a commit to branch delete-when-query
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 6f709fc11bb0c951335995b36195e9efdaf52236
Author: mrproliu <741550...@qq.com>
AuthorDate: Fri Jun 20 20:26:01 2025 +0800

    Support delete properties when query
---
 banyand/liaison/grpc/property.go      | 10 ++++-
 bydbctl/internal/cmd/property_test.go | 77 ++++++++++++++++++++++++++++++++---
 2 files changed, 80 insertions(+), 7 deletions(-)

diff --git a/banyand/liaison/grpc/property.go b/banyand/liaison/grpc/property.go
index d65c3728..c9e15203 100644
--- a/banyand/liaison/grpc/property.go
+++ b/banyand/liaison/grpc/property.go
@@ -362,6 +362,7 @@ func (ps *propertyServer) Query(ctx context.Context, req 
*propertyv1.QueryReques
                return nil, err
        }
        res := make(map[string]*propertyWithMetadata)
+       shouldDeleteOlderProperties := make([][]byte, 0)
        for _, nodeWithProperties := range nodeProperties {
                for _, propertyMetadata := range nodeWithProperties {
                        entity := 
propertypkg.GetEntity(propertyMetadata.Property)
@@ -372,10 +373,17 @@ func (ps *propertyServer) Query(ctx context.Context, req 
*propertyv1.QueryReques
                        }
                        if cur.Metadata.ModRevision < 
propertyMetadata.Metadata.ModRevision {
                                res[entity] = propertyMetadata
-                               // TODO(mrproliu) handle the case where the 
property detected multiple versions
+                               shouldDeleteOlderProperties = 
append(shouldDeleteOlderProperties, propertypkg.GetPropertyID(cur.Property))
+                       } else {
+                               shouldDeleteOlderProperties = 
append(shouldDeleteOlderProperties, 
propertypkg.GetPropertyID(propertyMetadata.Property))
                        }
                }
        }
+       if len(shouldDeleteOlderProperties) > 0 {
+               if err := ps.remove(shouldDeleteOlderProperties); err != nil {
+                       ps.log.Warn().Msgf("failed to delete old properties 
when query: %v", err)
+               }
+       }
        if len(res) == 0 {
                return &propertyv1.QueryResponse{Properties: nil, Trace: 
trace}, nil
        }
diff --git a/bydbctl/internal/cmd/property_test.go 
b/bydbctl/internal/cmd/property_test.go
index b4813cf3..37c8ecef 100644
--- a/bydbctl/internal/cmd/property_test.go
+++ b/bydbctl/internal/cmd/property_test.go
@@ -18,8 +18,11 @@
 package cmd_test
 
 import (
+       "context"
        "errors"
        "fmt"
+       "os"
+       "path"
        "strings"
 
        . "github.com/onsi/ginkgo/v2"
@@ -31,6 +34,9 @@ import (
        propertyv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
        "github.com/apache/skywalking-banyandb/banyand/metadata/embeddedetcd"
        "github.com/apache/skywalking-banyandb/bydbctl/internal/cmd"
+       "github.com/apache/skywalking-banyandb/pkg/convert"
+       "github.com/apache/skywalking-banyandb/pkg/index"
+       "github.com/apache/skywalking-banyandb/pkg/index/inverted"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/test"
        "github.com/apache/skywalking-banyandb/pkg/test/flags"
@@ -72,6 +78,8 @@ tags:
       int:
         value: 3
 `, propertyGroup)
+
+       deletedFieldKey = index.FieldKey{TagName: "_deleted"}
 )
 
 var _ = Describe("Property Operation", func() {
@@ -364,14 +372,18 @@ var _ = Describe("Property Cluster Operation", func() {
        var addr string
        var deferFunc func()
        var rootCmd *cobra.Command
+       var node1Dir, node2Dir string
+       var closeNode1, closeNode2 func()
        BeforeEach(func() {
                rootCmd = &cobra.Command{Use: "root"}
                cmd.RootCmdFlags(rootCmd)
                var ports []int
+               var err error
+               var spaceDef1, spaceDef2 func()
 
                // first creating
                By("Starting node1 with data")
-               node1Dir, spaceDef1, err := test.NewSpace()
+               node1Dir, spaceDef1, err = test.NewSpace()
                Expect(err).NotTo(HaveOccurred())
                ports, err = test.AllocateFreePorts(4)
                Expect(err).NotTo(HaveOccurred())
@@ -385,7 +397,7 @@ var _ = Describe("Property Cluster Operation", func() {
                node1Defer()
 
                By("Starting node2 with data")
-               node2Dir, spaceDef2, err := test.NewSpace()
+               node2Dir, spaceDef2, err = test.NewSpace()
                Expect(err).NotTo(HaveOccurred())
                ports, err = test.AllocateFreePorts(4)
                Expect(err).NotTo(HaveOccurred())
@@ -412,17 +424,17 @@ var _ = Describe("Property Cluster Operation", func() {
                Expect(err).ShouldNot(HaveOccurred())
                <-server.ReadyNotify()
                By("Starting data node 0")
-               closeDataNode0 := setup.DataNodeFromDataDir(ep, node1Dir)
+               closeNode1 = setup.DataNodeFromDataDir(ep, node1Dir)
                By("Starting data node 1")
-               closeDataNode1 := setup.DataNodeFromDataDir(ep, node2Dir)
+               closeNode2 = setup.DataNodeFromDataDir(ep, node2Dir)
                By("Starting liaison node")
                _, liaisonHTTPAddr, closerLiaisonNode := 
setup.LiaisonNodeWithHTTP(ep)
                By("Initializing test cases")
 
                deferFunc = func() {
                        closerLiaisonNode()
-                       closeDataNode0()
-                       closeDataNode1()
+                       closeNode1()
+                       closeNode2()
                        _ = server.Close()
                        <-server.StopNotify()
                        spaceDef()
@@ -442,6 +454,34 @@ var _ = Describe("Property Cluster Operation", func() {
                queryData(rootCmd, addr, propertyGroup, 1, func(data string, 
resp *propertyv1.QueryResponse) {
                        Expect(data).Should(ContainSubstring("foo333"))
                })
+               closeNode1()
+               closeNode2()
+
+               // check there should have two real properties in the dest 
database
+               // and one of them should be deleted (marked in the query phase)
+               store1, err := generateInvertedStore(node1Dir)
+               Expect(err).NotTo(HaveOccurred())
+               store2, err := generateInvertedStore(node2Dir)
+               Expect(err).NotTo(HaveOccurred())
+
+               query, err := 
inverted.BuildPropertyQuery(&propertyv1.QueryRequest{
+                       Groups: []string{propertyGroup},
+               }, "_group", "_entity_id")
+               Expect(err).NotTo(HaveOccurred())
+               node1Search, err := store1.Search(context.Background(), 
[]index.FieldKey{deletedFieldKey}, query, 10)
+               Expect(err).NotTo(HaveOccurred())
+               node2Search, err := store2.Search(context.Background(), 
[]index.FieldKey{deletedFieldKey}, query, 10)
+               Expect(err).NotTo(HaveOccurred())
+
+               totalProperties := append(node1Search, node2Search...)
+               Expect(len(totalProperties)).Should(Equal(2))
+               deletedCount := 0
+               for _, p := range totalProperties {
+                       if 
convert.BytesToBool(p.Fields[deletedFieldKey.TagName]) {
+                               deletedCount++
+                       }
+               }
+               Expect(deletedCount).Should(Equal(1))
        })
 
        It("delete property", func() {
@@ -542,3 +582,28 @@ func queryData(rootCmd *cobra.Command, addr, group string, 
dataCount int, verify
                return nil
        }, flags.EventuallyTimeout).Should(Succeed())
 }
+
+func generateInvertedStore(rootPath string) (index.SeriesStore, error) {
+       shardParent := path.Join(rootPath, "property", "data")
+       list, err := os.ReadDir(shardParent)
+       if err != nil {
+               return nil, fmt.Errorf("read dir %s error: %w", shardParent, 
err)
+       }
+       if len(list) == 0 {
+               return nil, fmt.Errorf("no shard found in %s", shardParent)
+       }
+       for _, e := range list {
+               if !e.Type().IsDir() {
+                       continue
+               }
+               _, found := strings.CutPrefix(e.Name(), "shard-")
+               if !found {
+                       continue
+               }
+               return inverted.NewStore(
+                       inverted.StoreOpts{
+                               Path: path.Join(shardParent, e.Name()),
+                       })
+       }
+       return nil, fmt.Errorf("no shard found in %s", rootPath)
+}

Reply via email to