hanahmily commented on code in PR #684: URL: https://github.com/apache/skywalking-banyandb/pull/684#discussion_r2157182459
########## banyand/property/shard.go: ########## @@ -225,3 +232,45 @@ func (s *shard) search(ctx context.Context, indexQuery index.Query, limit int, } return data, nil } + +func (s *shard) prepareForMerge(src []*roaring.Bitmap, segments []segment.Segment, _ uint64) (dest []*roaring.Bitmap, err error) { + if len(segments) == 0 || len(src) == 0 || len(segments) != len(src) { + return src, nil + } + for segID, seg := range segments { + var docID uint64 + for ; docID < seg.Count(); docID++ { + hasDeleted := false + var sourceData []byte + _ = seg.VisitStoredFields(docID, func(field string, value []byte) bool { + if field == deleteField { + hasDeleted = convert.BytesToBool(value) Review Comment: Please include the deletion time for this field. By doing so, there’s no need to parse the source field. ########## banyand/property/shard.go: ########## @@ -225,3 +232,45 @@ func (s *shard) search(ctx context.Context, indexQuery index.Query, limit int, } return data, nil } + +func (s *shard) prepareForMerge(src []*roaring.Bitmap, segments []segment.Segment, _ uint64) (dest []*roaring.Bitmap, err error) { Review Comment: If there is no error returned, why was an error return value added? ########## banyand/property/shard_test.go: ########## @@ -0,0 +1,113 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +package property + +import ( + "context" + "fmt" + "testing" + "time" + + commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" + modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" + propertyv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1" + "github.com/apache/skywalking-banyandb/banyand/observability" + "github.com/apache/skywalking-banyandb/pkg/fs" + "github.com/apache/skywalking-banyandb/pkg/test" +) + +func TestMergeDeleted(t *testing.T) { Review Comment: Could you please add a new case to verify the properties that have been deleted but are still unexpired? ########## banyand/property/shard.go: ########## @@ -225,3 +232,45 @@ func (s *shard) search(ctx context.Context, indexQuery index.Query, limit int, } return data, nil } + +func (s *shard) prepareForMerge(src []*roaring.Bitmap, segments []segment.Segment, _ uint64) (dest []*roaring.Bitmap, err error) { + if len(segments) == 0 || len(src) == 0 || len(segments) != len(src) { + return src, nil + } + for segID, seg := range segments { + var docID uint64 + for ; docID < seg.Count(); docID++ { + hasDeleted := false + var sourceData []byte + _ = seg.VisitStoredFields(docID, func(field string, value []byte) bool { Review Comment: I want to recommend that you handle the error. 
########## banyand/property/shard.go: ########## @@ -225,3 +232,45 @@ func (s *shard) search(ctx context.Context, indexQuery index.Query, limit int, } return data, nil } + +func (s *shard) prepareForMerge(src []*roaring.Bitmap, segments []segment.Segment, _ uint64) (dest []*roaring.Bitmap, err error) { + if len(segments) == 0 || len(src) == 0 || len(segments) != len(src) { + return src, nil + } + for segID, seg := range segments { + var docID uint64 + for ; docID < seg.Count(); docID++ { + hasDeleted := false + var sourceData []byte + _ = seg.VisitStoredFields(docID, func(field string, value []byte) bool { + if field == deleteField { + hasDeleted = convert.BytesToBool(value) + } else if field == sourceField { + sourceData = value + } + return true + }) + + if !hasDeleted || sourceData == nil { + continue + } + + // parsing the property data to check the data is expired or not + var p propertyv1.Property + if err := protojson.Unmarshal(sourceData, &p); err != nil { + s.l.Warn().Msgf("unmarshal property failure when merging segments %d: %v", segID, err) + continue + } + if time.Now().Unix()-p.Metadata.ModRevision < s.expireToDeleteSec { Review Comment: `p.Metadata.ModRevision` represents the time in nanoseconds, whereas `time.Now().Unix()` returns seconds, so this subtraction mixes units. ########## banyand/property/shard_test.go: ########## @@ -0,0 +1,113 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package property + +import ( + "context" + "fmt" + "testing" + "time" + + commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" + modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" + propertyv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1" + "github.com/apache/skywalking-banyandb/banyand/observability" + "github.com/apache/skywalking-banyandb/pkg/fs" + "github.com/apache/skywalking-banyandb/pkg/test" +) + +func TestMergeDeleted(t *testing.T) { + var defers []func() + defer func() { + for _, f := range defers { + f() + } + }() + + dir, deferFunc, err := test.NewSpace() + if err != nil { + t.Fatal(err) + } + defers = append(defers, deferFunc) + db, err := openDB(context.Background(), dir, 3*time.Second, time.Second, observability.BypassRegistry, fs.NewLocalFileSystem()) + if err != nil { + t.Fatal(err) + } + defers = append(defers, func() { + _ = db.close() + }) + + newShard, err := db.loadShard(context.Background(), 0) + if err != nil { + t.Fatal(err) + } + + propertyCount := 6 + + properties := make([]*propertyv1.Property, 0, propertyCount) + unix := time.Now().Unix() + unix -= 10 + for i := 0; i < propertyCount; i++ { + property := &propertyv1.Property{ + Metadata: &commonv1.Metadata{ + Group: "test-group", + Name: "test-name", + ModRevision: unix, + }, + Id: fmt.Sprintf("test-id%d", i), + Tags: []*modelv1.Tag{ + {Key: "tag1", Value: &modelv1.TagValue{Value: &modelv1.TagValue_Int{Int: &modelv1.Int{Value: int64(i)}}}}, + }, + } + properties = append(properties, 
property) + } + + // apply the property + for _, p := range properties { + if err = newShard.update(GetPropertyID(p), p); err != nil { + t.Fatal(err) + } + } + Review Comment: Add a step to verify the update by querying all properties. The number of properties returned should match the number inserted. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@skywalking.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org