This is an automated email from the ASF dual-hosted git repository.

liuhan pushed a commit to branch delete-when-query
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 6f709fc11bb0c951335995b36195e9efdaf52236 Author: mrproliu <741550...@qq.com> AuthorDate: Fri Jun 20 20:26:01 2025 +0800 Support delete properties when query --- banyand/liaison/grpc/property.go | 10 ++++- bydbctl/internal/cmd/property_test.go | 77 ++++++++++++++++++++++++++++++++--- 2 files changed, 80 insertions(+), 7 deletions(-) diff --git a/banyand/liaison/grpc/property.go b/banyand/liaison/grpc/property.go index d65c3728..c9e15203 100644 --- a/banyand/liaison/grpc/property.go +++ b/banyand/liaison/grpc/property.go @@ -362,6 +362,7 @@ func (ps *propertyServer) Query(ctx context.Context, req *propertyv1.QueryReques return nil, err } res := make(map[string]*propertyWithMetadata) + shouldDeleteOlderProperties := make([][]byte, 0) for _, nodeWithProperties := range nodeProperties { for _, propertyMetadata := range nodeWithProperties { entity := propertypkg.GetEntity(propertyMetadata.Property) @@ -372,10 +373,17 @@ func (ps *propertyServer) Query(ctx context.Context, req *propertyv1.QueryReques } if cur.Metadata.ModRevision < propertyMetadata.Metadata.ModRevision { res[entity] = propertyMetadata - // TODO(mrproliu) handle the case where the property detected multiple versions + shouldDeleteOlderProperties = append(shouldDeleteOlderProperties, propertypkg.GetPropertyID(cur.Property)) + } else { + shouldDeleteOlderProperties = append(shouldDeleteOlderProperties, propertypkg.GetPropertyID(propertyMetadata.Property)) } } } + if len(shouldDeleteOlderProperties) > 0 { + if err := ps.remove(shouldDeleteOlderProperties); err != nil { + ps.log.Warn().Msgf("failed to delete old properties when query: %v", err) + } + } if len(res) == 0 { return &propertyv1.QueryResponse{Properties: nil, Trace: trace}, nil } diff --git a/bydbctl/internal/cmd/property_test.go b/bydbctl/internal/cmd/property_test.go index b4813cf3..37c8ecef 100644 --- a/bydbctl/internal/cmd/property_test.go +++ b/bydbctl/internal/cmd/property_test.go @@ -18,8 +18,11 @@ package cmd_test import ( + "context" 
"errors" "fmt" + "os" + "path" "strings" . "github.com/onsi/ginkgo/v2" @@ -31,6 +34,9 @@ import ( propertyv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1" "github.com/apache/skywalking-banyandb/banyand/metadata/embeddedetcd" "github.com/apache/skywalking-banyandb/bydbctl/internal/cmd" + "github.com/apache/skywalking-banyandb/pkg/convert" + "github.com/apache/skywalking-banyandb/pkg/index" + "github.com/apache/skywalking-banyandb/pkg/index/inverted" "github.com/apache/skywalking-banyandb/pkg/logger" "github.com/apache/skywalking-banyandb/pkg/test" "github.com/apache/skywalking-banyandb/pkg/test/flags" @@ -72,6 +78,8 @@ tags: int: value: 3 `, propertyGroup) + + deletedFieldKey = index.FieldKey{TagName: "_deleted"} ) var _ = Describe("Property Operation", func() { @@ -364,14 +372,18 @@ var _ = Describe("Property Cluster Operation", func() { var addr string var deferFunc func() var rootCmd *cobra.Command + var node1Dir, node2Dir string + var closeNode1, closeNode2 func() BeforeEach(func() { rootCmd = &cobra.Command{Use: "root"} cmd.RootCmdFlags(rootCmd) var ports []int + var err error + var spaceDef1, spaceDef2 func() // first creating By("Starting node1 with data") - node1Dir, spaceDef1, err := test.NewSpace() + node1Dir, spaceDef1, err = test.NewSpace() Expect(err).NotTo(HaveOccurred()) ports, err = test.AllocateFreePorts(4) Expect(err).NotTo(HaveOccurred()) @@ -385,7 +397,7 @@ var _ = Describe("Property Cluster Operation", func() { node1Defer() By("Starting node2 with data") - node2Dir, spaceDef2, err := test.NewSpace() + node2Dir, spaceDef2, err = test.NewSpace() Expect(err).NotTo(HaveOccurred()) ports, err = test.AllocateFreePorts(4) Expect(err).NotTo(HaveOccurred()) @@ -412,17 +424,17 @@ var _ = Describe("Property Cluster Operation", func() { Expect(err).ShouldNot(HaveOccurred()) <-server.ReadyNotify() By("Starting data node 0") - closeDataNode0 := setup.DataNodeFromDataDir(ep, node1Dir) + closeNode1 = setup.DataNodeFromDataDir(ep, 
node1Dir) By("Starting data node 1") - closeDataNode1 := setup.DataNodeFromDataDir(ep, node2Dir) + closeNode2 = setup.DataNodeFromDataDir(ep, node2Dir) By("Starting liaison node") _, liaisonHTTPAddr, closerLiaisonNode := setup.LiaisonNodeWithHTTP(ep) By("Initializing test cases") deferFunc = func() { closerLiaisonNode() - closeDataNode0() - closeDataNode1() + closeNode1() + closeNode2() _ = server.Close() <-server.StopNotify() spaceDef() @@ -442,6 +454,34 @@ var _ = Describe("Property Cluster Operation", func() { queryData(rootCmd, addr, propertyGroup, 1, func(data string, resp *propertyv1.QueryResponse) { Expect(data).Should(ContainSubstring("foo333")) }) + closeNode1() + closeNode2() + + // check there should have two real properties in the dest database + // and one of them should be deleted (marked in the query phase) + store1, err := generateInvertedStore(node1Dir) + Expect(err).NotTo(HaveOccurred()) + store2, err := generateInvertedStore(node2Dir) + Expect(err).NotTo(HaveOccurred()) + + query, err := inverted.BuildPropertyQuery(&propertyv1.QueryRequest{ + Groups: []string{propertyGroup}, + }, "_group", "_entity_id") + Expect(err).NotTo(HaveOccurred()) + node1Search, err := store1.Search(context.Background(), []index.FieldKey{deletedFieldKey}, query, 10) + Expect(err).NotTo(HaveOccurred()) + node2Search, err := store2.Search(context.Background(), []index.FieldKey{deletedFieldKey}, query, 10) + Expect(err).NotTo(HaveOccurred()) + + totalProperties := append(node1Search, node2Search...) 
+ Expect(len(totalProperties)).Should(Equal(2)) + deletedCount := 0 + for _, p := range totalProperties { + if convert.BytesToBool(p.Fields[deletedFieldKey.TagName]) { + deletedCount++ + } + } + Expect(deletedCount).Should(Equal(1)) }) It("delete property", func() { @@ -542,3 +582,28 @@ func queryData(rootCmd *cobra.Command, addr, group string, dataCount int, verify return nil }, flags.EventuallyTimeout).Should(Succeed()) } + +func generateInvertedStore(rootPath string) (index.SeriesStore, error) { + shardParent := path.Join(rootPath, "property", "data") + list, err := os.ReadDir(shardParent) + if err != nil { + return nil, fmt.Errorf("read dir %s error: %w", shardParent, err) + } + if len(list) == 0 { + return nil, fmt.Errorf("no shard found in %s", shardParent) + } + for _, e := range list { + if !e.Type().IsDir() { + continue + } + _, found := strings.CutPrefix(e.Name(), "shard-") + if !found { + continue + } + return inverted.NewStore( + inverted.StoreOpts{ + Path: path.Join(shardParent, e.Name()), + }) + } + return nil, fmt.Errorf("no shard found in %s", rootPath) +}