hanahmily commented on code in PR #947:
URL: 
https://github.com/apache/skywalking-banyandb/pull/947#discussion_r2707460064


##########
banyand/internal/wqueue/wqueue.go:
##########
@@ -220,3 +220,12 @@ func (q *Queue[S, O]) GetTimeRange(ts time.Time) 
timestamp.TimeRange {
 func (q *Queue[S, O]) GetNodes(shardID common.ShardID) []string {
        return q.opts.GetNodes(shardID)
 }
+
+// Shards returns all shards in the queue.
+func (q *Queue[S, O]) Shards() []*Shard[S] {

Review Comment:
   Since we only ever access the queue from this shard, we can simplify the 
type and replace the shard with `AllSubQueue`. This removes unused fields and 
makes the dependency explicit.



##########
banyand/stream/metadata.go:
##########
@@ -231,17 +236,228 @@ func (sr *schemaRepo) loadStream(metadata 
*commonv1.Metadata) (*stream, bool) {
 }
 
 func (sr *schemaRepo) loadTSDB(groupName string) (storage.TSDB[*tsTable, 
option], error) {
+       if sr == nil {
+               return nil, fmt.Errorf("schemaRepo is nil")
+       }
        g, ok := sr.LoadGroup(groupName)
        if !ok {
                return nil, fmt.Errorf("group %s not found", groupName)
        }
        db := g.SupplyTSDB()
        if db == nil {
-               return nil, fmt.Errorf("tsdb for group %s not found", groupName)
+               return nil, fmt.Errorf("group %s not found", groupName)
        }
        return db.(storage.TSDB[*tsTable, option]), nil
 }
 
+// CollectDataInfo collects data info for a specific group.
+func (sr *schemaRepo) CollectDataInfo(ctx context.Context, group string) 
(*databasev1.DataInfo, error) {
+       if sr.nodeID == "" {
+               return nil, fmt.Errorf("node ID is empty")
+       }
+       node, nodeErr := sr.metadata.NodeRegistry().GetNode(ctx, sr.nodeID)
+       if nodeErr != nil {
+               return nil, fmt.Errorf("failed to get current node info: %w", 
nodeErr)
+       }
+       tsdb, tsdbErr := sr.loadTSDB(group)
+       if tsdbErr != nil {
+               return nil, tsdbErr
+       }
+       if tsdb == nil {
+               return nil, nil
+       }
+       segments, segmentsErr := tsdb.SelectSegments(timestamp.TimeRange{
+               Start: time.Unix(0, 0),
+               End:   time.Unix(0, timestamp.MaxNanoTime),
+       })
+       if segmentsErr != nil {
+               return nil, segmentsErr
+       }
+       var segmentInfoList []*databasev1.SegmentInfo
+       var totalDataSize int64
+       for _, segment := range segments {
+               timeRange := segment.GetTimeRange()
+               tables, _ := segment.Tables()
+               var shardInfoList []*databasev1.ShardInfo
+               for shardIdx, table := range tables {
+                       shardInfo := sr.collectShardInfo(table, 
uint32(shardIdx))
+                       shardInfoList = append(shardInfoList, shardInfo)
+                       totalDataSize += shardInfo.DataSizeBytes
+               }
+               seriesIndexInfo := sr.collectSeriesIndexInfo(segment)
+               totalDataSize += seriesIndexInfo.DataSizeBytes
+               segmentInfo := &databasev1.SegmentInfo{
+                       SegmentId:       fmt.Sprintf("%d-%d", 
timeRange.Start.UnixNano(), timeRange.End.UnixNano()),
+                       TimeRangeStart:  
timeRange.Start.Format(time.RFC3339Nano),
+                       TimeRangeEnd:    timeRange.End.Format(time.RFC3339Nano),
+                       ShardInfo:       shardInfoList,
+                       SeriesIndexInfo: seriesIndexInfo,
+               }
+               segmentInfoList = append(segmentInfoList, segmentInfo)
+       }
+       dataInfo := &databasev1.DataInfo{
+               Node:          node,
+               SegmentInfo:   segmentInfoList,
+               DataSizeBytes: totalDataSize,
+       }
+       return dataInfo, nil
+}
+
+func (sr *schemaRepo) collectSeriesIndexInfo(segment storage.Segment[*tsTable, 
option]) *databasev1.SeriesIndexInfo {
+       indexDB := segment.IndexDB()
+       if indexDB == nil {
+               return &databasev1.SeriesIndexInfo{
+                       DataCount:     0,
+                       DataSizeBytes: 0,
+               }
+       }
+       dataCount, dataSizeBytes := indexDB.Stats()
+       return &databasev1.SeriesIndexInfo{
+               DataCount:     dataCount,
+               DataSizeBytes: dataSizeBytes,
+       }
+}
+
+func (sr *schemaRepo) collectShardInfo(table any, shardID uint32) 
*databasev1.ShardInfo {
+       tst, ok := table.(*tsTable)
+       if !ok {
+               return &databasev1.ShardInfo{
+                       ShardId:       shardID,
+                       DataCount:     0,
+                       DataSizeBytes: 0,
+                       PartCount:     0,
+               }
+       }
+       tst.RLock()
+       defer tst.RUnlock()
+       snapshot := tst.snapshot
+       if snapshot == nil {
+               return &databasev1.ShardInfo{
+                       ShardId:       shardID,
+                       DataCount:     0,
+                       DataSizeBytes: 0,
+                       PartCount:     0,
+               }
+       }
+       var totalCount, compressedSize, uncompressedSize, partCount uint64
+       for _, pw := range snapshot.parts {
+               if pw.p != nil {
+                       totalCount += pw.p.partMetadata.TotalCount
+                       compressedSize += pw.p.partMetadata.CompressedSizeBytes
+                       uncompressedSize += 
pw.p.partMetadata.UncompressedSizeBytes
+                       partCount++
+               } else if pw.mp != nil {
+                       totalCount += pw.mp.partMetadata.TotalCount
+                       compressedSize += pw.mp.partMetadata.CompressedSizeBytes
+                       uncompressedSize += 
pw.mp.partMetadata.UncompressedSizeBytes
+                       partCount++
+               }
+       }
+       invertedIndexInfo := sr.collectInvertedIndexInfo(tst)
+       return &databasev1.ShardInfo{
+               ShardId:           shardID,
+               DataCount:         int64(totalCount),
+               DataSizeBytes:     int64(compressedSize),
+               PartCount:         int64(partCount),
+               InvertedIndexInfo: invertedIndexInfo,
+               SidxInfo:          &databasev1.SIDXInfo{},
+       }
+}
+
+func (sr *schemaRepo) collectInvertedIndexInfo(tst *tsTable) 
*databasev1.InvertedIndexInfo {
+       if tst.index == nil {
+               return &databasev1.InvertedIndexInfo{
+                       DataCount:     0,
+                       DataSizeBytes: 0,
+               }
+       }
+       dataCount, dataSizeBytes := tst.index.store.Stats()
+       return &databasev1.InvertedIndexInfo{
+               DataCount:     dataCount,
+               DataSizeBytes: dataSizeBytes,
+       }
+}
+
+func (sr *schemaRepo) collectPendingWriteInfo(groupName string) (int64, error) 
{
+       if sr == nil || sr.Repository == nil {
+               return 0, fmt.Errorf("schema repository is not initialized")
+       }
+       if sr.role == databasev1.Role_ROLE_LIAISON {
+               queue, queueErr := sr.loadQueue(groupName)
+               if queueErr != nil {
+                       return 0, fmt.Errorf("failed to load queue: %w", 
queueErr)
+               }
+               if queue == nil {
+                       return 0, nil
+               }
+               var pendingWriteCount int64
+               for _, shard := range queue.Shards() {
+                       tst := shard.SubQueue()
+                       if tst != nil {
+                               pendingWriteCount += tst.pendingDataCount.Load()
+                       }
+               }
+               return pendingWriteCount, nil
+       }
+       // Standalone mode
+       tsdb, tsdbErr := sr.loadTSDB(groupName)
+       if tsdbErr != nil {
+               return 0, fmt.Errorf("failed to load TSDB: %w", tsdbErr)
+       }
+       if tsdb == nil {
+               return 0, fmt.Errorf("TSDB is nil for group %s", groupName)
+       }
+       segments, segmentsErr := tsdb.SelectSegments(timestamp.TimeRange{

Review Comment:
   Invoke segment.DecRef() to release them.



##########
banyand/stream/tstable.go:
##########
@@ -49,21 +49,22 @@ const (
 )
 
 type tsTable struct {
-       fileSystem    fs.FileSystem
-       pm            protector.Memory
-       metrics       *metrics
-       index         *elementIndex
-       snapshot      *snapshot
-       loopCloser    *run.Closer
-       getNodes      func() []string
-       l             *logger.Logger
-       introductions chan *introduction
-       p             common.Position
-       group         string
-       root          string
-       gc            garbageCleaner
-       option        option
-       curPartID     uint64
+       fileSystem       fs.FileSystem
+       pm               protector.Memory
+       metrics          *metrics
+       index            *elementIndex
+       snapshot         *snapshot
+       loopCloser       *run.Closer
+       getNodes         func() []string
+       l                *logger.Logger
+       introductions    chan *introduction
+       p                common.Position
+       group            string
+       root             string
+       gc               garbageCleaner
+       option           option
+       curPartID        uint64
+       pendingDataCount atomic.Int64

Review Comment:
   Would you please move pendingDataCount to metrics and make it as a metric?



##########
banyand/metadata/schema/etcd.go:
##########
@@ -128,14 +132,20 @@ func CheckInterval(d time.Duration) WatcherOption {
        }
 }
 
+var _ Registry = (*etcdSchemaRegistry)(nil)
+
 type etcdSchemaRegistry struct {

Review Comment:
   `etcd` will become optional soon. Please do not add any new functionality to 
it. Instead, create a new struct to implement these features.



##########
banyand/stream/metadata.go:
##########
@@ -231,17 +236,228 @@ func (sr *schemaRepo) loadStream(metadata 
*commonv1.Metadata) (*stream, bool) {
 }
 
 func (sr *schemaRepo) loadTSDB(groupName string) (storage.TSDB[*tsTable, 
option], error) {
+       if sr == nil {
+               return nil, fmt.Errorf("schemaRepo is nil")
+       }
        g, ok := sr.LoadGroup(groupName)
        if !ok {
                return nil, fmt.Errorf("group %s not found", groupName)
        }
        db := g.SupplyTSDB()
        if db == nil {
-               return nil, fmt.Errorf("tsdb for group %s not found", groupName)
+               return nil, fmt.Errorf("group %s not found", groupName)
        }
        return db.(storage.TSDB[*tsTable, option]), nil
 }
 
+// CollectDataInfo collects data info for a specific group.
+func (sr *schemaRepo) CollectDataInfo(ctx context.Context, group string) 
(*databasev1.DataInfo, error) {
+       if sr.nodeID == "" {
+               return nil, fmt.Errorf("node ID is empty")
+       }
+       node, nodeErr := sr.metadata.NodeRegistry().GetNode(ctx, sr.nodeID)
+       if nodeErr != nil {
+               return nil, fmt.Errorf("failed to get current node info: %w", 
nodeErr)
+       }
+       tsdb, tsdbErr := sr.loadTSDB(group)
+       if tsdbErr != nil {
+               return nil, tsdbErr
+       }
+       if tsdb == nil {
+               return nil, nil
+       }
+       segments, segmentsErr := tsdb.SelectSegments(timestamp.TimeRange{
+               Start: time.Unix(0, 0),
+               End:   time.Unix(0, timestamp.MaxNanoTime),
+       })
+       if segmentsErr != nil {
+               return nil, segmentsErr
+       }
+       var segmentInfoList []*databasev1.SegmentInfo
+       var totalDataSize int64
+       for _, segment := range segments {
+               timeRange := segment.GetTimeRange()
+               tables, _ := segment.Tables()
+               var shardInfoList []*databasev1.ShardInfo
+               for shardIdx, table := range tables {
+                       shardInfo := sr.collectShardInfo(table, 
uint32(shardIdx))
+                       shardInfoList = append(shardInfoList, shardInfo)
+                       totalDataSize += shardInfo.DataSizeBytes
+               }
+               seriesIndexInfo := sr.collectSeriesIndexInfo(segment)
+               totalDataSize += seriesIndexInfo.DataSizeBytes
+               segmentInfo := &databasev1.SegmentInfo{
+                       SegmentId:       fmt.Sprintf("%d-%d", 
timeRange.Start.UnixNano(), timeRange.End.UnixNano()),
+                       TimeRangeStart:  
timeRange.Start.Format(time.RFC3339Nano),
+                       TimeRangeEnd:    timeRange.End.Format(time.RFC3339Nano),
+                       ShardInfo:       shardInfoList,
+                       SeriesIndexInfo: seriesIndexInfo,
+               }
+               segmentInfoList = append(segmentInfoList, segmentInfo)
+       }
+       dataInfo := &databasev1.DataInfo{
+               Node:          node,
+               SegmentInfo:   segmentInfoList,
+               DataSizeBytes: totalDataSize,
+       }
+       return dataInfo, nil
+}
+
+func (sr *schemaRepo) collectSeriesIndexInfo(segment storage.Segment[*tsTable, 
option]) *databasev1.SeriesIndexInfo {
+       indexDB := segment.IndexDB()
+       if indexDB == nil {
+               return &databasev1.SeriesIndexInfo{
+                       DataCount:     0,
+                       DataSizeBytes: 0,
+               }
+       }
+       dataCount, dataSizeBytes := indexDB.Stats()
+       return &databasev1.SeriesIndexInfo{
+               DataCount:     dataCount,
+               DataSizeBytes: dataSizeBytes,
+       }
+}
+
+func (sr *schemaRepo) collectShardInfo(table any, shardID uint32) 
*databasev1.ShardInfo {
+       tst, ok := table.(*tsTable)
+       if !ok {
+               return &databasev1.ShardInfo{
+                       ShardId:       shardID,
+                       DataCount:     0,
+                       DataSizeBytes: 0,
+                       PartCount:     0,
+               }
+       }
+       tst.RLock()
+       defer tst.RUnlock()
+       snapshot := tst.snapshot
+       if snapshot == nil {
+               return &databasev1.ShardInfo{
+                       ShardId:       shardID,
+                       DataCount:     0,
+                       DataSizeBytes: 0,
+                       PartCount:     0,
+               }
+       }
+       var totalCount, compressedSize, uncompressedSize, partCount uint64
+       for _, pw := range snapshot.parts {
+               if pw.p != nil {
+                       totalCount += pw.p.partMetadata.TotalCount
+                       compressedSize += pw.p.partMetadata.CompressedSizeBytes
+                       uncompressedSize += 
pw.p.partMetadata.UncompressedSizeBytes
+                       partCount++
+               } else if pw.mp != nil {
+                       totalCount += pw.mp.partMetadata.TotalCount
+                       compressedSize += pw.mp.partMetadata.CompressedSizeBytes
+                       uncompressedSize += 
pw.mp.partMetadata.UncompressedSizeBytes
+                       partCount++
+               }
+       }
+       invertedIndexInfo := sr.collectInvertedIndexInfo(tst)
+       return &databasev1.ShardInfo{
+               ShardId:           shardID,
+               DataCount:         int64(totalCount),
+               DataSizeBytes:     int64(compressedSize),
+               PartCount:         int64(partCount),
+               InvertedIndexInfo: invertedIndexInfo,
+               SidxInfo:          &databasev1.SIDXInfo{},
+       }
+}
+
+func (sr *schemaRepo) collectInvertedIndexInfo(tst *tsTable) 
*databasev1.InvertedIndexInfo {
+       if tst.index == nil {
+               return &databasev1.InvertedIndexInfo{
+                       DataCount:     0,
+                       DataSizeBytes: 0,
+               }
+       }
+       dataCount, dataSizeBytes := tst.index.store.Stats()
+       return &databasev1.InvertedIndexInfo{
+               DataCount:     dataCount,
+               DataSizeBytes: dataSizeBytes,
+       }
+}
+
+func (sr *schemaRepo) collectPendingWriteInfo(groupName string) (int64, error) 
{
+       if sr == nil || sr.Repository == nil {
+               return 0, fmt.Errorf("schema repository is not initialized")
+       }
+       if sr.role == databasev1.Role_ROLE_LIAISON {
+               queue, queueErr := sr.loadQueue(groupName)
+               if queueErr != nil {
+                       return 0, fmt.Errorf("failed to load queue: %w", 
queueErr)
+               }
+               if queue == nil {
+                       return 0, nil
+               }
+               var pendingWriteCount int64
+               for _, shard := range queue.Shards() {
+                       tst := shard.SubQueue()
+                       if tst != nil {
+                               pendingWriteCount += tst.pendingDataCount.Load()
+                       }
+               }
+               return pendingWriteCount, nil
+       }
+       // Standalone mode
+       tsdb, tsdbErr := sr.loadTSDB(groupName)
+       if tsdbErr != nil {
+               return 0, fmt.Errorf("failed to load TSDB: %w", tsdbErr)
+       }
+       if tsdb == nil {
+               return 0, fmt.Errorf("TSDB is nil for group %s", groupName)
+       }
+       segments, segmentsErr := tsdb.SelectSegments(timestamp.TimeRange{
+               Start: time.Unix(0, 0),
+               End:   time.Unix(0, timestamp.MaxNanoTime),
+       })
+       if segmentsErr != nil {
+               return 0, fmt.Errorf("failed to select segments: %w", 
segmentsErr)
+       }
+       var pendingWriteCount int64
+       for _, segment := range segments {
+               tables, _ := segment.Tables()
+               for _, tst := range tables {
+                       pendingWriteCount += tst.pendingDataCount.Load()
+               }
+       }
+       return pendingWriteCount, nil
+}
+
+func (sr *schemaRepo) collectPendingSyncInfo(groupName string) (partCount 
int64, totalSizeBytes int64, err error) {
+       if sr == nil || sr.Repository == nil {
+               return 0, 0, fmt.Errorf("schema repository is not initialized")
+       }
+       // Only liaison nodes collect pending sync info
+       if sr.role != databasev1.Role_ROLE_LIAISON {
+               return 0, 0, nil
+       }
+       queue, queueErr := sr.loadQueue(groupName)
+       if queueErr != nil {
+               return 0, 0, fmt.Errorf("failed to load queue: %w", queueErr)
+       }
+       if queue == nil {
+               return 0, 0, nil
+       }
+       for _, shard := range queue.Shards() {
+               tst := shard.SubQueue()
+               if tst != nil {
+                       tst.RLock()
+                       snapshot := tst.snapshot

Review Comment:
   Use currentSnapshot to avoid accessing the lock. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to