Copilot commented on code in PR #955:
URL: https://github.com/apache/skywalking-banyandb/pull/955#discussion_r2715180526
##########
CHANGES.md:
##########
@@ -18,6 +18,9 @@ Release Notes.
- Update the dump tool to support analyzing the parts with smeta files.
- Activate the property repair mechanism by default.
- Add snapshot time retention policy to ensure the snapshot only can be deleted after the configured minimum age(time).
+- **Breaking Change**: Change the data storage path structure for property model:
+  - From: `<data-dir>/property/<shard>/...`
+  - To: `<data-dir>/property/group/<shard>/...`
Review Comment:
The new on-disk layout implemented by the code is `<data-dir>/property/<group>/<shard>/...` (group name as a directory). The CHANGES entry currently says `<data-dir>/property/group/<shard>/...`, which reads like a literal `group` directory and is likely misleading; please update the text to match the actual layout (and consider including the `shard-<id>` naming).
```suggestion
  - From: `<data-dir>/property/shard-<id>/...`
  - To: `<data-dir>/property/<group>/shard-<id>/...`
```
##########
banyand/property/db.go:
##########
@@ -216,34 +248,87 @@ func (db *database) query(ctx context.Context, req *propertyv1.QueryRequest) ([]
    return result, nil
}
-func (db *database) loadShard(ctx context.Context, id common.ShardID) (*shard, error) {
+func (db *database) collectGroupShards(groupsMap *map[string]*groupShards, requestedGroups map[string]bool) []*shard {
+    var shards []*shard
+    for groupName, gs := range *groupsMap {
+        if len(requestedGroups) > 0 {
+            if _, ok := requestedGroups[groupName]; !ok {
+                continue
+            }
+        }
+        sLst := gs.shards.Load()
+        if sLst == nil {
+            continue
+        }
+        shards = append(shards, *sLst...)
+    }
+    return shards
+}
+
+func (db *database) loadShard(ctx context.Context, group string, id common.ShardID) (*shard, error) {
    if db.closed.Load() {
        return nil, errors.New("database is closed")
    }
-    if s, ok := db.getShard(id); ok {
+    if s, ok := db.getShard(group, id); ok {
        return s, nil
    }
    db.mu.Lock()
    defer db.mu.Unlock()
-    if s, ok := db.getShard(id); ok {
+    if s, ok := db.getShard(group, id); ok {
        return s, nil
    }
-    sd, err := db.newShard(context.WithValue(ctx, logger.ContextKey, db.logger), id, int64(db.flushInterval.Seconds()),
+
+    gs := db.getOrCreateGroupShards(group)
+    sd, err := db.newShard(context.WithValue(ctx, logger.ContextKey, db.logger),
+        group, id, int64(db.flushInterval.Seconds()),
        int64(db.expireDelete.Seconds()), db.repairBaseDir, db.repairTreeSlotCount)
    if err != nil {
        return nil, err
    }
-    sLst := db.sLst.Load()
+
+    gs.mu.Lock()
+    sLst := gs.shards.Load()
    if sLst == nil {
        sLst = &[]*shard{}
    }
-    *sLst = append(*sLst, sd)
-    db.sLst.Store(sLst)
+    newList := append(*sLst, sd)
+    gs.shards.Store(&newList)
Review Comment:
`newList := append(*sLst, sd)` can reuse the backing array of `*sLst` and mutate it in-place. Since readers access `gs.shards.Load()` without taking `gs.mu`, this can cause data races / inconsistent reads. Build the new slice with a guaranteed new backing array (e.g., copy to a fresh slice before appending) before storing it.
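For illustration, a minimal self-contained sketch of the copy-before-append pattern this comment asks for; the `shard` and `groupShards` types below are simplified stand-ins (assuming an `atomic.Pointer[[]*shard]` field, as the diff suggests), not the actual BanyanDB definitions:
```go
package main

import (
	"sync"
	"sync/atomic"
)

// Simplified stand-ins for the types in the diff (hypothetical, illustration only).
type shard struct{ id uint32 }

type groupShards struct {
	mu     sync.Mutex               // serializes writers
	shards atomic.Pointer[[]*shard] // read lock-free by readers
}

// addShard appends sd using copy-on-write: the new slice is always built on a
// fresh backing array, so readers that Load() the old pointer without taking
// the mutex never observe an in-place mutation performed by append.
func (gs *groupShards) addShard(sd *shard) {
	gs.mu.Lock()
	defer gs.mu.Unlock()

	var cur []*shard
	if old := gs.shards.Load(); old != nil {
		cur = *old
	}
	newList := make([]*shard, 0, len(cur)+1) // guaranteed fresh backing array
	newList = append(newList, cur...)
	newList = append(newList, sd)
	gs.shards.Store(&newList)
}

func main() {
	gs := &groupShards{}
	gs.addShard(&shard{id: 0})
	gs.addShard(&shard{id: 1})
}
```
Writers still serialize on `gs.mu`; only readers stay lock-free against the stored pointer, which is the property the comment is trying to preserve.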
##########
banyand/property/repair.go:
##########
@@ -710,31 +686,30 @@ func (r *repairTreeComposer) append(id, shaVal string) (err error) {
    return file.append(idBytes, shaValBytes)
}
-// composeNextGroupAndSave composes the current group of slot files into a repair tree file.
+// composeAndSave composes the slot files into a repair tree file.
// tree file format: [leaf nodes]+[slot nodes]+[root node]+[metadata]
// leaf nodes: each node contains: [entity]+[sha value]
// slot nodes: each node contains: [slot index]+[sha value]+[leaf nodes start offset]+[leaf nodes count]
// root node: contains [sha value]
// metadata: contains footer([slot nodes start offset]+[slot nodes count]+[root node start offset]+[root node length])+[footer length(data binary)]
-func (r *repairTreeComposer) composeNextGroupAndSave(group string) (err error) {
-    treeFilePath := fmt.Sprintf(r.treeFileFmt, group)
-    treeBuilder, err := newRepairTreeBuilder(treeFilePath)
+func (r *repairTreeComposer) composeAndSave() (err error) {
+    treeBuilder, err := newRepairTreeBuilder(r.treeFilePath)
    if err != nil {
-        return fmt.Errorf("creating repair tree builder for group %s failure: %w", group, err)
+        return fmt.Errorf("creating repair tree builder failure: %w", err)
    }
    defer func() {
        if closeErr := treeBuilder.close(); closeErr != nil {
-            err = multierr.Append(err, fmt.Errorf("closing repair tree builder for group %s failure: %w", group, closeErr))
+            err = multierr.Append(err, fmt.Errorf("closing repair tree builder failure: %w", closeErr))
        }
Review Comment:
With a single `state-tree.data` file being overwritten, make sure the tree file is truncated/rewritten atomically. `newRepairTreeBuilder` currently opens the file without `O_TRUNC` (and doesn’t call `Truncate(0)`), so rebuilding a smaller tree can leave stale bytes at the end and cause `treeReader()` to read the old footer. Fix by truncating/removing the file before writing (or opening with `os.O_TRUNC`).
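As a sketch of the fix being suggested (the helper names here are hypothetical, not the actual `newRepairTreeBuilder` internals), either of the following discards stale bytes left over from a previously larger tree file:
```go
package main

import (
	"io"
	"os"
)

// openTreeFileTruncated opens (or creates) the tree file and truncates any
// existing contents, so a rebuilt, smaller tree cannot leave stale trailing
// bytes that a later reader might mistake for the footer.
// The 0o600 permission is an assumption for this example.
func openTreeFileTruncated(path string) (*os.File, error) {
	return os.OpenFile(path, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0o600)
}

// truncateExisting is the alternative when the file is already open for
// read/write: explicitly drop the old contents and rewind before rewriting.
func truncateExisting(f *os.File) error {
	if err := f.Truncate(0); err != nil {
		return err
	}
	_, err := f.Seek(0, io.SeekStart) // Truncate does not move the write offset
	return err
}

func main() {
	f, err := openTreeFileTruncated("state-tree.data")
	if err != nil {
		panic(err)
	}
	defer f.Close()
	_ = truncateExisting // referenced to keep the example self-contained
}
```
Deleting the old file, or writing to a temporary path and atomically renaming it over the old one, would also avoid readers seeing a stale footer during the rewrite.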
##########
banyand/property/db.go:
##########
@@ -216,34 +248,87 @@ func (db *database) query(ctx context.Context, req *propertyv1.QueryRequest) ([]
    return result, nil
}
-func (db *database) loadShard(ctx context.Context, id common.ShardID) (*shard, error) {
+func (db *database) collectGroupShards(groupsMap *map[string]*groupShards, requestedGroups map[string]bool) []*shard {
+    var shards []*shard
+    for groupName, gs := range *groupsMap {
+        if len(requestedGroups) > 0 {
+            if _, ok := requestedGroups[groupName]; !ok {
+                continue
+            }
+        }
+        sLst := gs.shards.Load()
+        if sLst == nil {
+            continue
+        }
+        shards = append(shards, *sLst...)
+    }
+    return shards
+}
+
+func (db *database) loadShard(ctx context.Context, group string, id common.ShardID) (*shard, error) {
    if db.closed.Load() {
        return nil, errors.New("database is closed")
    }
-    if s, ok := db.getShard(id); ok {
+    if s, ok := db.getShard(group, id); ok {
        return s, nil
    }
    db.mu.Lock()
    defer db.mu.Unlock()
-    if s, ok := db.getShard(id); ok {
+    if s, ok := db.getShard(group, id); ok {
        return s, nil
    }
-    sd, err := db.newShard(context.WithValue(ctx, logger.ContextKey, db.logger), id, int64(db.flushInterval.Seconds()),
+
+    gs := db.getOrCreateGroupShards(group)
+    sd, err := db.newShard(context.WithValue(ctx, logger.ContextKey, db.logger),
+        group, id, int64(db.flushInterval.Seconds()),
        int64(db.expireDelete.Seconds()), db.repairBaseDir, db.repairTreeSlotCount)
    if err != nil {
        return nil, err
    }
-    sLst := db.sLst.Load()
+
+    gs.mu.Lock()
+    sLst := gs.shards.Load()
    if sLst == nil {
        sLst = &[]*shard{}
    }
-    *sLst = append(*sLst, sd)
-    db.sLst.Store(sLst)
+    newList := append(*sLst, sd)
+    gs.shards.Store(&newList)
+    gs.mu.Unlock()
    return sd, nil
}
-func (db *database) getShard(id common.ShardID) (*shard, bool) {
-    sLst := db.sLst.Load()
+func (db *database) getOrCreateGroupShards(group string) *groupShards {
+    groupsMap := db.groups.Load()
+    if groupsMap != nil {
+        if gs, ok := (*groupsMap)[group]; ok {
+            return gs
+        }
+    }
+    gs := &groupShards{
+        group:    group,
+        location: filepath.Join(db.location, group),
+    }
+    if groupsMap == nil {
+        newMap := make(map[string]*groupShards)
+        newMap[group] = gs
+        db.groups.Store(&newMap)
+    } else {
+        (*groupsMap)[group] = gs
+        db.groups.Store(groupsMap)
Review Comment:
`getOrCreateGroupShards` mutates the existing `map[string]*groupShards` in-place (`(*groupsMap)[group] = gs`) and then re-stores the same map pointer. Callers like `delete/query/collect/close` iterate over `*groupsMap` without holding `db.mu`, so this can trigger `concurrent map iteration and map write` panics. Use copy-on-write (allocate a new map, copy entries, add the new group, then `Store` the new pointer) or protect all reads with the same lock used for writes.
```suggestion
        // Use copy-on-write: create a new map, copy existing entries, then add the new group.
        oldMap := *groupsMap
        newMap := make(map[string]*groupShards, len(oldMap)+1)
        for k, v := range oldMap {
            newMap[k] = v
        }
        newMap[group] = gs
        db.groups.Store(&newMap)
```