This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new 5e585bcd Change the data storage path structure for property model (#955)
5e585bcd is described below

commit 5e585bcdcbe8c01275f91dc64f7a52c03369aab4
Author: mrproliu <[email protected]>
AuthorDate: Thu Jan 22 14:45:24 2026 +0800

    Change the data storage path structure for property model (#955)
    
    * Change the data storage path structure for the property model
    
    ---------
    
    Co-authored-by: Gao Hongtao <[email protected]>
    Co-authored-by: Copilot <[email protected]>
---
 CHANGES.md                             |   3 +
 banyand/property/db.go                 | 164 ++++++++++++++++++++++--------
 banyand/property/listener.go           |  58 +++++++----
 banyand/property/repair.go             | 177 ++++++++++++++++-----------------
 banyand/property/repair_gossip.go      |  24 ++---
 banyand/property/repair_gossip_test.go |  37 ++++---
 banyand/property/repair_test.go        |  51 ++++------
 banyand/property/shard.go              |  20 ++--
 banyand/property/shard_test.go         |   4 +-
 banyand/property/test_helper.go        |   2 +-
 bydbctl/internal/cmd/property_test.go  |  38 ++++---
 docs/api-reference.md                  |   2 +
 12 files changed, 340 insertions(+), 240 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index f7a9d528..8d8facb5 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -19,6 +19,9 @@ Release Notes.
 - Add replication integration test for measure.
 - Activate the property repair mechanism by default.
 - Add snapshot time retention policy to ensure the snapshot only can be deleted after the configured minimum age(time).
+- **Breaking Change**: Change the data storage path structure for property model:
+  - From: `<data-dir>/property/data/shard-<id>/...`
+  - To: `<data-dir>/property/data/<group>/shard-<id>/...`
 
 ### Bug Fixes
 
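As a quick illustration of the layout change, here is a minimal sketch that builds both paths with filepath.Join; the data directory and the "sample" group name are hypothetical:

package main

import (
	"fmt"
	"path/filepath"
)

func main() {
	dataDir := "/tmp/banyandb" // hypothetical <data-dir>
	// Old layout: shards sat directly under the property data directory.
	oldPath := filepath.Join(dataDir, "property", "data", "shard-0")
	// New layout: each shard is nested under its group directory.
	newPath := filepath.Join(dataDir, "property", "data", "sample", "shard-0")
	fmt.Println(oldPath, "->", newPath)
}
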
diff --git a/banyand/property/db.go b/banyand/property/db.go
index 162a8ddc..105711b0 100644
--- a/banyand/property/db.go
+++ b/banyand/property/db.go
@@ -52,6 +52,13 @@ var (
        propertyScope = observability.RootScope.SubScope("property")
 )
 
+type groupShards struct {
+       shards   atomic.Pointer[[]*shard]
+       group    string
+       location string
+       mu       sync.RWMutex
+}
+
 type database struct {
        metadata            metadata.Repo
        omr                 observability.MetricsRegistry
@@ -59,7 +66,7 @@ type database struct {
        lock                fs.File
        logger              *logger.Logger
        repairScheduler     *repairScheduler
-       sLst                atomic.Pointer[[]*shard]
+       groups              sync.Map
        location            string
        repairBaseDir       string
        flushInterval       time.Duration
@@ -129,18 +136,29 @@ func (db *database) load(ctx context.Context) error {
        if db.closed.Load() {
                return errors.New("database is closed")
        }
-       return walkDir(db.location, "shard-", func(suffix string) error {
-               id, err := strconv.Atoi(suffix)
-               if err != nil {
-                       return err
+       for _, groupDir := range lfs.ReadDir(db.location) {
+               if !groupDir.IsDir() {
+                       continue
                }
-               _, err = db.loadShard(ctx, common.ShardID(id))
-               return err
-       })
+               groupName := groupDir.Name()
+               groupPath := filepath.Join(db.location, groupName)
+               walkErr := walkDir(groupPath, "shard-", func(suffix string) error {
+                       id, parseErr := strconv.Atoi(suffix)
+                       if parseErr != nil {
+                               return parseErr
+                       }
+                       _, loadErr := db.loadShard(ctx, groupName, common.ShardID(id))
+                       return loadErr
+               })
+               if walkErr != nil {
+                       return walkErr
+               }
+       }
+       return nil
 }
 
 func (db *database) update(ctx context.Context, shardID common.ShardID, id []byte, property *propertyv1.Property) error {
-       sd, err := db.loadShard(ctx, shardID)
+       sd, err := db.loadShard(ctx, property.Metadata.Group, shardID)
        if err != nil {
                return err
        }
@@ -152,14 +170,18 @@ func (db *database) update(ctx context.Context, shardID common.ShardID, id []byt
 }
 
 func (db *database) delete(ctx context.Context, docIDs [][]byte) error {
-       sLst := db.sLst.Load()
-       if sLst == nil {
-               return nil
-       }
        var err error
-       for _, s := range *sLst {
-               multierr.AppendInto(&err, s.delete(ctx, docIDs))
-       }
+       db.groups.Range(func(_, value any) bool {
+               gs := value.(*groupShards)
+               sLst := gs.shards.Load()
+               if sLst == nil {
+                       return true
+               }
+               for _, s := range *sLst {
+                       multierr.AppendInto(&err, s.delete(ctx, docIDs))
+               }
+               return true
+       })
        return err
 }
 
@@ -168,14 +190,18 @@ func (db *database) query(ctx context.Context, req *propertyv1.QueryRequest) ([]
        if err != nil {
                return nil, err
        }
-       sLst := db.sLst.Load()
-       if sLst == nil {
+       requestedGroups := make(map[string]bool, len(req.Groups))
+       for _, g := range req.Groups {
+               requestedGroups[g] = true
+       }
+       shards := db.collectGroupShards(requestedGroups)
+       if len(shards) == 0 {
                return nil, nil
        }
 
        if req.OrderBy == nil {
                var res []*queryProperty
-               for _, s := range *sLst {
+               for _, s := range shards {
                        r, searchErr := s.search(ctx, iq, nil, int(req.Limit))
                        if searchErr != nil {
                                return nil, searchErr
@@ -185,8 +211,8 @@ func (db *database) query(ctx context.Context, req *propertyv1.QueryRequest) ([]
                return res, nil
        }
 
-       iters := make([]sort.Iterator[*queryProperty], 0, len(*sLst))
-       for _, s := range *sLst {
+       iters := make([]sort.Iterator[*queryProperty], 0, len(shards))
+       for _, s := range shards {
                // Each shard returns pre-sorted results (via SeriesSort)
                r, searchErr := s.search(ctx, iq, req.OrderBy, int(req.Limit))
                if searchErr != nil {
@@ -216,34 +242,77 @@ func (db *database) query(ctx context.Context, req *propertyv1.QueryRequest) ([]
        return result, nil
 }
 
-func (db *database) loadShard(ctx context.Context, id common.ShardID) (*shard, error) {
+func (db *database) collectGroupShards(requestedGroups map[string]bool) []*shard {
+       var shards []*shard
+       db.groups.Range(func(key, value any) bool {
+               groupName := key.(string)
+               if len(requestedGroups) > 0 {
+                       if _, ok := requestedGroups[groupName]; !ok {
+                               return true
+                       }
+               }
+               gs := value.(*groupShards)
+               sLst := gs.shards.Load()
+               if sLst == nil {
+                       return true
+               }
+               shards = append(shards, *sLst...)
+               return true
+       })
+       return shards
+}
+
+func (db *database) loadShard(ctx context.Context, group string, id common.ShardID) (*shard, error) {
        if db.closed.Load() {
                return nil, errors.New("database is closed")
        }
-       if s, ok := db.getShard(id); ok {
+       if s, ok := db.getShard(group, id); ok {
                return s, nil
        }
        db.mu.Lock()
        defer db.mu.Unlock()
-       if s, ok := db.getShard(id); ok {
+       if s, ok := db.getShard(group, id); ok {
                return s, nil
        }
-       sd, err := db.newShard(context.WithValue(ctx, logger.ContextKey, db.logger), id, int64(db.flushInterval.Seconds()),
+
+       gs := db.getOrCreateGroupShards(group)
+       sd, err := db.newShard(context.WithValue(ctx, logger.ContextKey, db.logger),
+               group, id, int64(db.flushInterval.Seconds()),
                int64(db.expireDelete.Seconds()), db.repairBaseDir, db.repairTreeSlotCount)
        if err != nil {
                return nil, err
        }
-       sLst := db.sLst.Load()
-       if sLst == nil {
-               sLst = &[]*shard{}
+
+       gs.mu.Lock()
+       sLst := gs.shards.Load()
+       var oldList []*shard
+       if sLst != nil {
+               oldList = *sLst
        }
-       *sLst = append(*sLst, sd)
-       db.sLst.Store(sLst)
+       newList := make([]*shard, len(oldList)+1)
+       copy(newList, oldList)
+       newList[len(oldList)] = sd
+       gs.shards.Store(&newList)
+       gs.mu.Unlock()
        return sd, nil
 }
 
-func (db *database) getShard(id common.ShardID) (*shard, bool) {
-       sLst := db.sLst.Load()
+func (db *database) getOrCreateGroupShards(group string) *groupShards {
+       gs := &groupShards{
+               group:    group,
+               location: filepath.Join(db.location, group),
+       }
+       actual, _ := db.groups.LoadOrStore(group, gs)
+       return actual.(*groupShards)
+}
+
+func (db *database) getShard(group string, id common.ShardID) (*shard, bool) {
+       value, ok := db.groups.Load(group)
+       if !ok {
+               return nil, false
+       }
+       gs := value.(*groupShards)
+       sLst := gs.shards.Load()
        if sLst == nil {
                return nil, false
        }
@@ -262,13 +331,18 @@ func (db *database) close() error {
        if db.repairScheduler != nil {
                db.repairScheduler.close()
        }
-       sLst := db.sLst.Load()
        var err error
-       if sLst != nil {
+       db.groups.Range(func(_, value any) bool {
+               gs := value.(*groupShards)
+               sLst := gs.shards.Load()
+               if sLst == nil {
+                       return true
+               }
                for _, s := range *sLst {
                        multierr.AppendInto(&err, s.close())
                }
-       }
+               return true
+       })
        db.lock.Close()
        return err
 }
@@ -277,17 +351,21 @@ func (db *database) collect() {
        if db.closed.Load() {
                return
        }
-       sLst := db.sLst.Load()
-       if sLst == nil {
-               return
-       }
-       for _, s := range *sLst {
-               s.store.CollectMetrics()
-       }
+       db.groups.Range(func(_, value any) bool {
+               gs := value.(*groupShards)
+               sLst := gs.shards.Load()
+               if sLst == nil {
+                       return true
+               }
+               for _, s := range *sLst {
+                       s.store.CollectMetrics()
+               }
+               return true
+       })
 }
 
 func (db *database) repair(ctx context.Context, id []byte, shardID uint64, property *propertyv1.Property, deleteTime int64) error {
-       s, err := db.loadShard(ctx, common.ShardID(shardID))
+       s, err := db.loadShard(ctx, property.Metadata.Group, common.ShardID(shardID))
        if err != nil {
                return errors.WithMessagef(err, "failed to load shard %d", id)
        }
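The rewritten loadShard publishes each group's shard list copy-on-write: appends happen under gs.mu while readers take lock-free atomic.Pointer loads. A self-contained sketch of that pattern follows; cowList and its methods are illustrative stand-ins, not the repository's types:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// cowList publishes an immutable slice through an atomic pointer:
// writers copy under a mutex, readers load without locking.
type cowList[T any] struct {
	mu    sync.Mutex
	items atomic.Pointer[[]T]
}

func (c *cowList[T]) add(v T) {
	c.mu.Lock()
	defer c.mu.Unlock()
	var old []T
	if p := c.items.Load(); p != nil {
		old = *p
	}
	// Copy into a fresh slice so concurrent readers never see a partial append.
	next := make([]T, len(old)+1)
	copy(next, old)
	next[len(old)] = v
	c.items.Store(&next)
}

func (c *cowList[T]) snapshot() []T {
	if p := c.items.Load(); p != nil {
		return *p
	}
	return nil
}

func main() {
	var l cowList[int]
	l.add(1)
	l.add(2)
	fmt.Println(l.snapshot()) // [1 2]
}
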
diff --git a/banyand/property/listener.go b/banyand/property/listener.go
index bfb13d03..a7985ed2 100644
--- a/banyand/property/listener.go
+++ b/banyand/property/listener.go
@@ -234,30 +234,46 @@ func (s *snapshotListener) Rev(ctx context.Context, message bus.Message) bus.Mes
        defer s.snapshotMux.Unlock()
        storage.DeleteStaleSnapshots(s.s.snapshotDir, s.s.maxFileSnapshotNum, s.s.minFileSnapshotAge, s.s.lfs)
        sn := s.snapshotName()
-       shardsRef := s.s.db.sLst.Load()
-       if shardsRef == nil {
-               return bus.NewMessage(bus.MessageID(time.Now().UnixNano()), nil)
-       }
-       shards := *shardsRef
-       for _, shard := range shards {
-               select {
-               case <-ctx.Done():
-                       return bus.NewMessage(bus.MessageID(time.Now().UnixNano()), nil)
-               default:
+       var snapshotResult *databasev1.Snapshot
+       s.s.db.groups.Range(func(_, value any) bool {
+               gs := value.(*groupShards)
+               sLst := gs.shards.Load()
+               if sLst == nil {
+                       return true
                }
-               snpDir := path.Join(s.s.snapshotDir, sn, storage.DataDir, filepath.Base(shard.location))
-               lfs.MkdirPanicIfExist(snpDir, storage.DirPerm)
-               err := shard.store.TakeFileSnapshot(snpDir)
-               if err != nil {
-                       s.s.l.Error().Err(err).Str("shard", filepath.Base(shard.location)).Msg("fail to take shard snapshot")
-                       return bus.NewMessage(bus.MessageID(time.Now().UnixNano()), &databasev1.Snapshot{
-                               Name:    sn,
-                               Catalog: commonv1.Catalog_CATALOG_PROPERTY,
-                               Error:   err.Error(),
-                       })
+               for _, shardRef := range *sLst {
+                       select {
+                       case <-ctx.Done():
+                               // Context canceled: record an error snapshot and stop iteration.
+                               if err := ctx.Err(); err != nil {
+                                       snapshotResult = &databasev1.Snapshot{
+                                               Name:    sn,
+                                               Catalog: commonv1.Catalog_CATALOG_PROPERTY,
+                                               Error:   err.Error(),
+                                       }
+                               }
+                               return false
+                       default:
+                       }
+                       snpDir := path.Join(s.s.snapshotDir, sn, storage.DataDir, shardRef.group, filepath.Base(shardRef.location))
+                       lfs.MkdirPanicIfExist(snpDir, storage.DirPerm)
+                       snapshotErr := shardRef.store.TakeFileSnapshot(snpDir)
+                       if snapshotErr != nil {
+                               s.s.l.Error().Err(snapshotErr).Str("group", shardRef.group).
+                                       Str("shard", filepath.Base(shardRef.location)).Msg("fail to take shard snapshot")
+                               snapshotResult = &databasev1.Snapshot{
+                                       Name:    sn,
+                                       Catalog: commonv1.Catalog_CATALOG_PROPERTY,
+                                       Error:   snapshotErr.Error(),
+                               }
+                               return false
+                       }
                }
+               return true
+       })
+       if snapshotResult != nil {
+               return bus.NewMessage(bus.MessageID(time.Now().UnixNano()), snapshotResult)
        }
-
        return bus.NewMessage(bus.MessageID(time.Now().UnixNano()), &databasev1.Snapshot{
                Name:    sn,
                Catalog: commonv1.Catalog_CATALOG_PROPERTY,
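The snapshot loop above leans on sync.Map.Range's contract that returning false stops the walk, which is how the first failure short-circuits iteration and is reported afterwards. A standalone sketch of that early-exit shape; the group data and take function are made up:

package main

import (
	"errors"
	"fmt"
	"sync"
)

func take(group, shard string) error {
	if group == "g2" && shard == "shard-1" {
		return errors.New("disk full")
	}
	return nil
}

func main() {
	var groups sync.Map
	groups.Store("g1", []string{"shard-0"})
	groups.Store("g2", []string{"shard-0", "shard-1"})

	var firstErr error
	groups.Range(func(key, value any) bool {
		for _, shard := range value.([]string) {
			if err := take(key.(string), shard); err != nil {
				firstErr = err // remember the failure for the caller
				return false   // returning false stops Range early
			}
		}
		return true
	})
	fmt.Println(firstErr)
}
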
diff --git a/banyand/property/repair.go b/banyand/property/repair.go
index 381ab1f4..b5d141b3 100644
--- a/banyand/property/repair.go
+++ b/banyand/property/repair.go
@@ -73,7 +73,7 @@ type repair struct {
        snapshotDir               string
        statePath                 string
        composeSlotAppendFilePath string
-       composeTreeFilePathFmt    string
+       composeTreeFilePath       string
        treeSlotCount             int
        batchSearchSize           int
 }
@@ -94,7 +94,7 @@ func newRepair(
                snapshotDir:               path.Join(repairPath, "snapshot"),
                statePath:                 path.Join(repairPath, "state.json"),
                composeSlotAppendFilePath: path.Join(repairPath, "state-append-%d.tmp"),
-               composeTreeFilePathFmt:    path.Join(repairPath, "state-tree-%s.data"),
+               composeTreeFilePath:       path.Join(repairPath, "state-tree.data"),
                batchSearchSize:           batchSearchSize,
                metrics:                   newRepairMetrics(metricsFactory),
                scheduler:                 scheduler,
@@ -124,8 +124,8 @@ func (r *repair) checkHasUpdates() (bool, error) {
        return true, nil
 }
 
-func (r *repair) buildStatus(ctx context.Context, snapshotPath string, group string) (err error) {
-       r.l.Debug().Msgf("starting building status from snapshot path %s, group: %s", snapshotPath, group)
+func (r *repair) buildStatus(ctx context.Context, snapshotPath string) (err error) {
+       r.l.Debug().Msgf("starting building status from snapshot path %s", snapshotPath)
        startTime := time.Now()
        defer func() {
                r.metrics.totalBuildTreeFinished.Inc(1)
@@ -142,15 +142,10 @@ func (r *repair) buildStatus(ctx context.Context, snapshotPath string, group str
        sort.Sort(snapshotIDList(items))
 
        blugeConf := bluge.DefaultConfig(snapshotPath)
-       err = r.buildTree(ctx, blugeConf, group)
+       err = r.buildTree(ctx, blugeConf)
        if err != nil {
                return fmt.Errorf("building trees failure: %w", err)
        }
-       // if only update a specific group, the repair base status file doesn't need to update
-       // because not all the group have been processed
-       if group != "" {
-               return nil
-       }
 
        var latestSnapshotID uint64
        if len(items) > 0 {
@@ -175,7 +170,7 @@ func (r *repair) buildStatus(ctx context.Context, snapshotPath string, group str
        return nil
 }
 
-func (r *repair) buildTree(ctx context.Context, conf bluge.Config, group string) error {
+func (r *repair) buildTree(ctx context.Context, conf bluge.Config) error {
        reader, err := bluge.OpenReader(conf)
        if err != nil {
                // means no data found
@@ -188,9 +183,6 @@ func (r *repair) buildTree(ctx context.Context, conf bluge.Config, group string)
                _ = reader.Close()
        }()
        query := bluge.Query(bluge.NewMatchAllQuery())
-       if group != "" {
-               query = bluge.NewTermQuery(group).SetField(groupField)
-       }
        topNSearch := bluge.NewTopNSearch(r.batchSearchSize, query)
        topNSearch.SortBy([]string{
                fmt.Sprintf("+%s", groupField),
@@ -200,32 +192,19 @@ func (r *repair) buildTree(ctx context.Context, conf bluge.Config, group string)
        })
 
        var latestProperty *searchingProperty
-       treeComposer := newRepairTreeComposer(r.composeSlotAppendFilePath, r.composeTreeFilePathFmt, r.treeSlotCount, r.l)
+       treeComposer := newRepairTreeComposer(r.composeSlotAppendFilePath, r.composeTreeFilePath, r.treeSlotCount, r.l)
        err = r.pageSearch(ctx, reader, topNSearch, func(sortValue [][]byte, shaValue string) error {
                if len(sortValue) != 4 {
                        return fmt.Errorf("unexpected sort value length: %d", len(sortValue))
                }
-               group := convert.BytesToString(sortValue[0])
+               groupName := convert.BytesToString(sortValue[0])
                name := convert.BytesToString(sortValue[1])
-               entity := r.buildLeafNodeEntity(group, name, convert.BytesToString(sortValue[2]))
-
-               s := newSearchingProperty(group, shaValue, entity)
-               if latestProperty != nil {
-                       if latestProperty.group != group {
-                               // if the group have changed, we need to append the latest property to the tree composer, and compose builder
-                               // the entity is changed, need to save the property
-                               if err = treeComposer.append(latestProperty.entityID, latestProperty.shaValue); err != nil {
-                                       return fmt.Errorf("appending property to tree composer failure: %w", err)
-                               }
-                               err = treeComposer.composeNextGroupAndSave(latestProperty.group)
-                               if err != nil {
-                                       return fmt.Errorf("composing group failure: %w", err)
-                               }
-                       } else if latestProperty.entityID != entity {
-                               // the entity is changed, need to save the property
-                               if err = treeComposer.append(latestProperty.entityID, latestProperty.shaValue); err != nil {
-                                       return fmt.Errorf("appending property to tree composer failure: %w", err)
-                               }
+               entity := r.buildLeafNodeEntity(groupName, name, convert.BytesToString(sortValue[2]))
+
+               s := newSearchingProperty(groupName, shaValue, entity)
+               if latestProperty != nil && latestProperty.entityID != entity {
+                       if appendErr := treeComposer.append(latestProperty.entityID, latestProperty.shaValue); appendErr != nil {
+                               return fmt.Errorf("appending property to tree composer failure: %w", appendErr)
                        }
                }
                latestProperty = s
@@ -239,9 +218,8 @@ func (r *repair) buildTree(ctx context.Context, conf bluge.Config, group string)
                if err = treeComposer.append(latestProperty.entityID, latestProperty.shaValue); err != nil {
                        return fmt.Errorf("appending latest property to tree composer failure: %w", err)
                }
-               err = treeComposer.composeNextGroupAndSave(latestProperty.group)
-               if err != nil {
-                       return fmt.Errorf("composing last group failure: %w", err)
+               if err = treeComposer.composeAndSave(); err != nil {
+                       return fmt.Errorf("composing tree failure: %w", err)
                }
        }
 
@@ -371,17 +349,15 @@ type repairTreeFileReader struct {
        paging *repairTreeReaderPage
 }
 
-func (r *repair) treeReader(group string) (repairTreeReader, error) {
+func (r *repair) treeReader() (repairTreeReader, error) {
        r.scheduler.treeLocker.RLock()
        defer r.scheduler.treeLocker.RUnlock()
-       groupFile := fmt.Sprintf(r.composeTreeFilePathFmt, group)
-       file, err := os.OpenFile(groupFile, os.O_RDONLY, os.ModePerm)
+       file, err := os.OpenFile(r.composeTreeFilePath, os.O_RDONLY, os.ModePerm)
        if err != nil {
                if errors.Is(err, os.ErrNotExist) {
-                       // if the file does not exist, it means no repair tree for this group
                        return nil, nil
                }
-               return nil, fmt.Errorf("opening repair tree file %s failure: %w", group, err)
+               return nil, fmt.Errorf("opening repair tree file failure: %w", err)
        }
        reader := &repairTreeFileReader{
                file:   file,
@@ -389,7 +365,7 @@ func (r *repair) treeReader(group string) (repairTreeReader, error) {
        }
        if err = reader.readFoot(); err != nil {
                _ = file.Close()
-               return nil, fmt.Errorf("reading footer from repair tree file %s failure: %w", groupFile, err)
+               return nil, fmt.Errorf("reading footer from repair tree file failure: %w", err)
        }
        return reader, nil
 }
@@ -679,15 +655,15 @@ func (s snapshotIDList) Swap(i, j int)      { s[i], s[j] = s[j], s[i] }
 type repairTreeComposer struct {
        l             *logger.Logger
        appendFileFmt string
-       treeFileFmt   string
+       treeFilePath  string
        slotFiles     []*repairSlotFile
        slotCount     int
 }
 
-func newRepairTreeComposer(appendSlotFilePathFmt, treeFilePathFmt string, slotCount int, l *logger.Logger) *repairTreeComposer {
+func newRepairTreeComposer(appendSlotFilePathFmt, treeFilePath string, slotCount int, l *logger.Logger) *repairTreeComposer {
        return &repairTreeComposer{
                appendFileFmt: appendSlotFilePathFmt,
-               treeFileFmt:   treeFilePathFmt,
+               treeFilePath:  treeFilePath,
                slotCount:     slotCount,
                slotFiles:     make([]*repairSlotFile, slotCount),
                l:             l,
@@ -710,31 +686,30 @@ func (r *repairTreeComposer) append(id, shaVal string) (err error) {
        return file.append(idBytes, shaValBytes)
 }
 
-// composeNextGroupAndSave composes the current group of slot files into a repair tree file.
+// composeAndSave composes the slot files into a repair tree file.
 // tree file format: [leaf nodes]+[slot nodes]+[root node]+[metadata]
 // leaf nodes: each node contains: [entity]+[sha value]
 // slot nodes: each node contains: [slot index]+[sha value]+[leaf nodes start offset]+[leaf nodes count]
 // root node: contains [sha value]
 // metadata: contains footer([slot nodes start offset]+[slot nodes count]+[root node start offset]+[root node length])+[footer length(data binary)]
-func (r *repairTreeComposer) composeNextGroupAndSave(group string) (err error) {
-       treeFilePath := fmt.Sprintf(r.treeFileFmt, group)
-       treeBuilder, err := newRepairTreeBuilder(treeFilePath)
+func (r *repairTreeComposer) composeAndSave() (err error) {
+       treeBuilder, err := newRepairTreeBuilder(r.treeFilePath)
        if err != nil {
-               return fmt.Errorf("creating repair tree builder for group %s failure: %w", group, err)
+               return fmt.Errorf("creating repair tree builder failure: %w", err)
        }
        defer func() {
                if closeErr := treeBuilder.close(); closeErr != nil {
-                       err = multierr.Append(err, fmt.Errorf("closing repair tree builder for group %s failure: %w", group, closeErr))
+                       err = multierr.Append(err, fmt.Errorf("closing repair tree builder failure: %w", closeErr))
                }
        }()
-       for i, f := range r.slotFiles {
+       for idx, f := range r.slotFiles {
                if f == nil {
                        continue
                }
                if err = treeBuilder.appendSlot(f); err != nil {
-                       return fmt.Errorf("appending slot file %s to repair tree builder for group %s failure: %w", f.path, group, err)
+                       return fmt.Errorf("appending slot file %s to repair tree builder failure: %w", f.path, err)
                }
-               r.slotFiles[i] = nil
+               r.slotFiles[idx] = nil
        }
        return treeBuilder.build()
 }
@@ -1132,19 +1107,27 @@ func (r *repairScheduler) doBuildTree() (err error) {
                        r.l.Err(saveStatusErr).Msgf("saving repair build tree status failure")
                }
        }()
-       sLst := r.db.sLst.Load()
-       if sLst == nil {
-               return nil
-       }
        hasUpdates := false
-       for _, s := range *sLst {
-               hasUpdates, err = s.repairState.checkHasUpdates()
-               if err != nil {
-                       return err
+       var checkErr error
+       r.db.groups.Range(func(_, value any) bool {
+               gs := value.(*groupShards)
+               sLst := gs.shards.Load()
+               if sLst == nil {
+                       return true
                }
-               if hasUpdates {
-                       break
+               for _, s := range *sLst {
+                       hasUpdates, checkErr = s.repairState.checkHasUpdates()
+                       if checkErr != nil {
+                               return false
+                       }
+                       if hasUpdates {
+                               return false
+                       }
                }
+               return true
+       })
+       if checkErr != nil {
+               return checkErr
        }
        // if no updates, skip the repair
        if !hasUpdates {
@@ -1166,41 +1149,53 @@ func (r *repairScheduler) buildingTree(shards []common.ShardID, group string, fo
        }
        defer r.treeLocker.Unlock()
 
-       buildAll := len(shards) == 0
-
-       // take a new snapshot first
        snapshotPath, err := r.buildSnapshotFunc(r.closer.Ctx())
        if err != nil {
                return fmt.Errorf("taking snapshot failure: %w", err)
        }
-       return walkDir(snapshotPath, "shard-", func(suffix string) error {
-               id, err := strconv.Atoi(suffix)
-               if err != nil {
-                       return err
+
+       for _, groupDir := range lfs.ReadDir(snapshotPath) {
+               if !groupDir.IsDir() {
+                       continue
                }
-               if !buildAll {
+               groupName := groupDir.Name()
+               if group != "" && groupName != group {
+                       continue
+               }
+               groupPath := path.Join(snapshotPath, groupName)
+               walkErr := walkDir(groupPath, "shard-", func(suffix string) error {
+                       id, parseErr := strconv.Atoi(suffix)
+                       if parseErr != nil {
+                               return parseErr
+                       }
                        // if not building all shards, check if the shard is in the list
-                       found := false
-                       for _, s := range shards {
-                               if s == common.ShardID(id) {
-                                       found = true
-                                       break
+                       if len(shards) > 0 {
+                               found := false
+                               for _, s := range shards {
+                                       if s == common.ShardID(id) {
+                                               found = true
+                                               break
+                                       }
+                               }
+                               if !found {
+                                       return nil // skip this shard
                                }
                        }
-                       if !found {
-                               return nil // skip this shard
+                       s, loadErr := r.db.loadShard(r.closer.Ctx(), groupName, common.ShardID(id))
+                       if loadErr != nil {
+                               return fmt.Errorf("loading shard %d failure: %w", id, loadErr)
                        }
+                       buildErr := s.repairState.buildStatus(r.closer.Ctx(), path.Join(groupPath, fmt.Sprintf("shard-%s", suffix)))
+                       if buildErr != nil {
+                               return fmt.Errorf("building status for shard %d failure: %w", id, buildErr)
+                       }
+                       return nil
+               })
+               if walkErr != nil {
+                       return walkErr
                }
-               s, err := r.db.loadShard(r.closer.Ctx(), common.ShardID(id))
-               if err != nil {
-                       return fmt.Errorf("loading shard %d failure: %w", id, err)
-               }
-               err = s.repairState.buildStatus(r.closer.Ctx(), path.Join(snapshotPath, fmt.Sprintf("shard-%s", suffix)), group)
-               if err != nil {
-                       return fmt.Errorf("building status for shard %d failure: %w", id, err)
-               }
-               return nil
-       })
+       }
+       return nil
 }
 
 func (r *repairScheduler) documentUpdatesNotify() {
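The composeAndSave comment above describes a footer-last file: leaf, slot, and root regions come first, and the trailing metadata lets a reader seek from the end to locate them. A simplified illustration of that idea only; the field names, fixed sizes, and little-endian encoding here are assumptions, not BanyanDB's actual on-disk format:

package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

// footer loosely mirrors the metadata in the tree-file comment:
// offsets and counts for the slot-node and root-node regions.
type footer struct {
	SlotNodesOffset uint64
	SlotNodesCount  uint64
	RootNodeOffset  uint64
	RootNodeLength  uint64
}

func main() {
	buf := &bytes.Buffer{}
	buf.WriteString("leaf+slot+root bytes...") // placeholder payload
	f := footer{SlotNodesCount: 32, RootNodeOffset: 100, RootNodeLength: 20}
	// The footer goes last so readers can find it at a fixed distance from EOF.
	binary.Write(buf, binary.LittleEndian, f)

	data := buf.Bytes()
	var got footer
	binary.Read(bytes.NewReader(data[len(data)-32:]), binary.LittleEndian, &got)
	fmt.Printf("%+v\n", got)
}
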
diff --git a/banyand/property/repair_gossip.go b/banyand/property/repair_gossip.go
index e48d0f56..1f4af6a4 100644
--- a/banyand/property/repair_gossip.go
+++ b/banyand/property/repair_gossip.go
@@ -43,25 +43,25 @@ type repairGossipBase struct {
 }
 
 func (b *repairGossipBase) getTreeReader(ctx context.Context, group string, shardID uint32) (repairTreeReader, bool, error) {
-       s, err := b.scheduler.db.loadShard(ctx, common.ShardID(shardID))
+       s, err := b.scheduler.db.loadShard(ctx, group, common.ShardID(shardID))
        if err != nil {
                return nil, false, fmt.Errorf("failed to load shard %d: %w", shardID, err)
        }
-       tree, err := s.repairState.treeReader(group)
+       tree, err := s.repairState.treeReader()
        if err != nil {
                return nil, false, fmt.Errorf("failed to get tree reader for group %s: %w", group, err)
        }
        if tree == nil {
                // if the tree is nil, but the state file exist, means the tree(group) is empty
-               stateExist, err := s.repairState.stateFileExist()
-               if err != nil {
-                       return nil, false, fmt.Errorf("failed to check state file existence for group %s: %w", group, err)
+               stateExist, stateErr := s.repairState.stateFileExist()
+               if stateErr != nil {
+                       return nil, false, fmt.Errorf("failed to check state file existence for group %s: %w", group, stateErr)
                }
                if !stateExist {
                        // check has scheduled or not
-                       stateExist, err = b.scheduler.checkHasBuildTree()
-                       if err != nil {
-                               return nil, false, fmt.Errorf("failed to check if the tree state file exists: %w", err)
+                       stateExist, stateErr = b.scheduler.checkHasBuildTree()
+                       if stateErr != nil {
+                               return nil, false, fmt.Errorf("failed to check if the tree state file exists: %w", stateErr)
                        }
                }
                // if the tree is nil, it means the tree is no data
@@ -256,9 +256,9 @@ func (r *repairGossipClient) Rev(ctx context.Context, tracer gossip.Trace, nextN
                return nil
        }
 
-       syncShard, err := r.scheduler.db.loadShard(ctx, common.ShardID(request.ShardId))
-       if err != nil {
-               return errors.Wrapf(gossip.ErrAbortPropagation, "shard %d load failure on client side: %v", request.ShardId, err)
+       syncShard, loadShardErr := r.scheduler.db.loadShard(ctx, request.Group, common.ShardID(request.ShardId))
+       if loadShardErr != nil {
+               return errors.Wrapf(gossip.ErrAbortPropagation, "shard %d load failure on client side: %v", request.ShardId, loadShardErr)
        }
        firstTreeSummaryResp := true
 
@@ -924,7 +924,7 @@ func (r *repairGossipServer) recvMsgAndWaitReadNextDiffer(
                        }
                        return fmt.Errorf("failed to receive missing or sync request: %w", err)
                }
-               syncShard, err := r.scheduler.db.loadShard(s.Context(), common.ShardID(shardID))
+               syncShard, err := r.scheduler.db.loadShard(s.Context(), group, common.ShardID(shardID))
                if err != nil {
                        return fmt.Errorf("shard %d load failure on server side: %w", shardID, err)
                }
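On the client side, errors.Wrapf(gossip.ErrAbortPropagation, ...) keeps the sentinel as the wrapped cause, so callers can still detect it after annotation. A stdlib approximation of the same idea using %w; errAbortPropagation and loadShard are stand-ins for the real gossip.ErrAbortPropagation and shard loading:

package main

import (
	"errors"
	"fmt"
)

var errAbortPropagation = errors.New("abort propagation")

func loadShard(id uint32) error {
	// Annotate while preserving the sentinel as the cause.
	return fmt.Errorf("shard %d load failure on client side: %w", id, errAbortPropagation)
}

func main() {
	err := loadShard(2)
	fmt.Println(errors.Is(err, errAbortPropagation)) // true
}
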
diff --git a/banyand/property/repair_gossip_test.go b/banyand/property/repair_gossip_test.go
index d9923a50..68b4db09 100644
--- a/banyand/property/repair_gossip_test.go
+++ b/banyand/property/repair_gossip_test.go
@@ -406,15 +406,22 @@ func startEachNode(ctrl *gomock.Controller, node node, groups []group) *nodeCont
                                return "", newSpaceErr
                        }
                        result.appendStop(defFunc)
-                       sList := db.sLst.Load()
                        var snpError error
-                       for _, s := range *sList {
-                               snpDir := path.Join(snapshotDir, filepath.Base(s.location))
-                               lfs.MkdirPanicIfExist(snpDir, storage.DirPerm)
-                               if e := s.store.TakeFileSnapshot(snpDir); e != nil {
-                                       snpError = multierr.Append(snpError, e)
+                       db.groups.Range(func(_, value any) bool {
+                               gs := value.(*groupShards)
+                               sLst := gs.shards.Load()
+                               if sLst == nil {
+                                       return true
                                }
-                       }
+                               for _, s := range *sLst {
+                                       snpDir := path.Join(snapshotDir, s.group, filepath.Base(s.location))
+                                       lfs.MkdirPanicIfExist(snpDir, storage.DirPerm)
+                                       if e := s.store.TakeFileSnapshot(snpDir); e != nil {
+                                               snpError = multierr.Append(snpError, e)
+                                       }
+                               }
+                               return true
+                       })
                        return snapshotDir, snpError
                })
        gomega.Expect(err).NotTo(gomega.HaveOccurred())
@@ -441,11 +448,13 @@ func startEachNode(ctrl *gomock.Controller, node node, groups []group) *nodeCont
 
        result.appendStop(messenger.GracefulStop)
 
-       // initialize shard in to db
-       for i := int32(0); i < node.shardCount; i++ {
-               shardID := common.ShardID(i)
-               _, err = db.loadShard(context.Background(), shardID)
-               gomega.Expect(err).NotTo(gomega.HaveOccurred())
+       // initialize shard in to db for each group
+       for _, g := range groups {
+               for i := int32(0); i < node.shardCount; i++ {
+                       shardID := common.ShardID(i)
+                       _, err = db.loadShard(context.Background(), g.name, shardID)
+                       gomega.Expect(err).NotTo(gomega.HaveOccurred())
+               }
        }
 
        // adding data to the node
@@ -477,7 +486,7 @@ func (w *repairGossipClientWrapper) Rev(ctx context.Context, t gossip.Trace, nex
 }
 
 func applyPropertyUpdate(db *database, p property) {
-       s, err := db.loadShard(context.Background(), common.ShardID(p.shard))
+       s, err := db.loadShard(context.Background(), p.group, common.ShardID(p.shard))
        gomega.Expect(err).NotTo(gomega.HaveOccurred())
        update := &propertyv1.Property{
                Metadata: &commonv1.Metadata{
@@ -496,7 +505,7 @@ func applyPropertyUpdate(db *database, p property) {
 }
 
 func queryPropertyWithVerify(db *database, p property) {
-       s, err := db.loadShard(context.Background(), common.ShardID(p.shard))
+       s, err := db.loadShard(context.Background(), p.group, common.ShardID(p.shard))
        gomega.Expect(err).NotTo(gomega.HaveOccurred())
 
        query, err := inverted.BuildPropertyQuery(&propertyv1.QueryRequest{
diff --git a/banyand/property/repair_test.go b/banyand/property/repair_test.go
index 3926d31d..552f0af3 100644
--- a/banyand/property/repair_test.go
+++ b/banyand/property/repair_test.go
@@ -109,22 +109,6 @@ func TestBuildTree(t *testing.T) {
                                verifyContainsProperty(t, s, data, defaultGroupName, propertyBuilder{id: "test3"})
                        },
                },
-               {
-                       name: "build with multiple groups",
-                       existingDoc: func(s *shard) ([]index.Document, error) {
-                               return buildPropertyDocuments(s,
-                                       propertyBuilder{group: "group1", id: "test1"},
-                                       propertyBuilder{group: "group2", id: "test2"},
-                                       propertyBuilder{group: "group3", id: "test3"},
-                               )
-                       },
-                       statusVerify: func(t *testing.T, s *shard, data *repairData) {
-                               basicStatusVerify(t, data, "group1", 1, "group2", 1, "group3", 1)
-                               verifyContainsProperty(t, s, data, "group1", propertyBuilder{group: "group1", id: "test1"})
-                               verifyContainsProperty(t, s, data, "group2", propertyBuilder{group: "group2", id: "test2"})
-                               verifyContainsProperty(t, s, data, "group3", propertyBuilder{group: "group3", id: "test3"})
-                       },
-               },
                {
                        name: "build multiple times",
                        existingDoc: func(s *shard) ([]index.Document, error) {
@@ -211,7 +195,7 @@ func TestBuildTree(t *testing.T) {
                                _ = db.close()
                        })
 
-                       newShard, err := db.loadShard(context.Background(), 0)
+                       newShard, err := db.loadShard(context.Background(), defaultGroupName, 0)
                        if err != nil {
                                t.Fatal(err)
                        }
@@ -301,7 +285,7 @@ func TestDocumentUpdatesNotify(t *testing.T) {
                _ = db.close()
        })
 
-       newShard, err := db.loadShard(context.Background(), 0)
+       newShard, err := db.loadShard(context.Background(), defaultGroupName, 0)
        if err != nil {
                t.Fatal(err)
        }
@@ -314,7 +298,7 @@ func TestDocumentUpdatesNotify(t *testing.T) {
 
        // wait for the repair tree to be built
        gomega.Eventually(func() bool {
-               tree, _ := newShard.repairState.treeReader(defaultGroupName)
+               tree, _ := newShard.repairState.treeReader()
                if tree != nil {
                        _ = tree.close()
                }
@@ -501,16 +485,17 @@ func newRepairData(repair *repair, status *repairStatus) *repairData {
        }
 }
 
-func (r *repairData) readTree(t *testing.T, group string) *repairTestTree {
-       if tree, exist := r.cache[group]; exist {
+func (r *repairData) readTree(t *testing.T, _ string) *repairTestTree {
+       const cacheKey = "tree"
+       if tree, exist := r.cache[cacheKey]; exist {
                return tree
        }
        if r.readCacheOnly {
-               t.Fatalf("readTree called for group %s, but cache only mode is enabled", group)
+               t.Fatalf("readTree called, but cache only mode is enabled")
        }
-       reader, err := r.repair.treeReader(group)
+       reader, err := r.repair.treeReader()
        if err != nil {
-               t.Fatalf("failed to get tree reader for group %s: %v", group, err)
+               t.Fatalf("failed to get tree reader: %v", err)
        }
        if reader == nil {
                return nil
@@ -521,10 +506,10 @@ func (r *repairData) readTree(t *testing.T, group string) *repairTestTree {
 
        roots, err := reader.read(nil, 10, false)
        if err != nil {
-               t.Fatalf("failed to read tree for group %s: %v", group, err)
+               t.Fatalf("failed to read tree: %v", err)
        }
        if len(roots) == 0 {
-               t.Fatalf("expected at least one root for group %s, but got none", group)
+               t.Fatalf("expected at least one root, but got none")
        }
        tree := &repairTestTree{
                root: &repairTestTreeNode{
@@ -534,10 +519,10 @@ func (r *repairData) readTree(t *testing.T, group string) *repairTestTree {
        }
        slots, err := reader.read(roots[0], 10, false)
        if err != nil {
-               t.Fatalf("failed to read slots for group %s: %v", group, err)
+               t.Fatalf("failed to read slots: %v", err)
        }
        if len(slots) == 0 {
-               t.Fatalf("expected at least one slot for group %s, but got none", group)
+               t.Fatalf("expected at least one slot, but got none")
        }
        for _, slot := range slots {
                slotNode := &repairTestTreeNode{
@@ -545,12 +530,12 @@ func (r *repairData) readTree(t *testing.T, group string) *repairTestTree {
                        shaValue: slot.shaValue,
                }
                tree.root.children = append(tree.root.children, slotNode)
-               children, err := reader.read(slot, 10, false)
-               if err != nil {
-                       t.Fatalf("failed to read children for slot %d in group %s: %v", slot.slotInx, group, err)
+               children, readErr := reader.read(slot, 10, false)
+               if readErr != nil {
+                       t.Fatalf("failed to read children for slot %d: %v", slot.slotInx, readErr)
                }
                if len(children) == 0 {
-                       t.Fatalf("expected at least one child for slot %d in group %s, but got none", slot.slotInx, group)
+                       t.Fatalf("expected at least one child for slot %d, but got none", slot.slotInx)
                }
                for _, child := range children {
                        childNode := &repairTestTreeNode{
@@ -561,7 +546,7 @@ func (r *repairData) readTree(t *testing.T, group string) *repairTestTree {
                }
        }
 
-       r.cache[group] = tree
+       r.cache[cacheKey] = tree
        return tree
 }
 
diff --git a/banyand/property/shard.go b/banyand/property/shard.go
index 3568ffd8..93202940 100644
--- a/banyand/property/shard.go
+++ b/banyand/property/shard.go
@@ -70,12 +70,12 @@ var (
 )
 
 type shard struct {
-       store       index.SeriesStore
-       l           *logger.Logger
-       repairState *repair
-       location    string
-       id          common.ShardID
-
+       store             index.SeriesStore
+       l                 *logger.Logger
+       repairState       *repair
+       location          string
+       group             string
+       id                common.ShardID
        expireToDeleteSec int64
 }
 
@@ -88,21 +88,23 @@ func (s *shard) close() error {
 
 func (db *database) newShard(
        ctx context.Context,
+       group string,
        id common.ShardID,
        _ int64,
        deleteExpireSec int64,
        repairBaseDir string,
        repairTreeSlotCount int,
 ) (*shard, error) {
-       location := path.Join(db.location, fmt.Sprintf(shardTemplate, int(id)))
+       location := path.Join(db.location, group, fmt.Sprintf(shardTemplate, int(id)))
        sName := "shard" + strconv.Itoa(int(id))
        si := &shard{
                id:                id,
+               group:             group,
                l:                 logger.Fetch(ctx, sName),
                location:          location,
                expireToDeleteSec: deleteExpireSec,
        }
-       metricsFactory := db.omr.With(propertyScope.ConstLabels(meter.LabelPairs{"shard": sName}))
+       metricsFactory := db.omr.With(propertyScope.ConstLabels(meter.LabelPairs{"group": group, "shard": sName}))
        opts := inverted.StoreOpts{
                Path:                 location,
                Logger:               si.l,
@@ -114,7 +116,7 @@ func (db *database) newShard(
        if si.store, err = inverted.NewStore(opts); err != nil {
                return nil, err
        }
-       repairBaseDir = path.Join(repairBaseDir, sName)
+       repairBaseDir = path.Join(repairBaseDir, group, sName)
        si.repairState = newRepair(location, repairBaseDir, logger.Fetch(ctx, fmt.Sprintf("repair%d", id)),
                metricsFactory, repairBatchSearchSize, repairTreeSlotCount, db.repairScheduler)
        return si, nil
diff --git a/banyand/property/shard_test.go b/banyand/property/shard_test.go
index 1ec4f7ea..9aefb9ed 100644
--- a/banyand/property/shard_test.go
+++ b/banyand/property/shard_test.go
@@ -110,7 +110,7 @@ func TestMergeDeleted(t *testing.T) {
                                _ = db.close()
                        })
 
-                       newShard, err := db.loadShard(context.Background(), 0)
+                       newShard, err := db.loadShard(context.Background(), testPropertyGroup, 0)
                        if err != nil {
                                t.Fatal(err)
                        }
@@ -319,7 +319,7 @@ func TestRepair(t *testing.T) {
                                _ = db.close()
                        })
 
-                       newShard, err := db.loadShard(context.Background(), 0)
+                       newShard, err := db.loadShard(context.Background(), testPropertyGroup, 0)
                        if err != nil {
                                t.Fatal(err)
                        }
diff --git a/banyand/property/test_helper.go b/banyand/property/test_helper.go
index 677ab925..62ba0af1 100644
--- a/banyand/property/test_helper.go
+++ b/banyand/property/test_helper.go
@@ -51,7 +51,7 @@ func CreateTestShardForDump(tmpPath string, fileSystem fs.FileSystem) (string, f
        }
 
        // Load shard 0
-       shard, err := db.loadShard(context.Background(), 0)
+       shard, err := db.loadShard(context.Background(), "test-group", 0)
        if err != nil {
                db.close()
                panic(err)
diff --git a/bydbctl/internal/cmd/property_test.go b/bydbctl/internal/cmd/property_test.go
index 1459f1c0..ee383e22 100644
--- a/bydbctl/internal/cmd/property_test.go
+++ b/bydbctl/internal/cmd/property_test.go
@@ -1028,26 +1028,36 @@ func deleteData(rootCmd *cobra.Command, addr, group, name, id string, success bo
 }
 
 func generateInvertedStore(rootPath string) (index.SeriesStore, error) {
-       shardParent := path.Join(rootPath, "property", "data")
-       list, err := os.ReadDir(shardParent)
+       dataParent := path.Join(rootPath, "property", "data")
+       groupList, err := os.ReadDir(dataParent)
        if err != nil {
-               return nil, fmt.Errorf("read dir %s error: %w", shardParent, err)
+               return nil, fmt.Errorf("read dir %s error: %w", dataParent, err)
        }
-       if len(list) == 0 {
-               return nil, fmt.Errorf("no shard found in %s", shardParent)
+       if len(groupList) == 0 {
+               return nil, fmt.Errorf("no group found in %s", dataParent)
        }
-       for _, e := range list {
-               if !e.Type().IsDir() {
+       for _, groupEntry := range groupList {
+               if !groupEntry.Type().IsDir() {
                        continue
                }
-               _, found := strings.CutPrefix(e.Name(), "shard-")
+               groupPath := path.Join(dataParent, groupEntry.Name())
+               shardList, readErr := os.ReadDir(groupPath)
+               if readErr != nil {
                        continue
                }
-               return inverted.NewStore(
-                       inverted.StoreOpts{
-                               Path: path.Join(shardParent, e.Name()),
-                       })
+               for _, shardEntry := range shardList {
+                       if !shardEntry.Type().IsDir() {
+                               continue
+                       }
+                       _, found := strings.CutPrefix(shardEntry.Name(), "shard-")
+                       if !found {
+                               continue
+                       }
+                       return inverted.NewStore(
+                               inverted.StoreOpts{
+                                       Path: path.Join(groupPath, shardEntry.Name()),
+                               })
+               }
        }
        return nil, fmt.Errorf("no shard found in %s", rootPath)
 }
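generateInvertedStore now descends one extra directory level. A standalone sketch of scanning the new <group>/shard-N layout; the helper name and the temporary directories are illustrative:

package main

import (
	"fmt"
	"os"
	"path/filepath"
	"strings"
)

// findFirstShard returns the first "<dataDir>/<group>/shard-*" directory.
func findFirstShard(dataDir string) (string, bool) {
	groups, err := os.ReadDir(dataDir)
	if err != nil {
		return "", false
	}
	for _, g := range groups {
		if !g.IsDir() {
			continue
		}
		shards, err := os.ReadDir(filepath.Join(dataDir, g.Name()))
		if err != nil {
			continue
		}
		for _, s := range shards {
			if s.IsDir() && strings.HasPrefix(s.Name(), "shard-") {
				return filepath.Join(dataDir, g.Name(), s.Name()), true
			}
		}
	}
	return "", false
}

func main() {
	dir, _ := os.MkdirTemp("", "demo")
	defer os.RemoveAll(dir)
	_ = os.MkdirAll(filepath.Join(dir, "sample", "shard-0"), 0o755)
	fmt.Println(findFirstShard(dir))
}
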
@@ -1084,7 +1094,7 @@ func registerNodeToMessenger(m gossip.Messenger, nodeID, gossipRepairAddr string
 }
 
 func getRepairTreeFilePath(nodeDir, group string) string {
-       return path.Join(nodeDir, "property", "repairs", "shard0", fmt.Sprintf("state-tree-%s.data", group))
+       return path.Join(nodeDir, "property", "repairs", group, "shard0", "state-tree.data")
 }
 
 func getRepairTreeModTime(nodeDir, group string) (time.Time, error) {
diff --git a/docs/api-reference.md b/docs/api-reference.md
index 814da7f7..b42eb5ab 100644
--- a/docs/api-reference.md
+++ b/docs/api-reference.md
@@ -1111,6 +1111,8 @@ TopNList contains a series of topN items
 | ----- | ---- | ----- | ----------- |
 | entity | [banyandb.model.v1.Tag](#banyandb-model-v1-Tag) | repeated |  |
 | value | [banyandb.model.v1.FieldValue](#banyandb-model-v1-FieldValue) |  |  |
+| version | [int64](#int64) |  |  |
+| timestamp | [google.protobuf.Timestamp](#google-protobuf-Timestamp) |  |  |
 
 
 

