This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new e8453c7d Implement group inspection to display deletion details (#947)
e8453c7d is described below

commit e8453c7d1f65730422f73e7390a7d3ed7e29b5e4
Author: Huang Youliang <[email protected]>
AuthorDate: Sun Feb 1 16:20:19 2026 +0800

    Implement group inspection to display deletion details (#947)
    
    * Implement group inspection to display deletion details
    
    ---------
    
    Co-authored-by: Gao Hongtao <[email protected]>
---
 api/data/data.go                                   |  91 ++-
 api/data/measure.go                                |   6 +
 api/data/stream.go                                 |   6 +
 api/data/trace.go                                  |   6 +
 banyand/internal/storage/index.go                  |   4 +
 banyand/internal/storage/storage.go                |   1 +
 banyand/internal/wqueue/wqueue.go                  |  11 +
 banyand/liaison/grpc/registry.go                   | 113 ++++
 banyand/measure/introducer.go                      |   1 +
 banyand/measure/metadata.go                        | 184 +++++++
 banyand/measure/metrics.go                         |  19 +-
 banyand/measure/svc_data.go                        |  36 +-
 banyand/measure/svc_liaison.go                     |  49 ++
 banyand/measure/svc_standalone.go                  |  27 +
 banyand/measure/tstable.go                         |  19 +-
 banyand/metadata/client.go                         |  36 ++
 banyand/metadata/metadata.go                       |   7 +
 banyand/metadata/schema/collector.go               | 228 ++++++++
 banyand/metadata/schema/etcd.go                    |   5 +-
 banyand/metadata/schema/schema.go                  |  10 +
 banyand/stream/introducer.go                       |   1 +
 banyand/stream/metadata.go                         | 218 +++++++-
 banyand/stream/metrics.go                          |  23 +-
 banyand/stream/svc_liaison.go                      |  54 ++
 banyand/stream/svc_standalone.go                   |  45 +-
 banyand/stream/tstable.go                          |  33 +-
 banyand/trace/handoff_controller.go                |  15 +
 banyand/trace/introducer.go                        |   1 +
 banyand/trace/merger.go                            |   1 +
 banyand/trace/metadata.go                          | 234 +++++++-
 banyand/trace/metrics.go                           |  20 +-
 banyand/trace/svc_liaison.go                       |  58 ++
 banyand/trace/svc_standalone.go                    |  41 +-
 banyand/trace/trace.go                             |   2 +
 banyand/trace/tstable.go                           |  35 +-
 pkg/cmdsetup/liaison.go                            |   3 +
 pkg/index/index.go                                 |   1 +
 pkg/index/inverted/inverted.go                     |  15 +
 .../distributed/inspect/inspect_suite_test.go      | 613 +++++++++++++++++++++
 .../standalone/inspect/inspect_suite_test.go       | 558 +++++++++++++++++++
 40 files changed, 2754 insertions(+), 76 deletions(-)

diff --git a/api/data/data.go b/api/data/data.go
index 61c97897..9857bac4 100644
--- a/api/data/data.go
+++ b/api/data/data.go
@@ -21,6 +21,7 @@ package data
 import (
        "google.golang.org/protobuf/proto"
 
+       databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
        measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
        propertyv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
        streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
@@ -31,30 +32,36 @@ import (
 var (
        // TopicMap is the map of topic name to topic.
        TopicMap = map[string]bus.Topic{
-               TopicStreamWrite.String():              TopicStreamWrite,
-               TopicStreamQuery.String():              TopicStreamQuery,
-               TopicMeasureWrite.String():             TopicMeasureWrite,
-               TopicMeasureQuery.String():             TopicMeasureQuery,
-               TopicInternalMeasureQuery.String():     TopicInternalMeasureQuery,
-               TopicTopNQuery.String():                TopicTopNQuery,
-               TopicPropertyDelete.String():           TopicPropertyDelete,
-               TopicPropertyQuery.String():            TopicPropertyQuery,
-               TopicPropertyUpdate.String():           TopicPropertyUpdate,
-               TopicStreamPartSync.String():           TopicStreamPartSync,
-               TopicMeasurePartSync.String():          TopicMeasurePartSync,
-               TopicMeasureSeriesIndexInsert.String(): TopicMeasureSeriesIndexInsert,
-               TopicMeasureSeriesIndexUpdate.String(): TopicMeasureSeriesIndexUpdate,
-               TopicMeasureSeriesSync.String():        TopicMeasureSeriesSync,
-               TopicPropertyRepair.String():           TopicPropertyRepair,
-               TopicStreamSeriesIndexWrite.String():   TopicStreamSeriesIndexWrite,
-               TopicStreamLocalIndexWrite.String():    TopicStreamLocalIndexWrite,
-               TopicStreamSeriesSync.String():         TopicStreamSeriesSync,
-               TopicStreamElementIndexSync.String():   TopicStreamElementIndexSync,
-               TopicTraceWrite.String():               TopicTraceWrite,
-               TopicTraceQuery.String():               TopicTraceQuery,
-               TopicTracePartSync.String():            TopicTracePartSync,
-               TopicTraceSeriesSync.String():          TopicTraceSeriesSync,
-               TopicTraceSidxSeriesWrite.String():     TopicTraceSidxSeriesWrite,
+               TopicStreamWrite.String():               TopicStreamWrite,
+               TopicStreamQuery.String():               TopicStreamQuery,
+               TopicMeasureWrite.String():              TopicMeasureWrite,
+               TopicMeasureQuery.String():              TopicMeasureQuery,
+               TopicInternalMeasureQuery.String():      TopicInternalMeasureQuery,
+               TopicTopNQuery.String():                 TopicTopNQuery,
+               TopicPropertyDelete.String():            TopicPropertyDelete,
+               TopicPropertyQuery.String():             TopicPropertyQuery,
+               TopicPropertyUpdate.String():            TopicPropertyUpdate,
+               TopicStreamPartSync.String():            TopicStreamPartSync,
+               TopicMeasurePartSync.String():           TopicMeasurePartSync,
+               TopicMeasureSeriesIndexInsert.String():  TopicMeasureSeriesIndexInsert,
+               TopicMeasureSeriesIndexUpdate.String():  TopicMeasureSeriesIndexUpdate,
+               TopicMeasureSeriesSync.String():         TopicMeasureSeriesSync,
+               TopicPropertyRepair.String():            TopicPropertyRepair,
+               TopicStreamSeriesIndexWrite.String():    TopicStreamSeriesIndexWrite,
+               TopicStreamLocalIndexWrite.String():     TopicStreamLocalIndexWrite,
+               TopicStreamSeriesSync.String():          TopicStreamSeriesSync,
+               TopicStreamElementIndexSync.String():    TopicStreamElementIndexSync,
+               TopicTraceWrite.String():                TopicTraceWrite,
+               TopicTraceQuery.String():                TopicTraceQuery,
+               TopicTracePartSync.String():             TopicTracePartSync,
+               TopicTraceSeriesSync.String():           TopicTraceSeriesSync,
+               TopicTraceSidxSeriesWrite.String():      TopicTraceSidxSeriesWrite,
+               TopicMeasureCollectDataInfo.String():    TopicMeasureCollectDataInfo,
+               TopicMeasureCollectLiaisonInfo.String(): TopicMeasureCollectLiaisonInfo,
+               TopicStreamCollectDataInfo.String():     TopicStreamCollectDataInfo,
+               TopicStreamCollectLiaisonInfo.String():  TopicStreamCollectLiaisonInfo,
+               TopicTraceCollectDataInfo.String():      TopicTraceCollectDataInfo,
+               TopicTraceCollectLiaisonInfo.String():   TopicTraceCollectLiaisonInfo,
        }
 
        // TopicRequestMap is the map of topic name to request message.
@@ -132,6 +139,24 @@ var (
                TopicTraceSidxSeriesWrite: func() proto.Message {
                        return nil
                },
+               TopicMeasureCollectDataInfo: func() proto.Message {
+                       return &databasev1.GroupRegistryServiceInspectRequest{}
+               },
+               TopicMeasureCollectLiaisonInfo: func() proto.Message {
+                       return &databasev1.GroupRegistryServiceInspectRequest{}
+               },
+               TopicStreamCollectDataInfo: func() proto.Message {
+                       return &databasev1.GroupRegistryServiceInspectRequest{}
+               },
+               TopicStreamCollectLiaisonInfo: func() proto.Message {
+                       return &databasev1.GroupRegistryServiceInspectRequest{}
+               },
+               TopicTraceCollectDataInfo: func() proto.Message {
+                       return &databasev1.GroupRegistryServiceInspectRequest{}
+               },
+               TopicTraceCollectLiaisonInfo: func() proto.Message {
+                       return &databasev1.GroupRegistryServiceInspectRequest{}
+               },
        }
 
        // TopicResponseMap is the map of topic name to response message.
@@ -164,6 +189,24 @@ var (
                TopicTraceQuery: func() proto.Message {
                        return &tracev1.InternalQueryResponse{}
                },
+               TopicMeasureCollectDataInfo: func() proto.Message {
+                       return &databasev1.DataInfo{}
+               },
+               TopicStreamCollectDataInfo: func() proto.Message {
+                       return &databasev1.DataInfo{}
+               },
+               TopicTraceCollectDataInfo: func() proto.Message {
+                       return &databasev1.DataInfo{}
+               },
+               TopicMeasureCollectLiaisonInfo: func() proto.Message {
+                       return &databasev1.LiaisonInfo{}
+               },
+               TopicStreamCollectLiaisonInfo: func() proto.Message {
+                       return &databasev1.LiaisonInfo{}
+               },
+               TopicTraceCollectLiaisonInfo: func() proto.Message {
+                       return &databasev1.LiaisonInfo{}
+               },
        }
 
        // TopicCommon is the common topic for data transmission.
diff --git a/api/data/measure.go b/api/data/measure.go
index 2bf2bc4d..d414abeb 100644
--- a/api/data/measure.go
+++ b/api/data/measure.go
@@ -103,3 +103,9 @@ var MeasureSeriesSyncKindVersion = common.KindVersion{
 
 // TopicMeasureSeriesSync is the measure series sync topic.
 var TopicMeasureSeriesSync = bus.BiTopic(MeasureSeriesSyncKindVersion.String())
+
+// TopicMeasureCollectDataInfo is the topic for collecting data info from data nodes.
+var TopicMeasureCollectDataInfo = bus.BiTopic("measure-collect-data-info")
+
+// TopicMeasureCollectLiaisonInfo is the topic for collecting liaison info from liaison nodes.
+var TopicMeasureCollectLiaisonInfo = bus.BiTopic("measure-collect-liaison-info")
diff --git a/api/data/stream.go b/api/data/stream.go
index 43be0dc0..560931a3 100644
--- a/api/data/stream.go
+++ b/api/data/stream.go
@@ -93,3 +93,9 @@ var StreamElementIndexSyncKindVersion = common.KindVersion{
 
 // TopicStreamElementIndexSync is the element index sync topic.
 var TopicStreamElementIndexSync = bus.BiTopic(StreamElementIndexSyncKindVersion.String())
+
+// TopicStreamCollectDataInfo is the topic for collecting data info from data nodes.
+var TopicStreamCollectDataInfo = bus.BiTopic("stream-collect-data-info")
+
+// TopicStreamCollectLiaisonInfo is the topic for collecting liaison info from liaison nodes.
+var TopicStreamCollectLiaisonInfo = bus.BiTopic("stream-collect-liaison-info")
diff --git a/api/data/trace.go b/api/data/trace.go
index 3918e3f7..0c7d5579 100644
--- a/api/data/trace.go
+++ b/api/data/trace.go
@@ -75,3 +75,9 @@ var TraceSidxSeriesWriteKindVersion = common.KindVersion{
 
 // TopicTraceSidxSeriesWrite is the trace sidx series write topic.
 var TopicTraceSidxSeriesWrite = bus.BiTopic(TraceSidxSeriesWriteKindVersion.String())
+
+// TopicTraceCollectDataInfo is the topic for collecting data info from data nodes.
+var TopicTraceCollectDataInfo = bus.BiTopic("trace-collect-data-info")
+
+// TopicTraceCollectLiaisonInfo is the topic for collecting liaison info from liaison nodes.
+var TopicTraceCollectLiaisonInfo = bus.BiTopic("trace-collect-liaison-info")
diff --git a/banyand/internal/storage/index.go b/banyand/internal/storage/index.go
index bc033dc7..ad8afc1c 100644
--- a/banyand/internal/storage/index.go
+++ b/banyand/internal/storage/index.go
@@ -94,6 +94,10 @@ func (s *seriesIndex) EnableExternalSegments() (index.ExternalSegmentStreamer, e
        return s.store.EnableExternalSegments()
 }
 
+func (s *seriesIndex) Stats() (dataCount int64, dataSizeBytes int64) {
+       return s.store.Stats()
+}
+
 func (s *seriesIndex) filter(ctx context.Context, series []*pbv1.Series,
        projection []index.FieldKey, secondaryQuery index.Query, timeRange *timestamp.TimeRange,
 ) (data SeriesData, err error) {
diff --git a/banyand/internal/storage/storage.go b/banyand/internal/storage/storage.go
index 5ac1b883..41f7280a 100644
--- a/banyand/internal/storage/storage.go
+++ b/banyand/internal/storage/storage.go
@@ -102,6 +102,7 @@ type IndexDB interface {
        Search(ctx context.Context, series []*pbv1.Series, opts IndexSearchOpts) (SeriesData, [][]byte, error)
        SearchWithoutSeries(ctx context.Context, opts IndexSearchOpts) (sd SeriesData, sortedValues [][]byte, err error)
        EnableExternalSegments() (index.ExternalSegmentStreamer, error)
+       Stats() (dataCount int64, dataSizeBytes int64)
 }
 
 // TSDB allows listing and getting shard details.
diff --git a/banyand/internal/wqueue/wqueue.go b/banyand/internal/wqueue/wqueue.go
index 099664ae..357d5a95 100644
--- a/banyand/internal/wqueue/wqueue.go
+++ b/banyand/internal/wqueue/wqueue.go
@@ -220,3 +220,14 @@ func (q *Queue[S, O]) GetTimeRange(ts time.Time) timestamp.TimeRange {
 func (q *Queue[S, O]) GetNodes(shardID common.ShardID) []string {
        return q.opts.GetNodes(shardID)
 }
+
+// SubQueues returns sub-queues from all shards in the queue.
+func (q *Queue[S, O]) SubQueues() []S {
+       q.RLock()
+       defer q.RUnlock()
+       result := make([]S, 0, len(q.sLst))
+       for _, shard := range q.sLst {
+               result = append(result, shard.sq)
+       }
+       return result
+}
diff --git a/banyand/liaison/grpc/registry.go b/banyand/liaison/grpc/registry.go
index 671b6948..8c0a2dc4 100644
--- a/banyand/liaison/grpc/registry.go
+++ b/banyand/liaison/grpc/registry.go
@@ -700,6 +700,119 @@ func (rs *groupRegistryServer) Exist(ctx context.Context, req *databasev1.GroupR
        return nil, err
 }
 
+func (rs *groupRegistryServer) Inspect(ctx context.Context, req *databasev1.GroupRegistryServiceInspectRequest) (
+       *databasev1.GroupRegistryServiceInspectResponse, error,
+) {
+       g := req.GetGroup()
+       rs.metrics.totalRegistryStarted.Inc(1, g, "group", "inspect")
+       start := time.Now()
+       defer func() {
+               rs.metrics.totalRegistryFinished.Inc(1, g, "group", "inspect")
+               rs.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g, "group", "inspect")
+       }()
+       group, err := rs.schemaRegistry.GroupRegistry().GetGroup(ctx, g)
+       if err != nil {
+               rs.metrics.totalRegistryErr.Inc(1, g, "group", "inspect")
+               return nil, err
+       }
+       schemaInfo, schemaErr := rs.collectSchemaInfo(ctx, g)
+       if schemaErr != nil {
+               rs.metrics.totalRegistryErr.Inc(1, g, "group", "inspect")
+               return nil, schemaErr
+       }
+       dataInfo, dataErr := rs.schemaRegistry.CollectDataInfo(ctx, g)
+       if dataErr != nil {
+               rs.metrics.totalRegistryErr.Inc(1, g, "group", "inspect")
+               return nil, dataErr
+       }
+       liaisonInfo, liaisonErr := rs.schemaRegistry.CollectLiaisonInfo(ctx, g)
+       if liaisonErr != nil {
+               rs.metrics.totalRegistryErr.Inc(1, g, "group", "inspect")
+               return nil, liaisonErr
+       }
+       return &databasev1.GroupRegistryServiceInspectResponse{
+               Group:       group,
+               SchemaInfo:  schemaInfo,
+               DataInfo:    dataInfo,
+               LiaisonInfo: liaisonInfo,
+       }, nil
+}
+
+func (rs *groupRegistryServer) Query(_ context.Context, _ *databasev1.GroupRegistryServiceQueryRequest) (
+       *databasev1.GroupRegistryServiceQueryResponse, error,
+) {
+       return nil, status.Error(codes.Unimplemented, "Query method not implemented yet")
+}
+
+func (rs *groupRegistryServer) collectSchemaInfo(ctx context.Context, group string) (*databasev1.SchemaInfo, error) {
+       opt := schema.ListOpt{Group: group}
+       streams, streamsErr := rs.schemaRegistry.StreamRegistry().ListStream(ctx, opt)
+       if streamsErr != nil {
+               return nil, streamsErr
+       }
+       streamNames := make([]string, 0, len(streams))
+       for _, s := range streams {
+               streamNames = append(streamNames, s.GetMetadata().GetName())
+       }
+       measures, measuresErr := rs.schemaRegistry.MeasureRegistry().ListMeasure(ctx, opt)
+       if measuresErr != nil {
+               return nil, measuresErr
+       }
+       measureNames := make([]string, 0, len(measures))
+       for _, m := range measures {
+               measureNames = append(measureNames, m.GetMetadata().GetName())
+       }
+       traces, tracesErr := rs.schemaRegistry.TraceRegistry().ListTrace(ctx, opt)
+       if tracesErr != nil {
+               return nil, tracesErr
+       }
+       traceNames := make([]string, 0, len(traces))
+       for _, t := range traces {
+               traceNames = append(traceNames, t.GetMetadata().GetName())
+       }
+       properties, propertiesErr := rs.schemaRegistry.PropertyRegistry().ListProperty(ctx, opt)
+       if propertiesErr != nil {
+               return nil, propertiesErr
+       }
+       propertyNames := make([]string, 0, len(properties))
+       for _, p := range properties {
+               propertyNames = append(propertyNames, p.GetMetadata().GetName())
+       }
+       indexRules, indexRulesErr := rs.schemaRegistry.IndexRuleRegistry().ListIndexRule(ctx, opt)
+       if indexRulesErr != nil {
+               return nil, indexRulesErr
+       }
+       indexRuleNames := make([]string, 0, len(indexRules))
+       for _, ir := range indexRules {
+               indexRuleNames = append(indexRuleNames, ir.GetMetadata().GetName())
+       }
+       indexRuleBindings, indexRuleBindingsErr := rs.schemaRegistry.IndexRuleBindingRegistry().ListIndexRuleBinding(ctx, opt)
+       if indexRuleBindingsErr != nil {
+               return nil, indexRuleBindingsErr
+       }
+       bindingNames := make([]string, 0, len(indexRuleBindings))
+       for _, irb := range indexRuleBindings {
+               bindingNames = append(bindingNames, irb.GetMetadata().GetName())
+       }
+       topNAggs, topNAggsErr := rs.schemaRegistry.TopNAggregationRegistry().ListTopNAggregation(ctx, opt)
+       if topNAggsErr != nil {
+               return nil, topNAggsErr
+       }
+       topNNames := make([]string, 0, len(topNAggs))
+       for _, tn := range topNAggs {
+               topNNames = append(topNNames, tn.GetMetadata().GetName())
+       }
+       return &databasev1.SchemaInfo{
+               Streams:           streamNames,
+               Measures:          measureNames,
+               Traces:            traceNames,
+               Properties:        propertyNames,
+               IndexRules:        indexRuleNames,
+               IndexRuleBindings: bindingNames,
+               TopnAggregations:  topNNames,
+       }, nil
+}
+
 type topNAggregationRegistryServer struct {
        databasev1.UnimplementedTopNAggregationRegistryServiceServer
        schemaRegistry metadata.Repo
diff --git a/banyand/measure/introducer.go b/banyand/measure/introducer.go
index e60d071a..8aed38c3 100644
--- a/banyand/measure/introducer.go
+++ b/banyand/measure/introducer.go
@@ -192,6 +192,7 @@ func (tst *tsTable) introduceMemPart(nextIntroduction *introduction, epoch uint6
        }
 
        next := nextIntroduction.memPart
+       tst.addPendingDataCount(-int64(next.mp.partMetadata.TotalCount))
        nextSnp := cur.copyAllTo(epoch)
        nextSnp.parts = append(nextSnp.parts, next)
        nextSnp.creator = snapshotCreatorMemPart
diff --git a/banyand/measure/metadata.go b/banyand/measure/metadata.go
index b9da0e7a..3482b096 100644
--- a/banyand/measure/metadata.go
+++ b/banyand/measure/metadata.go
@@ -85,6 +85,7 @@ type schemaRepo struct {
        nodeID           string
        path             string
        closingGroupsMu  sync.RWMutex
+       role             databasev1.Role
 }
 
 func newSchemaRepo(path string, svc *standalone, nodeLabels map[string]string, nodeID string) *schemaRepo {
@@ -95,6 +96,7 @@ func newSchemaRepo(path string, svc *standalone, nodeLabels map[string]string, n
                pipeline:      svc.localPipeline,
                nodeID:        nodeID,
                closingGroups: make(map[string]struct{}),
+               role:          databasev1.Role_ROLE_DATA,
        }
        sr.Repository = resourceSchema.NewRepository(
                svc.metadata,
@@ -113,6 +115,7 @@ func newLiaisonSchemaRepo(path string, svc *liaison, measureDataNodeRegistry grp
                metadata:      svc.metadata,
                pipeline:      pipeline,
                closingGroups: make(map[string]struct{}),
+               role:          databasev1.Role_ROLE_LIAISON,
        }
        sr.Repository = resourceSchema.NewRepository(
                svc.metadata,
@@ -370,6 +373,186 @@ func (sr *schemaRepo) loadQueue(groupName string) (*wqueue.Queue[*tsTable, optio
        return db.(*wqueue.Queue[*tsTable, option]), nil
 }
 
+// CollectDataInfo collects data info for a specific group.
+func (sr *schemaRepo) CollectDataInfo(ctx context.Context, group string) (*databasev1.DataInfo, error) {
+       if sr.nodeID == "" {
+               return nil, nil
+       }
+       node, nodeErr := sr.metadata.NodeRegistry().GetNode(ctx, sr.nodeID)
+       if nodeErr != nil {
+               return nil, nodeErr
+       }
+       tsdb, tsdbErr := sr.loadTSDB(group)
+       if tsdbErr != nil {
+               return nil, tsdbErr
+       }
+       if tsdb == nil {
+               return nil, nil
+       }
+       segments, segmentsErr := tsdb.SelectSegments(timestamp.TimeRange{
+               Start: time.Unix(0, 0),
+               End:   time.Unix(0, timestamp.MaxNanoTime),
+       })
+       if segmentsErr != nil {
+               return nil, segmentsErr
+       }
+       var segmentInfoList []*databasev1.SegmentInfo
+       var totalDataSize int64
+       for _, segment := range segments {
+               timeRange := segment.GetTimeRange()
+               tables, _ := segment.Tables()
+               var shardInfoList []*databasev1.ShardInfo
+               for shardIdx, table := range tables {
+                       shardInfo := sr.collectShardInfo(table, uint32(shardIdx))
+                       shardInfoList = append(shardInfoList, shardInfo)
+                       totalDataSize += shardInfo.DataSizeBytes
+               }
+               seriesIndexInfo := sr.collectSeriesIndexInfo(segment)
+               totalDataSize += seriesIndexInfo.DataSizeBytes
+               segmentInfo := &databasev1.SegmentInfo{
+                       SegmentId:       fmt.Sprintf("%d-%d", timeRange.Start.UnixNano(), timeRange.End.UnixNano()),
+                       TimeRangeStart:  timeRange.Start.Format(time.RFC3339Nano),
+                       TimeRangeEnd:    timeRange.End.Format(time.RFC3339Nano),
+                       ShardInfo:       shardInfoList,
+                       SeriesIndexInfo: seriesIndexInfo,
+               }
+               segmentInfoList = append(segmentInfoList, segmentInfo)
+               segment.DecRef()
+       }
+       dataInfo := &databasev1.DataInfo{
+               Node:          node,
+               SegmentInfo:   segmentInfoList,
+               DataSizeBytes: totalDataSize,
+       }
+       return dataInfo, nil
+}
+
+func (sr *schemaRepo) collectSeriesIndexInfo(segment storage.Segment[*tsTable, option]) *databasev1.SeriesIndexInfo {
+       indexDB := segment.IndexDB()
+       if indexDB == nil {
+               return &databasev1.SeriesIndexInfo{}
+       }
+       dataCount, dataSizeBytes := indexDB.Stats()
+       return &databasev1.SeriesIndexInfo{
+               DataCount:     dataCount,
+               DataSizeBytes: dataSizeBytes,
+       }
+}
+
+func (sr *schemaRepo) collectShardInfo(table any, shardID uint32) *databasev1.ShardInfo {
+       tst, ok := table.(*tsTable)
+       if !ok {
+               return &databasev1.ShardInfo{
+                       ShardId: shardID,
+               }
+       }
+       snapshot := tst.currentSnapshot()
+       if snapshot == nil {
+               return &databasev1.ShardInfo{
+                       ShardId: shardID,
+               }
+       }
+       defer snapshot.decRef()
+       var totalCount, compressedSize, partCount uint64
+       for _, pw := range snapshot.parts {
+               if pw.p != nil {
+                       totalCount += pw.p.partMetadata.TotalCount
+                       compressedSize += pw.p.partMetadata.CompressedSizeBytes
+                       partCount++
+               } else if pw.mp != nil {
+                       totalCount += pw.mp.partMetadata.TotalCount
+                       compressedSize += pw.mp.partMetadata.CompressedSizeBytes
+                       partCount++
+               }
+       }
+       return &databasev1.ShardInfo{
+               ShardId:           shardID,
+               DataCount:         int64(totalCount),
+               DataSizeBytes:     int64(compressedSize),
+               PartCount:         int64(partCount),
+               InvertedIndexInfo: &databasev1.InvertedIndexInfo{},
+               SidxInfo:          &databasev1.SIDXInfo{},
+       }
+}
+
+func (sr *schemaRepo) collectPendingWriteInfo(groupName string) (int64, error) {
+       if sr == nil || sr.Repository == nil {
+               return 0, fmt.Errorf("schema repository is not initialized")
+       }
+       if sr.role == databasev1.Role_ROLE_LIAISON {
+               queue, queueErr := sr.loadQueue(groupName)
+               if queueErr != nil {
+                       return 0, fmt.Errorf("failed to load queue: %w", queueErr)
+               }
+               if queue == nil {
+                       return 0, nil
+               }
+               var pendingWriteCount int64
+               for _, sq := range queue.SubQueues() {
+                       if sq != nil {
+                               pendingWriteCount += sq.getPendingDataCount()
+                       }
+               }
+               return pendingWriteCount, nil
+       }
+       // Standalone mode
+       tsdb, tsdbErr := sr.loadTSDB(groupName)
+       if tsdbErr != nil {
+               return 0, fmt.Errorf("failed to load TSDB: %w", tsdbErr)
+       }
+       if tsdb == nil {
+               return 0, fmt.Errorf("TSDB is nil for group %s", groupName)
+       }
+       segments, segmentsErr := tsdb.SelectSegments(timestamp.TimeRange{
+               Start: time.Unix(0, 0),
+               End:   time.Unix(0, timestamp.MaxNanoTime),
+       })
+       if segmentsErr != nil {
+               return 0, fmt.Errorf("failed to select segments: %w", segmentsErr)
+       }
+       var pendingWriteCount int64
+       for _, segment := range segments {
+               tables, _ := segment.Tables()
+               for _, tst := range tables {
+                       pendingWriteCount += tst.getPendingDataCount()
+               }
+               segment.DecRef()
+       }
+       return pendingWriteCount, nil
+}
+
+func (sr *schemaRepo) collectPendingSyncInfo(groupName string) (partCount int64, totalSizeBytes int64, err error) {
+       if sr == nil || sr.Repository == nil {
+               return 0, 0, fmt.Errorf("schema repository is not initialized")
+       }
+       // Only liaison nodes collect pending sync info
+       if sr.role != databasev1.Role_ROLE_LIAISON {
+               return 0, 0, nil
+       }
+       queue, queueErr := sr.loadQueue(groupName)
+       if queueErr != nil {
+               return 0, 0, fmt.Errorf("failed to load queue: %w", queueErr)
+       }
+       if queue == nil {
+               return 0, 0, nil
+       }
+       for _, sq := range queue.SubQueues() {
+               if sq != nil {
+                       snapshot := sq.currentSnapshot()
+                       if snapshot != nil {
+                               for _, pw := range snapshot.parts {
+                                       if pw.mp == nil && pw.p != nil && pw.p.partMetadata.TotalCount > 0 {
+                                               partCount++
+                                               totalSizeBytes += int64(pw.p.partMetadata.CompressedSizeBytes)
+                                       }
+                               }
+                               snapshot.decRef()
+                       }
+               }
+       }
+       return partCount, totalSizeBytes, nil
+}
+
 func (sr *schemaRepo) createTopNResultMeasure(ctx context.Context, measureSchemaRegistry schema.Measure, group string) {
        md := GetTopNSchemaMetadata(group)
        operation := func() error {
@@ -675,6 +858,7 @@ func (s *queueSupplier) newMetrics(p common.Position) storage.Metrics {
                        totalFileBlocks:                factory.NewGauge("total_file_blocks", common.ShardLabelNames()...),
                        totalFilePartBytes:             factory.NewGauge("total_file_part_bytes", common.ShardLabelNames()...),
                        totalFilePartUncompressedBytes: factory.NewGauge("total_file_part_uncompressed_bytes", common.ShardLabelNames()...),
+                       pendingDataCount:               factory.NewGauge("pending_data_count", common.ShardLabelNames()...),
                },
        }
 }
diff --git a/banyand/measure/metrics.go b/banyand/measure/metrics.go
index 8cf4e57e..243522a9 100644
--- a/banyand/measure/metrics.go
+++ b/banyand/measure/metrics.go
@@ -34,7 +34,6 @@ import (
 var measureScope = observability.RootScope.SubScope("measure")
 
 type metrics struct {
-       tbMetrics
        totalWritten           meter.Counter
        totalBatch             meter.Counter
        totalBatchIntroLatency meter.Counter
@@ -67,6 +66,8 @@ type metrics struct {
        totalMergedParts  meter.Counter
        totalMergeLatency meter.Counter
        totalMerged       meter.Counter
+
+       tbMetrics
 }
 
 func (tst *tsTable) incTotalWritten(delta int) {
@@ -251,6 +252,18 @@ func (tst *tsTable) incTotalMerged(delta int, typ string) {
        tst.metrics.totalMerged.Inc(float64(delta), typ)
 }
 
+func (tst *tsTable) addPendingDataCount(delta int64) {
+       tst.pendingDataCount.Add(delta)
+       if tst.metrics == nil {
+               return
+       }
+       tst.metrics.tbMetrics.pendingDataCount.Add(float64(delta), tst.p.ShardLabelValues()...)
+}
+
+func (tst *tsTable) getPendingDataCount() int64 {
+       return tst.pendingDataCount.Load()
+}
+
 func (m *metrics) DeleteAll() {
        if m == nil {
                return
@@ -335,6 +348,7 @@ func (s *supplier) newMetrics(p common.Position) (storage.Metrics, observability
                        totalFileBlocks:                
factory.NewGauge("total_file_blocks", common.ShardLabelNames()...),
                        totalFilePartBytes:             
factory.NewGauge("total_file_part_bytes", common.ShardLabelNames()...),
                        totalFilePartUncompressedBytes: 
factory.NewGauge("total_file_part_uncompressed_bytes", 
common.ShardLabelNames()...),
+                       pendingDataCount:               
factory.NewGauge("pending_data_count", common.ShardLabelNames()...),
                },
        }, factory
 }
@@ -393,6 +407,7 @@ func (tst *tsTable) deleteMetrics() {
        
tst.metrics.tbMetrics.totalFileBlocks.Delete(tst.p.ShardLabelValues()...)
        
tst.metrics.tbMetrics.totalFilePartBytes.Delete(tst.p.ShardLabelValues()...)
        
tst.metrics.tbMetrics.totalFilePartUncompressedBytes.Delete(tst.p.ShardLabelValues()...)
+       
tst.metrics.tbMetrics.pendingDataCount.Delete(tst.p.ShardLabelValues()...)
 }
 
 type tbMetrics struct {
@@ -407,6 +422,8 @@ type tbMetrics struct {
        totalFileBlocks                meter.Gauge
        totalFilePartBytes             meter.Gauge
        totalFilePartUncompressedBytes meter.Gauge
+
+       pendingDataCount meter.Gauge
 }
 
 func (s *standalone) createNativeObservabilityGroup(ctx context.Context) error 
{
diff --git a/banyand/measure/svc_data.go b/banyand/measure/svc_data.go
index fee99941..a447240e 100644
--- a/banyand/measure/svc_data.go
+++ b/banyand/measure/svc_data.go
@@ -252,7 +252,7 @@ func (s *dataSVC) PreRun(ctx context.Context) error {
                s.c = storage.NewServiceCacheWithConfig(s.cc)
        }
        node := val.(common.Node)
-       s.schemaRepo = newDataSchemaRepo(s.dataPath, s, node.Labels)
+       s.schemaRepo = newDataSchemaRepo(s.dataPath, s, node.Labels, 
node.NodeID)
 
        s.cm = newCacheMetrics(s.omr)
        obsservice.MetricsCollector.Register("measure_cache", 
s.collectCacheMetrics)
@@ -261,6 +261,11 @@ func (s *dataSVC) PreRun(ctx context.Context) error {
                return nil
        }
 
+       collectDataInfoListener := &collectDataInfoListener{s: s}
+       if subscribeErr := 
s.pipeline.Subscribe(data.TopicMeasureCollectDataInfo, 
collectDataInfoListener); subscribeErr != nil {
+               return fmt.Errorf("failed to subscribe to collect data info 
topic: %w", subscribeErr)
+       }
+
        if err := s.createDataNativeObservabilityGroup(ctx); err != nil {
                return err
        }
@@ -405,11 +410,13 @@ func NewReadonlyDataSVC(metadata metadata.Repo, omr observability.MetricsRegistr
        }, nil
 }
 
-func newDataSchemaRepo(path string, svc *dataSVC, nodeLabels 
map[string]string) *schemaRepo {
+func newDataSchemaRepo(path string, svc *dataSVC, nodeLabels 
map[string]string, nodeID string) *schemaRepo {
        sr := &schemaRepo{
                path:     path,
                l:        svc.l,
                metadata: svc.metadata,
+               nodeID:   nodeID,
+               role:     databasev1.Role_ROLE_DATA,
        }
        sr.Repository = resourceSchema.NewRepository(
                svc.metadata,
@@ -533,3 +540,28 @@ func (d *dataDeleteStreamSegmentsListener) Rev(_ context.Context, message bus.Me
        deleted := db.DeleteExpiredSegments(req.SegmentSuffixes)
        return bus.NewMessage(bus.MessageID(time.Now().UnixNano()), deleted)
 }
+
+func (s *dataSVC) CollectDataInfo(ctx context.Context, group string) 
(*databasev1.DataInfo, error) {
+       return s.schemaRepo.CollectDataInfo(ctx, group)
+}
+
+func (s *dataSVC) CollectLiaisonInfo(_ context.Context, _ string) 
(*databasev1.LiaisonInfo, error) {
+       return nil, errors.New("collect liaison info is not supported on data 
node")
+}
+
+type collectDataInfoListener struct {
+       *bus.UnImplementedHealthyListener
+       s *dataSVC
+}
+
+func (l *collectDataInfoListener) Rev(ctx context.Context, message 
bus.Message) bus.Message {
+       req, ok := 
message.Data().(*databasev1.GroupRegistryServiceInspectRequest)
+       if !ok {
+               return bus.NewMessage(message.ID(), common.NewError("invalid 
data type for collect data info request"))
+       }
+       dataInfo, collectErr := l.s.schemaRepo.CollectDataInfo(ctx, req.Group)
+       if collectErr != nil {
+               return bus.NewMessage(message.ID(), common.NewError("failed to 
collect data info: %v", collectErr))
+       }
+       return bus.NewMessage(message.ID(), dataInfo)
+}
diff --git a/banyand/measure/svc_liaison.go b/banyand/measure/svc_liaison.go
index 453e087e..aa6dbea8 100644
--- a/banyand/measure/svc_liaison.go
+++ b/banyand/measure/svc_liaison.go
@@ -19,6 +19,7 @@ package measure
 
 import (
        "context"
+       "fmt"
        "path"
        "path/filepath"
        "strings"
@@ -36,6 +37,7 @@ import (
        obsservice 
"github.com/apache/skywalking-banyandb/banyand/observability/services"
        "github.com/apache/skywalking-banyandb/banyand/protector"
        "github.com/apache/skywalking-banyandb/banyand/queue"
+       "github.com/apache/skywalking-banyandb/pkg/bus"
        "github.com/apache/skywalking-banyandb/pkg/fs"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/node"
@@ -76,6 +78,27 @@ func (s *liaison) GetRemovalSegmentsTimeRange(group string) *timestamp.TimeRange
        return s.schemaRepo.GetRemovalSegmentsTimeRange(group)
 }
 
+func (s *liaison) CollectDataInfo(_ context.Context, _ string) 
(*databasev1.DataInfo, error) {
+       return nil, errors.New("collect data info is not supported on liaison 
node")
+}
+
+// CollectLiaisonInfo collects liaison node statistics.
+func (s *liaison) CollectLiaisonInfo(_ context.Context, group string) 
(*databasev1.LiaisonInfo, error) {
+       info := &databasev1.LiaisonInfo{}
+       pendingWriteCount, writeErr := 
s.schemaRepo.collectPendingWriteInfo(group)
+       if writeErr != nil {
+               return nil, fmt.Errorf("failed to collect pending write info: 
%w", writeErr)
+       }
+       info.PendingWriteDataCount = pendingWriteCount
+       pendingSyncPartCount, pendingSyncDataSizeBytes, syncErr := 
s.schemaRepo.collectPendingSyncInfo(group)
+       if syncErr != nil {
+               return nil, fmt.Errorf("failed to collect pending sync info: 
%w", syncErr)
+       }
+       info.PendingSyncPartCount = pendingSyncPartCount
+       info.PendingSyncDataSizeBytes = pendingSyncDataSizeBytes
+       return info, nil
+}
+
 func (s *liaison) FlagSet() *run.FlagSet {
        flagS := run.NewFlagSet("storage")
        flagS.StringVar(&s.root, "measure-root-path", "/tmp", "the root path of 
measure")
@@ -153,6 +176,15 @@ func (s *liaison) PreRun(ctx context.Context) error {
        if err := s.pipeline.Subscribe(data.TopicMeasureWrite, writeListener); 
err != nil {
                return err
        }
+       if metaSvc, ok := s.metadata.(metadata.Service); ok {
+               
metaSvc.RegisterLiaisonCollector(commonv1.Catalog_CATALOG_MEASURE, s)
+       }
+
+       collectLiaisonInfoListener := &collectLiaisonInfoListener{s: s}
+       if subscribeErr := 
s.pipeline.Subscribe(data.TopicMeasureCollectLiaisonInfo, 
collectLiaisonInfoListener); subscribeErr != nil {
+               return fmt.Errorf("failed to subscribe to collect liaison info 
topic: %w", subscribeErr)
+       }
+
        return topNResultPipeline.Subscribe(data.TopicMeasureWrite, 
writeListener)
 }
 
@@ -179,3 +211,20 @@ func NewLiaison(metadata metadata.Repo, pipeline queue.Server, omr observability
                },
        }, nil
 }
+
+type collectLiaisonInfoListener struct {
+       *bus.UnImplementedHealthyListener
+       s *liaison
+}
+
+func (l *collectLiaisonInfoListener) Rev(ctx context.Context, message 
bus.Message) bus.Message {
+       req, ok := 
message.Data().(*databasev1.GroupRegistryServiceInspectRequest)
+       if !ok {
+               return bus.NewMessage(message.ID(), common.NewError("invalid 
data type for collect liaison info request"))
+       }
+       liaisonInfo, collectErr := l.s.CollectLiaisonInfo(ctx, req.Group)
+       if collectErr != nil {
+               return bus.NewMessage(message.ID(), common.NewError("failed to 
collect liaison info: %v", collectErr))
+       }
+       return bus.NewMessage(message.ID(), liaisonInfo)
+}
diff --git a/banyand/measure/svc_standalone.go b/banyand/measure/svc_standalone.go
index 1c444380..06f6a8a6 100644
--- a/banyand/measure/svc_standalone.go
+++ b/banyand/measure/svc_standalone.go
@@ -19,6 +19,7 @@ package measure
 
 import (
        "context"
+       "fmt"
        "path"
        "path/filepath"
        "strings"
@@ -55,6 +56,8 @@ type Service interface {
        run.Config
        run.Service
        Query
+       CollectDataInfo(context.Context, string) (*databasev1.DataInfo, error)
+       CollectLiaisonInfo(context.Context, string) (*databasev1.LiaisonInfo, 
error)
 }
 
 var _ Service = (*standalone)(nil)
@@ -258,6 +261,10 @@ func (s *standalone) PreRun(ctx context.Context) error {
        }
        node := val.(common.Node)
        s.schemaRepo = newSchemaRepo(s.dataPath, s, node.Labels, node.NodeID)
+       if metaSvc, ok := s.metadata.(metadata.Service); ok {
+               metaSvc.RegisterDataCollector(commonv1.Catalog_CATALOG_MEASURE, 
s.schemaRepo)
+               
metaSvc.RegisterLiaisonCollector(commonv1.Catalog_CATALOG_MEASURE, s)
+       }
 
        s.cm = newCacheMetrics(s.omr)
        obsservice.MetricsCollector.Register("measure_cache", 
s.collectCacheMetrics)
@@ -346,3 +353,23 @@ func NewStandalone(metadata metadata.Repo, pipeline queue.Server, metricPipeline
                pm:             pm,
        }, nil
 }
+
+func (s *standalone) CollectDataInfo(ctx context.Context, group string) 
(*databasev1.DataInfo, error) {
+       return s.schemaRepo.CollectDataInfo(ctx, group)
+}
+
+func (s *standalone) CollectLiaisonInfo(_ context.Context, group string) 
(*databasev1.LiaisonInfo, error) {
+       info := &databasev1.LiaisonInfo{}
+       pendingWriteCount, writeErr := 
s.schemaRepo.collectPendingWriteInfo(group)
+       if writeErr != nil {
+               return nil, fmt.Errorf("failed to collect pending write info: 
%w", writeErr)
+       }
+       info.PendingWriteDataCount = pendingWriteCount
+       pendingSyncPartCount, pendingSyncDataSizeBytes, syncErr := 
s.schemaRepo.collectPendingSyncInfo(group)
+       if syncErr != nil {
+               return nil, fmt.Errorf("failed to collect pending sync info: 
%w", syncErr)
+       }
+       info.PendingSyncPartCount = pendingSyncPartCount
+       info.PendingSyncDataSizeBytes = pendingSyncDataSizeBytes
+       return info, nil
+}
diff --git a/banyand/measure/tstable.go b/banyand/measure/tstable.go
index 683327e7..d213cc98 100644
--- a/banyand/measure/tstable.go
+++ b/banyand/measure/tstable.go
@@ -152,14 +152,15 @@ type tsTable struct {
        introductions chan *introduction
        snapshot      *snapshot
        *metrics
-       getNodes  func() []string
-       l         *logger.Logger
-       p         common.Position
-       root      string
-       group     string
-       gc        garbageCleaner
-       option    option
-       curPartID uint64
+       getNodes         func() []string
+       l                *logger.Logger
+       p                common.Position
+       root             string
+       group            string
+       gc               garbageCleaner
+       option           option
+       curPartID        uint64
+       pendingDataCount atomic.Int64
        sync.RWMutex
        shardID common.ShardID
 }
@@ -316,9 +317,11 @@ func (tst *tsTable) mustAddMemPart(mp *memPart) {
        ind.memPart.p.partMetadata.ID = atomic.AddUint64(&tst.curPartID, 1)
        startTime := time.Now()
        totalCount := mp.partMetadata.TotalCount
+       tst.addPendingDataCount(int64(totalCount))
        select {
        case tst.introductions <- ind:
        case <-tst.loopCloser.CloseNotify():
+               tst.addPendingDataCount(-int64(totalCount))
                return
        }
        select {
diff --git a/banyand/metadata/client.go b/banyand/metadata/client.go
index ae5a9d30..54666d67 100644
--- a/banyand/metadata/client.go
+++ b/banyand/metadata/client.go
@@ -37,6 +37,7 @@ import (
        "github.com/apache/skywalking-banyandb/banyand/metadata/discovery/file"
        "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
        "github.com/apache/skywalking-banyandb/banyand/observability"
+       "github.com/apache/skywalking-banyandb/pkg/bus"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/run"
 )
@@ -78,10 +79,13 @@ func NewClient(toRegisterNode, forceRegisterNode bool) (Service, error) {
 
 type clientService struct {
        schemaRegistry           schema.Registry
+       infoCollectorRegistry    *schema.InfoCollectorRegistry
        dnsDiscovery             *dns.Service
        fileDiscovery            *file.Service
        closer                   *run.Closer
        nodeInfo                 *databasev1.Node
+       dataBroadcaster          bus.Broadcaster
+       liaisonBroadcaster       bus.Broadcaster
        etcdTLSCertFile          string
        dnsCACertPaths           []string
        etcdPassword             string
@@ -243,6 +247,14 @@ func (s *clientService) PreRun(ctx context.Context) error {
                return err
        }
 
+       s.infoCollectorRegistry = schema.NewInfoCollectorRegistry(l, 
s.schemaRegistry)
+       if s.dataBroadcaster != nil {
+               s.infoCollectorRegistry.SetDataBroadcaster(s.dataBroadcaster)
+       }
+       if s.liaisonBroadcaster != nil {
+               
s.infoCollectorRegistry.SetLiaisonBroadcaster(s.liaisonBroadcaster)
+       }
+
        if s.nodeDiscoveryMode == NodeDiscoveryModeDNS {
                l.Info().Strs("srv-addresses", 
s.dnsSRVAddresses).Msg("Initializing DNS-based node discovery")
 
@@ -543,6 +555,30 @@ func (s *clientService) Subjects(ctx context.Context, indexRule *databasev1.Inde
        return foundSubjects, subjectErr
 }
 
+func (s *clientService) CollectDataInfo(ctx context.Context, group string) 
([]*databasev1.DataInfo, error) {
+       return s.infoCollectorRegistry.CollectDataInfo(ctx, group)
+}
+
+func (s *clientService) CollectLiaisonInfo(ctx context.Context, group string) 
([]*databasev1.LiaisonInfo, error) {
+       return s.infoCollectorRegistry.CollectLiaisonInfo(ctx, group)
+}
+
+func (s *clientService) RegisterDataCollector(catalog commonv1.Catalog, 
collector schema.DataInfoCollector) {
+       s.infoCollectorRegistry.RegisterDataCollector(catalog, collector)
+}
+
+func (s *clientService) RegisterLiaisonCollector(catalog commonv1.Catalog, 
collector schema.LiaisonInfoCollector) {
+       s.infoCollectorRegistry.RegisterLiaisonCollector(catalog, collector)
+}
+
+func (s *clientService) SetDataBroadcaster(broadcaster bus.Broadcaster) {
+       s.dataBroadcaster = broadcaster
+}
+
+func (s *clientService) SetLiaisonBroadcaster(broadcaster bus.Broadcaster) {
+       s.liaisonBroadcaster = broadcaster
+}
+
 func contains(s []string, e string) bool {
        for _, a := range s {
                if a == e {
diff --git a/banyand/metadata/metadata.go b/banyand/metadata/metadata.go
index 085cdfed..2e21ffbc 100644
--- a/banyand/metadata/metadata.go
+++ b/banyand/metadata/metadata.go
@@ -26,6 +26,7 @@ import (
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
        "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
        "github.com/apache/skywalking-banyandb/banyand/observability"
+       "github.com/apache/skywalking-banyandb/pkg/bus"
        "github.com/apache/skywalking-banyandb/pkg/run"
 )
 
@@ -52,6 +53,8 @@ type Repo interface {
        RegisterHandler(string, schema.Kind, schema.EventHandler)
        NodeRegistry() schema.Node
        PropertyRegistry() schema.Property
+       CollectDataInfo(context.Context, string) ([]*databasev1.DataInfo, error)
+       CollectLiaisonInfo(context.Context, string) ([]*databasev1.LiaisonInfo, 
error)
 }
 
 // Service is the metadata repository.
@@ -62,4 +65,8 @@ type Service interface {
        run.Config
        SchemaRegistry() schema.Registry
        SetMetricsRegistry(omr observability.MetricsRegistry)
+       SetDataBroadcaster(broadcaster bus.Broadcaster)
+       SetLiaisonBroadcaster(broadcaster bus.Broadcaster)
+       RegisterDataCollector(catalog commonv1.Catalog, collector 
schema.DataInfoCollector)
+       RegisterLiaisonCollector(catalog commonv1.Catalog, collector 
schema.LiaisonInfoCollector)
 }
diff --git a/banyand/metadata/schema/collector.go b/banyand/metadata/schema/collector.go
new file mode 100644
index 00000000..6f348d91
--- /dev/null
+++ b/banyand/metadata/schema/collector.go
@@ -0,0 +1,228 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package schema
+
+import (
+       "context"
+       "fmt"
+       "sync"
+       "time"
+
+       "github.com/apache/skywalking-banyandb/api/common"
+       "github.com/apache/skywalking-banyandb/api/data"
+       commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       "github.com/apache/skywalking-banyandb/pkg/bus"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+// GroupGetter provides method to get group metadata.
+type GroupGetter interface {
+       GetGroup(ctx context.Context, group string) (*commonv1.Group, error)
+}
+
+// InfoCollectorRegistry manages data and liaison info collectors.
+type InfoCollectorRegistry struct {
+       groupGetter        GroupGetter
+       dataCollectors     map[commonv1.Catalog]DataInfoCollector
+       liaisonCollectors  map[commonv1.Catalog]LiaisonInfoCollector
+       dataBroadcaster    bus.Broadcaster
+       liaisonBroadcaster bus.Broadcaster
+       l                  *logger.Logger
+       mux                sync.RWMutex
+}
+
+// NewInfoCollectorRegistry creates a new InfoCollectorRegistry.
+func NewInfoCollectorRegistry(l *logger.Logger, groupGetter GroupGetter) 
*InfoCollectorRegistry {
+       return &InfoCollectorRegistry{
+               groupGetter:       groupGetter,
+               dataCollectors:    make(map[commonv1.Catalog]DataInfoCollector),
+               liaisonCollectors: 
make(map[commonv1.Catalog]LiaisonInfoCollector),
+               l:                 l,
+       }
+}
+
+// CollectDataInfo collects data information from both local and remote data 
nodes.
+func (icr *InfoCollectorRegistry) CollectDataInfo(ctx context.Context, group 
string) ([]*databasev1.DataInfo, error) {
+       g, getErr := icr.groupGetter.GetGroup(ctx, group)
+       if getErr != nil {
+               return nil, getErr
+       }
+       localInfo, localErr := icr.collectDataInfoLocal(ctx, g.Catalog, group)
+       if localErr != nil {
+               return nil, localErr
+       }
+       localInfoList := []*databasev1.DataInfo{}
+       if localInfo != nil {
+               localInfoList = []*databasev1.DataInfo{localInfo}
+       }
+       if icr.dataBroadcaster == nil {
+               return localInfoList, nil
+       }
+
+       var topic bus.Topic
+       switch g.Catalog {
+       case commonv1.Catalog_CATALOG_MEASURE:
+               topic = data.TopicMeasureCollectDataInfo
+       case commonv1.Catalog_CATALOG_STREAM:
+               topic = data.TopicStreamCollectDataInfo
+       case commonv1.Catalog_CATALOG_TRACE:
+               topic = data.TopicTraceCollectDataInfo
+       default:
+               return nil, fmt.Errorf("unsupported catalog type: %v", 
g.Catalog)
+       }
+       remoteInfo := icr.broadcastCollectDataInfo(topic, group)
+       return append(localInfoList, remoteInfo...), nil
+}
+
+func (icr *InfoCollectorRegistry) broadcastCollectDataInfo(topic bus.Topic, 
group string) []*databasev1.DataInfo {
+       message := bus.NewMessage(bus.MessageID(time.Now().UnixNano()), 
&databasev1.GroupRegistryServiceInspectRequest{Group: group})
+       futures, broadcastErr := icr.dataBroadcaster.Broadcast(5*time.Second, 
topic, message)
+       if broadcastErr != nil {
+               icr.l.Warn().Err(broadcastErr).Str("group", group).Msg("failed 
to broadcast collect data info request")
+               return []*databasev1.DataInfo{}
+       }
+
+       dataInfoList := make([]*databasev1.DataInfo, 0, len(futures))
+       for _, future := range futures {
+               msg, getErr := future.Get()
+               if getErr != nil {
+                       icr.l.Warn().Err(getErr).Str("group", 
group).Msg("failed to get collect data info response")
+                       continue
+               }
+               msgData := msg.Data()
+               switch d := msgData.(type) {
+               case *databasev1.DataInfo:
+                       if d != nil {
+                               dataInfoList = append(dataInfoList, d)
+                       }
+               case *common.Error:
+                       icr.l.Warn().Str("error", d.Error()).Str("group", 
group).Msg("error collecting data info from node")
+               }
+       }
+       return dataInfoList
+}
+
+func (icr *InfoCollectorRegistry) collectDataInfoLocal(ctx context.Context, 
catalog commonv1.Catalog, group string) (*databasev1.DataInfo, error) {
+       icr.mux.RLock()
+       collector, hasCollector := icr.dataCollectors[catalog]
+       icr.mux.RUnlock()
+       if hasCollector && collector != nil {
+               return collector.CollectDataInfo(ctx, group)
+       }
+       return nil, nil
+}
+
+// CollectLiaisonInfo collects liaison information from both local and remote 
liaison nodes.
+func (icr *InfoCollectorRegistry) CollectLiaisonInfo(ctx context.Context, 
group string) ([]*databasev1.LiaisonInfo, error) {
+       g, getErr := icr.groupGetter.GetGroup(ctx, group)
+       if getErr != nil {
+               return nil, getErr
+       }
+       localInfo, localErr := icr.collectLiaisonInfoLocal(ctx, g.Catalog, 
group)
+       if localErr != nil {
+               return nil, localErr
+       }
+       localInfoList := []*databasev1.LiaisonInfo{}
+       if localInfo != nil {
+               localInfoList = []*databasev1.LiaisonInfo{localInfo}
+       }
+       if icr.liaisonBroadcaster == nil {
+               return localInfoList, nil
+       }
+
+       var topic bus.Topic
+       switch g.Catalog {
+       case commonv1.Catalog_CATALOG_MEASURE:
+               topic = data.TopicMeasureCollectLiaisonInfo
+       case commonv1.Catalog_CATALOG_STREAM:
+               topic = data.TopicStreamCollectLiaisonInfo
+       case commonv1.Catalog_CATALOG_TRACE:
+               topic = data.TopicTraceCollectLiaisonInfo
+       default:
+               return nil, fmt.Errorf("unsupported catalog type: %v", 
g.Catalog)
+       }
+       remoteInfo := icr.broadcastCollectLiaisonInfo(topic, group)
+       return append(localInfoList, remoteInfo...), nil
+}
+
+func (icr *InfoCollectorRegistry) broadcastCollectLiaisonInfo(topic bus.Topic, 
group string) []*databasev1.LiaisonInfo {
+       message := bus.NewMessage(bus.MessageID(time.Now().UnixNano()), 
&databasev1.GroupRegistryServiceInspectRequest{Group: group})
+       futures, broadcastErr := 
icr.liaisonBroadcaster.Broadcast(5*time.Second, topic, message)
+       if broadcastErr != nil {
+               icr.l.Warn().Err(broadcastErr).Str("group", group).Msg("failed 
to broadcast collect liaison info request")
+               return []*databasev1.LiaisonInfo{}
+       }
+
+       liaisonInfoList := make([]*databasev1.LiaisonInfo, 0, len(futures))
+       for _, future := range futures {
+               msg, getErr := future.Get()
+               if getErr != nil {
+                       icr.l.Warn().Err(getErr).Str("group", 
group).Msg("failed to get collect liaison info response")
+                       continue
+               }
+               msgData := msg.Data()
+               switch d := msgData.(type) {
+               case *databasev1.LiaisonInfo:
+                       if d != nil {
+                               liaisonInfoList = append(liaisonInfoList, d)
+                       }
+               case *common.Error:
+                       icr.l.Warn().Str("error", d.Error()).Str("group", 
group).Msg("error collecting liaison info from node")
+               }
+       }
+       return liaisonInfoList
+}
+
+func (icr *InfoCollectorRegistry) collectLiaisonInfoLocal(ctx context.Context, 
catalog commonv1.Catalog, group string) (*databasev1.LiaisonInfo, error) {
+       icr.mux.RLock()
+       collector, hasCollector := icr.liaisonCollectors[catalog]
+       icr.mux.RUnlock()
+       if hasCollector && collector != nil {
+               return collector.CollectLiaisonInfo(ctx, group)
+       }
+       return nil, nil
+}
+
+// RegisterDataCollector registers a data info collector for a specific 
catalog.
+func (icr *InfoCollectorRegistry) RegisterDataCollector(catalog 
commonv1.Catalog, collector DataInfoCollector) {
+       icr.mux.Lock()
+       defer icr.mux.Unlock()
+       icr.dataCollectors[catalog] = collector
+}
+
+// RegisterLiaisonCollector registers a liaison info collector for a specific 
catalog.
+func (icr *InfoCollectorRegistry) RegisterLiaisonCollector(catalog 
commonv1.Catalog, collector LiaisonInfoCollector) {
+       icr.mux.Lock()
+       defer icr.mux.Unlock()
+       icr.liaisonCollectors[catalog] = collector
+}
+
+// SetDataBroadcaster sets the broadcaster for data info collection.
+func (icr *InfoCollectorRegistry) SetDataBroadcaster(broadcaster 
bus.Broadcaster) {
+       icr.mux.Lock()
+       defer icr.mux.Unlock()
+       icr.dataBroadcaster = broadcaster
+}
+
+// SetLiaisonBroadcaster sets the broadcaster for liaison info collection.
+func (icr *InfoCollectorRegistry) SetLiaisonBroadcaster(broadcaster 
bus.Broadcaster) {
+       icr.mux.Lock()
+       defer icr.mux.Unlock()
+       icr.liaisonBroadcaster = broadcaster
+}
diff --git a/banyand/metadata/schema/etcd.go b/banyand/metadata/schema/etcd.go
index d5283e53..7be2b159 100644
--- a/banyand/metadata/schema/etcd.go
+++ b/banyand/metadata/schema/etcd.go
@@ -128,6 +128,8 @@ func CheckInterval(d time.Duration) WatcherOption {
        }
 }
 
+var _ Registry = (*etcdSchemaRegistry)(nil)
+
 type etcdSchemaRegistry struct {
        client        *clientv3.Client
        closer        *run.Closer
@@ -256,11 +258,12 @@ func NewEtcdSchemaRegistry(options ...RegistryOption) (Registry, error) {
        if err != nil {
                return nil, err
        }
+       schemaLogger := logger.GetLogger("schema-registry")
        reg := &etcdSchemaRegistry{
                namespace:     registryConfig.namespace,
                client:        client,
                closer:        run.NewCloser(1),
-               l:             logger.GetLogger("schema-registry"),
+               l:             schemaLogger,
                checkInterval: registryConfig.checkInterval,
                watchers:      make(map[Kind]*watcher),
        }
diff --git a/banyand/metadata/schema/schema.go b/banyand/metadata/schema/schema.go
index 014c9ac8..49cdf8d6 100644
--- a/banyand/metadata/schema/schema.go
+++ b/banyand/metadata/schema/schema.go
@@ -38,6 +38,16 @@ type EventHandler interface {
        OnDelete(Metadata)
 }
 
+// DataInfoCollector provides methods to collect data node info.
+type DataInfoCollector interface {
+       CollectDataInfo(ctx context.Context, group string) 
(*databasev1.DataInfo, error)
+}
+
+// LiaisonInfoCollector provides methods to collect liaison node info.
+type LiaisonInfoCollector interface {
+       CollectLiaisonInfo(ctx context.Context, group string) 
(*databasev1.LiaisonInfo, error)
+}
+
 // UnimplementedOnInitHandler is a placeholder for unimplemented OnInitHandler.
 type UnimplementedOnInitHandler struct{}
 
diff --git a/banyand/stream/introducer.go b/banyand/stream/introducer.go
index fa07bd37..a9dda746 100644
--- a/banyand/stream/introducer.go
+++ b/banyand/stream/introducer.go
@@ -221,6 +221,7 @@ func (tst *tsTable) introduceMemPart(nextIntroduction *introduction, epoch uint6
        }
 
        next := nextIntroduction.memPart
+       tst.addPendingDataCount(-int64(next.mp.partMetadata.TotalCount))
        nextSnp := cur.copyAllTo(epoch)
        nextSnp.parts = append(nextSnp.parts, next)
        nextSnp.creator = snapshotCreatorMemPart
diff --git a/banyand/stream/metadata.go b/banyand/stream/metadata.go
index 849498a9..dca00f73 100644
--- a/banyand/stream/metadata.go
+++ b/banyand/stream/metadata.go
@@ -55,13 +55,17 @@ type schemaRepo struct {
        l        *logger.Logger
        metadata metadata.Repo
        path     string
+       nodeID   string
+       role     databasev1.Role
 }
 
-func newSchemaRepo(path string, svc *standalone, nodeLabels map[string]string) 
schemaRepo {
+func newSchemaRepo(path string, svc *standalone, nodeLabels map[string]string, 
nodeID string) schemaRepo {
        sr := schemaRepo{
                l:        svc.l,
                path:     path,
                metadata: svc.metadata,
+               nodeID:   nodeID,
+               role:     databasev1.Role_ROLE_DATA,
                Repository: resourceSchema.NewRepository(
                        svc.metadata,
                        svc.l,
@@ -78,6 +82,7 @@ func newLiaisonSchemaRepo(path string, svc *liaison, streamDataNodeRegistry grpc
                l:        svc.l,
                path:     path,
                metadata: svc.metadata,
+               role:     databasev1.Role_ROLE_LIAISON,
                Repository: resourceSchema.NewRepository(
                        svc.metadata,
                        svc.l,
@@ -231,17 +236,226 @@ func (sr *schemaRepo) loadStream(metadata 
*commonv1.Metadata) (*stream, bool) {
 }
 
 func (sr *schemaRepo) loadTSDB(groupName string) (storage.TSDB[*tsTable, 
option], error) {
+       if sr == nil {
+               return nil, fmt.Errorf("schemaRepo is nil")
+       }
        g, ok := sr.LoadGroup(groupName)
        if !ok {
                return nil, fmt.Errorf("group %s not found", groupName)
        }
        db := g.SupplyTSDB()
        if db == nil {
-               return nil, fmt.Errorf("tsdb for group %s not found", groupName)
+               return nil, fmt.Errorf("group %s not found", groupName)
        }
        return db.(storage.TSDB[*tsTable, option]), nil
 }
 
+// CollectDataInfo collects data info for a specific group.
+func (sr *schemaRepo) CollectDataInfo(ctx context.Context, group string) 
(*databasev1.DataInfo, error) {
+       if sr.nodeID == "" {
+               return nil, fmt.Errorf("node ID is empty")
+       }
+       node, nodeErr := sr.metadata.NodeRegistry().GetNode(ctx, sr.nodeID)
+       if nodeErr != nil {
+               return nil, fmt.Errorf("failed to get current node info: %w", 
nodeErr)
+       }
+       tsdb, tsdbErr := sr.loadTSDB(group)
+       if tsdbErr != nil {
+               return nil, tsdbErr
+       }
+       if tsdb == nil {
+               return nil, nil
+       }
+       segments, segmentsErr := tsdb.SelectSegments(timestamp.TimeRange{
+               Start: time.Unix(0, 0),
+               End:   time.Unix(0, timestamp.MaxNanoTime),
+       })
+       if segmentsErr != nil {
+               return nil, segmentsErr
+       }
+       var segmentInfoList []*databasev1.SegmentInfo
+       var totalDataSize int64
+       for _, segment := range segments {
+               timeRange := segment.GetTimeRange()
+               tables, _ := segment.Tables()
+               var shardInfoList []*databasev1.ShardInfo
+               for shardIdx, table := range tables {
+                       shardInfo := sr.collectShardInfo(table, 
uint32(shardIdx))
+                       shardInfoList = append(shardInfoList, shardInfo)
+                       totalDataSize += shardInfo.DataSizeBytes
+               }
+               seriesIndexInfo := sr.collectSeriesIndexInfo(segment)
+               totalDataSize += seriesIndexInfo.DataSizeBytes
+               segmentInfo := &databasev1.SegmentInfo{
+                       SegmentId:       fmt.Sprintf("%d-%d", 
timeRange.Start.UnixNano(), timeRange.End.UnixNano()),
+                       TimeRangeStart:  
timeRange.Start.Format(time.RFC3339Nano),
+                       TimeRangeEnd:    timeRange.End.Format(time.RFC3339Nano),
+                       ShardInfo:       shardInfoList,
+                       SeriesIndexInfo: seriesIndexInfo,
+               }
+               segmentInfoList = append(segmentInfoList, segmentInfo)
+               segment.DecRef()
+       }
+       dataInfo := &databasev1.DataInfo{
+               Node:          node,
+               SegmentInfo:   segmentInfoList,
+               DataSizeBytes: totalDataSize,
+       }
+       return dataInfo, nil
+}
+
+func (sr *schemaRepo) collectSeriesIndexInfo(segment storage.Segment[*tsTable, 
option]) *databasev1.SeriesIndexInfo {
+       indexDB := segment.IndexDB()
+       if indexDB == nil {
+               return &databasev1.SeriesIndexInfo{
+                       DataCount:     0,
+                       DataSizeBytes: 0,
+               }
+       }
+       dataCount, dataSizeBytes := indexDB.Stats()
+       return &databasev1.SeriesIndexInfo{
+               DataCount:     dataCount,
+               DataSizeBytes: dataSizeBytes,
+       }
+}
+
+func (sr *schemaRepo) collectShardInfo(table any, shardID uint32) 
*databasev1.ShardInfo {
+       tst, ok := table.(*tsTable)
+       if !ok {
+               return &databasev1.ShardInfo{
+                       ShardId:       shardID,
+                       DataCount:     0,
+                       DataSizeBytes: 0,
+                       PartCount:     0,
+               }
+       }
+       snapshot := tst.currentSnapshot()
+       if snapshot == nil {
+               return &databasev1.ShardInfo{
+                       ShardId:       shardID,
+                       DataCount:     0,
+                       DataSizeBytes: 0,
+                       PartCount:     0,
+               }
+       }
+       defer snapshot.decRef()
+       var totalCount, compressedSize, uncompressedSize, partCount uint64
+       for _, pw := range snapshot.parts {
+               if pw.p != nil {
+                       totalCount += pw.p.partMetadata.TotalCount
+                       compressedSize += pw.p.partMetadata.CompressedSizeBytes
+                       uncompressedSize += 
pw.p.partMetadata.UncompressedSizeBytes
+                       partCount++
+               } else if pw.mp != nil {
+                       totalCount += pw.mp.partMetadata.TotalCount
+                       compressedSize += pw.mp.partMetadata.CompressedSizeBytes
+                       uncompressedSize += 
pw.mp.partMetadata.UncompressedSizeBytes
+                       partCount++
+               }
+       }
+       invertedIndexInfo := sr.collectInvertedIndexInfo(tst)
+       return &databasev1.ShardInfo{
+               ShardId:           shardID,
+               DataCount:         int64(totalCount),
+               DataSizeBytes:     int64(compressedSize),
+               PartCount:         int64(partCount),
+               InvertedIndexInfo: invertedIndexInfo,
+               SidxInfo:          &databasev1.SIDXInfo{},
+       }
+}
+
+func (sr *schemaRepo) collectInvertedIndexInfo(tst *tsTable) 
*databasev1.InvertedIndexInfo {
+       if tst.index == nil {
+               return &databasev1.InvertedIndexInfo{
+                       DataCount:     0,
+                       DataSizeBytes: 0,
+               }
+       }
+       dataCount, dataSizeBytes := tst.index.store.Stats()
+       return &databasev1.InvertedIndexInfo{
+               DataCount:     dataCount,
+               DataSizeBytes: dataSizeBytes,
+       }
+}
+
+func (sr *schemaRepo) collectPendingWriteInfo(groupName string) (int64, error) 
{
+       if sr == nil || sr.Repository == nil {
+               return 0, fmt.Errorf("schema repository is not initialized")
+       }
+       if sr.role == databasev1.Role_ROLE_LIAISON {
+               queue, queueErr := sr.loadQueue(groupName)
+               if queueErr != nil {
+                       return 0, fmt.Errorf("failed to load queue: %w", 
queueErr)
+               }
+               if queue == nil {
+                       return 0, nil
+               }
+               var pendingWriteCount int64
+               for _, sq := range queue.SubQueues() {
+                       if sq != nil {
+                               pendingWriteCount += sq.getPendingDataCount()
+                       }
+               }
+               return pendingWriteCount, nil
+       }
+       // Standalone mode
+       tsdb, tsdbErr := sr.loadTSDB(groupName)
+       if tsdbErr != nil {
+               return 0, fmt.Errorf("failed to load TSDB: %w", tsdbErr)
+       }
+       if tsdb == nil {
+               return 0, fmt.Errorf("TSDB is nil for group %s", groupName)
+       }
+       segments, segmentsErr := tsdb.SelectSegments(timestamp.TimeRange{
+               Start: time.Unix(0, 0),
+               End:   time.Unix(0, timestamp.MaxNanoTime),
+       })
+       if segmentsErr != nil {
+               return 0, fmt.Errorf("failed to select segments: %w", 
segmentsErr)
+       }
+       var pendingWriteCount int64
+       for _, segment := range segments {
+               tables, _ := segment.Tables()
+               for _, tst := range tables {
+                       pendingWriteCount += tst.getPendingDataCount()
+               }
+               segment.DecRef()
+       }
+       return pendingWriteCount, nil
+}
+
+func (sr *schemaRepo) collectPendingSyncInfo(groupName string) (partCount 
int64, totalSizeBytes int64, err error) {
+       if sr == nil || sr.Repository == nil {
+               return 0, 0, fmt.Errorf("schema repository is not initialized")
+       }
+       // Only liaison nodes collect pending sync info
+       if sr.role != databasev1.Role_ROLE_LIAISON {
+               return 0, 0, nil
+       }
+       queue, queueErr := sr.loadQueue(groupName)
+       if queueErr != nil {
+               return 0, 0, fmt.Errorf("failed to load queue: %w", queueErr)
+       }
+       if queue == nil {
+               return 0, 0, nil
+       }
+       for _, sq := range queue.SubQueues() {
+               if sq != nil {
+                       snapshot := sq.currentSnapshot()
+                       if snapshot != nil {
+                               for _, pw := range snapshot.parts {
+                                       if pw.mp == nil && pw.p != nil && 
pw.p.partMetadata.TotalCount > 0 {
+                                               partCount++
+                                               totalSizeBytes += 
int64(pw.p.partMetadata.CompressedSizeBytes)
+                                       }
+                               }
+                               snapshot.decRef()
+                       }
+               }
+       }
+       return partCount, totalSizeBytes, nil
+}
+
 func (sr *schemaRepo) loadQueue(groupName string) (*wqueue.Queue[*tsTable, 
option], error) {
        g, ok := sr.LoadGroup(groupName)
        if !ok {
diff --git a/banyand/stream/metrics.go b/banyand/stream/metrics.go
index 91686b49..3a757027 100644
--- a/banyand/stream/metrics.go
+++ b/banyand/stream/metrics.go
@@ -32,8 +32,8 @@ var (
 )
 
 type metrics struct {
-       tbMetrics
-       indexMetrics           *inverted.Metrics
+       indexMetrics *inverted.Metrics
+
        totalWritten           meter.Counter
        totalBatch             meter.Counter
        totalBatchIntroLatency meter.Counter
@@ -66,6 +66,8 @@ type metrics struct {
        totalMergedParts  meter.Counter
        totalMergeLatency meter.Counter
        totalMerged       meter.Counter
+
+       tbMetrics
 }
 
 func (tst *tsTable) incTotalWritten(delta int) {
@@ -250,6 +252,18 @@ func (tst *tsTable) incTotalMerged(delta int, typ string) {
        tst.metrics.totalMerged.Inc(float64(delta), typ)
 }
 
+func (tst *tsTable) addPendingDataCount(delta int64) {
+       tst.pendingDataCount.Add(delta)
+       if tst.metrics == nil {
+               return
+       }
+       tst.metrics.tbMetrics.pendingDataCount.Add(float64(delta), 
tst.p.ShardLabelValues()...)
+}
+
+func (tst *tsTable) getPendingDataCount() int64 {
+       return tst.pendingDataCount.Load()
+}
+
 func (m *metrics) DeleteAll() {
        if m == nil {
                return
@@ -334,6 +348,7 @@ func (s *supplier) newMetrics(p common.Position) 
storage.Metrics {
                        totalFileBlocks:                
factory.NewGauge("total_file_blocks", common.ShardLabelNames()...),
                        totalFilePartBytes:             
factory.NewGauge("total_file_part_bytes", common.ShardLabelNames()...),
                        totalFilePartUncompressedBytes: 
factory.NewGauge("total_file_part_uncompressed_bytes", 
common.ShardLabelNames()...),
+                       pendingDataCount:               
factory.NewGauge("pending_data_count", common.ShardLabelNames()...),
                },
                indexMetrics: inverted.NewMetrics(factory, 
common.SegLabelNames()...),
        }
@@ -379,6 +394,7 @@ func (s *queueSupplier) newMetrics(p common.Position) 
storage.Metrics {
                        totalFileBlocks:                
factory.NewGauge("total_file_blocks", common.ShardLabelNames()...),
                        totalFilePartBytes:             
factory.NewGauge("total_file_part_bytes", common.ShardLabelNames()...),
                        totalFilePartUncompressedBytes: 
factory.NewGauge("total_file_part_uncompressed_bytes", 
common.ShardLabelNames()...),
+                       pendingDataCount:               
factory.NewGauge("pending_data_count", common.ShardLabelNames()...),
                },
                indexMetrics: inverted.NewMetrics(factory, 
common.SegLabelNames()...),
        }
@@ -442,6 +458,7 @@ func (tst *tsTable) deleteMetrics() {
        
tst.metrics.tbMetrics.totalFileBlocks.Delete(tst.p.ShardLabelValues()...)
        
tst.metrics.tbMetrics.totalFilePartBytes.Delete(tst.p.ShardLabelValues()...)
        
tst.metrics.tbMetrics.totalFilePartUncompressedBytes.Delete(tst.p.ShardLabelValues()...)
+       
tst.metrics.tbMetrics.pendingDataCount.Delete(tst.p.ShardLabelValues()...)
        tst.metrics.indexMetrics.DeleteAll(tst.p.SegLabelValues()...)
 }
 
@@ -457,4 +474,6 @@ type tbMetrics struct {
        totalFileBlocks                meter.Gauge
        totalFilePartBytes             meter.Gauge
        totalFilePartUncompressedBytes meter.Gauge
+
+       pendingDataCount meter.Gauge
 }
diff --git a/banyand/stream/svc_liaison.go b/banyand/stream/svc_liaison.go
index e3e2b5de..25a3ec86 100644
--- a/banyand/stream/svc_liaison.go
+++ b/banyand/stream/svc_liaison.go
@@ -19,6 +19,7 @@ package stream
 
 import (
        "context"
+       "fmt"
        "path"
        "path/filepath"
        "strings"
@@ -78,6 +79,33 @@ func (s *liaison) GetRemovalSegmentsTimeRange(group string) 
*timestamp.TimeRange
        return s.schemaRepo.GetRemovalSegmentsTimeRange(group)
 }
 
+func (s *liaison) CollectDataInfo(_ context.Context, _ string) 
(*databasev1.DataInfo, error) {
+       return nil, errors.New("collect data info is not supported on liaison 
node")
+}
+
+// CollectLiaisonInfo collects liaison node info.
+func (s *liaison) CollectLiaisonInfo(_ context.Context, group string) 
(*databasev1.LiaisonInfo, error) {
+       info := &databasev1.LiaisonInfo{
+               PendingWriteDataCount:       0,
+               PendingSyncPartCount:        0,
+               PendingSyncDataSizeBytes:    0,
+               PendingHandoffPartCount:     0,
+               PendingHandoffDataSizeBytes: 0,
+       }
+       pendingWriteCount, writeErr := 
s.schemaRepo.collectPendingWriteInfo(group)
+       if writeErr != nil {
+               return nil, fmt.Errorf("failed to collect pending write info: 
%w", writeErr)
+       }
+       info.PendingWriteDataCount = pendingWriteCount
+       pendingSyncPartCount, pendingSyncDataSizeBytes, syncErr := 
s.schemaRepo.collectPendingSyncInfo(group)
+       if syncErr != nil {
+               return nil, fmt.Errorf("failed to collect pending sync info: 
%w", syncErr)
+       }
+       info.PendingSyncPartCount = pendingSyncPartCount
+       info.PendingSyncDataSizeBytes = pendingSyncDataSizeBytes
+       return info, nil
+}
+
 func (s *liaison) FlagSet() *run.FlagSet {
        flagS := run.NewFlagSet("storage")
        flagS.StringVar(&s.root, "stream-root-path", "/tmp", "the root path of 
stream")
@@ -155,6 +183,15 @@ func (s *liaison) PreRun(ctx context.Context) error {
        // Register chunked sync handler for stream data
        s.pipeline.RegisterChunkedSyncHandler(data.TopicStreamPartSync, 
setUpChunkedSyncCallback(s.l, &s.schemaRepo))
 
+       if metaSvc, ok := s.metadata.(metadata.Service); ok {
+               
metaSvc.RegisterLiaisonCollector(commonv1.Catalog_CATALOG_STREAM, s)
+       }
+
+       collectLiaisonInfoListener := &collectLiaisonInfoListener{s: s}
+       if subscribeErr := 
s.pipeline.Subscribe(data.TopicStreamCollectLiaisonInfo, 
collectLiaisonInfoListener); subscribeErr != nil {
+               return fmt.Errorf("failed to subscribe to collect liaison info 
topic: %w", subscribeErr)
+       }
+
        return s.pipeline.Subscribe(data.TopicStreamWrite, s.writeListener)
 }
 
@@ -181,3 +218,20 @@ func NewLiaison(metadata metadata.Repo, pipeline 
queue.Server, omr observability
                },
        }, nil
 }
+
+type collectLiaisonInfoListener struct {
+       *bus.UnImplementedHealthyListener
+       s *liaison
+}
+
+func (l *collectLiaisonInfoListener) Rev(ctx context.Context, message 
bus.Message) bus.Message {
+       req, ok := 
message.Data().(*databasev1.GroupRegistryServiceInspectRequest)
+       if !ok {
+               return bus.NewMessage(message.ID(), common.NewError("invalid 
data type for collect liaison info request"))
+       }
+       liaisonInfo, collectErr := l.s.CollectLiaisonInfo(ctx, req.Group)
+       if collectErr != nil {
+               return bus.NewMessage(message.ID(), common.NewError("failed to 
collect liaison info: %v", collectErr))
+       }
+       return bus.NewMessage(message.ID(), liaisonInfo)
+}
diff --git a/banyand/stream/svc_standalone.go b/banyand/stream/svc_standalone.go
index 1be24bf7..eea34da8 100644
--- a/banyand/stream/svc_standalone.go
+++ b/banyand/stream/svc_standalone.go
@@ -19,6 +19,7 @@ package stream
 
 import (
        "context"
+       "fmt"
        "path"
        "path/filepath"
        "strings"
@@ -57,6 +58,8 @@ type Service interface {
        run.Config
        run.Service
        Query
+       CollectDataInfo(context.Context, string) (*databasev1.DataInfo, error)
+       CollectLiaisonInfo(context.Context, string) (*databasev1.LiaisonInfo, 
error)
 }
 
 var _ Service = (*standalone)(nil)
@@ -238,11 +241,20 @@ func (s *standalone) PreRun(ctx context.Context) error {
        if !strings.HasPrefix(filepath.VolumeName(s.dataPath), 
filepath.VolumeName(path)) {
                obsservice.UpdatePath(s.dataPath)
        }
-       s.schemaRepo = newSchemaRepo(s.dataPath, s, node.Labels)
+       s.schemaRepo = newSchemaRepo(s.dataPath, s, node.Labels, node.NodeID)
+       if metaSvc, ok := s.metadata.(metadata.Service); ok {
+               metaSvc.RegisterDataCollector(commonv1.Catalog_CATALOG_STREAM, 
&s.schemaRepo)
+               
metaSvc.RegisterLiaisonCollector(commonv1.Catalog_CATALOG_STREAM, s)
+       }
        if s.pipeline == nil {
                return nil
        }
 
+       collectDataInfoListener := &collectDataInfoListener{s: s}
+       if subscribeErr := 
s.pipeline.Subscribe(data.TopicStreamCollectDataInfo, collectDataInfoListener); 
subscribeErr != nil {
+               return fmt.Errorf("failed to subscribe to collect data info 
topic: %w", subscribeErr)
+       }
+
        s.localPipeline = queue.Local()
        if err := s.pipeline.Subscribe(data.TopicSnapshot, &snapshotListener{s: 
s}); err != nil {
                return err
@@ -347,3 +359,34 @@ func (d *deleteStreamSegmentsListener) Rev(_ 
context.Context, message bus.Messag
        deleted := db.DeleteExpiredSegments(req.SegmentSuffixes)
        return bus.NewMessage(bus.MessageID(time.Now().UnixNano()), deleted)
 }
+
+func (s *standalone) CollectDataInfo(ctx context.Context, group string) 
(*databasev1.DataInfo, error) {
+       return s.schemaRepo.CollectDataInfo(ctx, group)
+}
+
+func (s *standalone) CollectLiaisonInfo(_ context.Context, group string) 
(*databasev1.LiaisonInfo, error) {
+       info := &databasev1.LiaisonInfo{}
+       pendingWriteCount, writeErr := 
s.schemaRepo.collectPendingWriteInfo(group)
+       if writeErr != nil {
+               return nil, fmt.Errorf("failed to collect pending write info: 
%w", writeErr)
+       }
+       info.PendingWriteDataCount = pendingWriteCount
+       return info, nil
+}
+
+type collectDataInfoListener struct {
+       *bus.UnImplementedHealthyListener
+       s *standalone
+}
+
+func (l *collectDataInfoListener) Rev(ctx context.Context, message 
bus.Message) bus.Message {
+       req, ok := 
message.Data().(*databasev1.GroupRegistryServiceInspectRequest)
+       if !ok {
+               return bus.NewMessage(message.ID(), common.NewError("invalid 
data type for collect data info request"))
+       }
+       dataInfo, collectErr := l.s.schemaRepo.CollectDataInfo(ctx, req.Group)
+       if collectErr != nil {
+               return bus.NewMessage(message.ID(), common.NewError("failed to 
collect data info: %v", collectErr))
+       }
+       return bus.NewMessage(message.ID(), dataInfo)
+}
diff --git a/banyand/stream/tstable.go b/banyand/stream/tstable.go
index 26f345f0..cbd4ce81 100644
--- a/banyand/stream/tstable.go
+++ b/banyand/stream/tstable.go
@@ -49,21 +49,22 @@ const (
 )
 
 type tsTable struct {
-       fileSystem    fs.FileSystem
-       pm            protector.Memory
-       metrics       *metrics
-       index         *elementIndex
-       snapshot      *snapshot
-       loopCloser    *run.Closer
-       getNodes      func() []string
-       l             *logger.Logger
-       introductions chan *introduction
-       p             common.Position
-       group         string
-       root          string
-       gc            garbageCleaner
-       option        option
-       curPartID     uint64
+       fileSystem       fs.FileSystem
+       pm               protector.Memory
+       metrics          *metrics
+       index            *elementIndex
+       snapshot         *snapshot
+       loopCloser       *run.Closer
+       getNodes         func() []string
+       l                *logger.Logger
+       introductions    chan *introduction
+       p                common.Position
+       group            string
+       root             string
+       gc               garbageCleaner
+       option           option
+       curPartID        uint64
+       pendingDataCount atomic.Int64
        sync.RWMutex
        shardID common.ShardID
 }
@@ -327,9 +328,11 @@ func (tst *tsTable) mustAddMemPart(mp *memPart) {
        ind.memPart.p.partMetadata.ID = atomic.AddUint64(&tst.curPartID, 1)
        startTime := time.Now()
        totalCount := mp.partMetadata.TotalCount
+       tst.addPendingDataCount(int64(totalCount))
        select {
        case tst.introductions <- ind:
        case <-tst.loopCloser.CloseNotify():
+               tst.addPendingDataCount(-int64(totalCount))
                return
        }
        select {
diff --git a/banyand/trace/handoff_controller.go 
b/banyand/trace/handoff_controller.go
index c3d1cfea..18f8d66c 100644
--- a/banyand/trace/handoff_controller.go
+++ b/banyand/trace/handoff_controller.go
@@ -653,6 +653,21 @@ func (hc *handoffController) getTotalSize() uint64 {
        return hc.currentTotalSize
 }
 
+// stats returns handoff statistics including part count and total size.
+func (hc *handoffController) stats() (partCount int64, totalSize int64) {
+       hc.mu.RLock()
+       defer hc.mu.RUnlock()
+       var count int64
+       for _, nodeQueue := range hc.nodeQueues {
+               pending, listErr := nodeQueue.listPending()
+               if listErr != nil {
+                       continue
+               }
+               count += int64(len(pending))
+       }
+       return count, int64(hc.getTotalSize())
+}
+
 // canEnqueue checks if adding a part of the given size would exceed the total 
size limit.
 func (hc *handoffController) canEnqueue(partSize uint64) bool {
        if hc.maxTotalSizeBytes == 0 {
diff --git a/banyand/trace/introducer.go b/banyand/trace/introducer.go
index be2940c2..1b7e3e9a 100644
--- a/banyand/trace/introducer.go
+++ b/banyand/trace/introducer.go
@@ -231,6 +231,7 @@ func (tst *tsTable) introduceMemPart(nextIntroduction 
*introduction, epoch uint6
        }
 
        next := nextIntroduction.memPart
+       tst.addPendingDataCount(-int64(next.mp.partMetadata.TotalCount))
        nextSnp := cur.copyAllTo(epoch)
        nextSnp.parts = append(nextSnp.parts, next)
        nextSnp.creator = snapshotCreatorMemPart
diff --git a/banyand/trace/merger.go b/banyand/trace/merger.go
index 6ddc1182..55a9d2c8 100644
--- a/banyand/trace/merger.go
+++ b/banyand/trace/merger.go
@@ -320,6 +320,7 @@ func (tst *tsTable) mergeParts(fileSystem fs.FileSystem, 
closeCh <-chan struct{}
        pm.MaxTimestamp = maxTimestamp
        pm.mustWriteMetadata(fileSystem, dstPath)
        tf.mustWriteTraceIDFilter(fileSystem, dstPath)
+       tf.reset()
        tt.mustWriteTagType(fileSystem, dstPath)
        fileSystem.SyncPath(dstPath)
        p := mustOpenFilePart(partID, root, fileSystem)
diff --git a/banyand/trace/metadata.go b/banyand/trace/metadata.go
index 82040fb5..26f2e0ea 100644
--- a/banyand/trace/metadata.go
+++ b/banyand/trace/metadata.go
@@ -61,13 +61,17 @@ type schemaRepo struct {
        l        *logger.Logger
        metadata metadata.Repo
        path     string
+       nodeID   string
+       role     databasev1.Role
 }
 
-func newSchemaRepo(path string, svc *standalone, nodeLabels map[string]string) 
schemaRepo {
+func newSchemaRepo(path string, svc *standalone, nodeLabels map[string]string, 
nodeID string) schemaRepo {
        sr := schemaRepo{
                l:        svc.l,
                path:     path,
                metadata: svc.metadata,
+               nodeID:   nodeID,
+               role:     databasev1.Role_ROLE_DATA,
                Repository: resourceSchema.NewRepository(
                        svc.metadata,
                        svc.l,
@@ -84,6 +88,7 @@ func newLiaisonSchemaRepo(path string, svc *liaison, 
traceDataNodeRegistry grpc.
                l:        svc.l,
                path:     path,
                metadata: svc.metadata,
+               role:     databasev1.Role_ROLE_LIAISON,
                Repository: resourceSchema.NewRepository(
                        svc.metadata,
                        svc.l,
@@ -238,17 +243,242 @@ func (sr *schemaRepo) loadTrace(metadata 
*commonv1.Metadata) (*trace, bool) {
 }
 
 func (sr *schemaRepo) loadTSDB(groupName string) (storage.TSDB[*tsTable, 
option], error) {
+       if sr == nil {
+               return nil, fmt.Errorf("schemaRepo is nil")
+       }
        g, ok := sr.LoadGroup(groupName)
        if !ok {
                return nil, fmt.Errorf("group %s not found", groupName)
        }
        db := g.SupplyTSDB()
        if db == nil {
-               return nil, fmt.Errorf("tsdb for group %s not found", groupName)
+               return nil, fmt.Errorf("group %s not found", groupName)
        }
        return db.(storage.TSDB[*tsTable, option]), nil
 }
 
+// CollectDataInfo collects data info for a specific group.
+func (sr *schemaRepo) CollectDataInfo(ctx context.Context, group string) 
(*databasev1.DataInfo, error) {
+       if sr.nodeID == "" {
+               return nil, fmt.Errorf("node ID is empty")
+       }
+       node, nodeErr := sr.metadata.NodeRegistry().GetNode(ctx, sr.nodeID)
+       if nodeErr != nil {
+               return nil, fmt.Errorf("failed to get current node info: %w", 
nodeErr)
+       }
+       tsdb, tsdbErr := sr.loadTSDB(group)
+       if tsdbErr != nil {
+               return nil, tsdbErr
+       }
+       if tsdb == nil {
+               return nil, nil
+       }
+       segments, segmentsErr := tsdb.SelectSegments(timestamp.TimeRange{
+               Start: time.Unix(0, 0),
+               End:   time.Unix(0, timestamp.MaxNanoTime),
+       })
+       if segmentsErr != nil {
+               return nil, segmentsErr
+       }
+       var segmentInfoList []*databasev1.SegmentInfo
+       var totalDataSize int64
+       for _, segment := range segments {
+               timeRange := segment.GetTimeRange()
+               tables, _ := segment.Tables()
+               var shardInfoList []*databasev1.ShardInfo
+               for shardIdx, table := range tables {
+                       shardInfo := sr.collectShardInfo(ctx, table, 
uint32(shardIdx))
+                       shardInfoList = append(shardInfoList, shardInfo)
+                       totalDataSize += shardInfo.DataSizeBytes
+               }
+               seriesIndexInfo := sr.collectSeriesIndexInfo(segment)
+               totalDataSize += seriesIndexInfo.DataSizeBytes
+               segmentInfo := &databasev1.SegmentInfo{
+                       SegmentId:       fmt.Sprintf("%d-%d", 
timeRange.Start.UnixNano(), timeRange.End.UnixNano()),
+                       TimeRangeStart:  
timeRange.Start.Format(time.RFC3339Nano),
+                       TimeRangeEnd:    timeRange.End.Format(time.RFC3339Nano),
+                       ShardInfo:       shardInfoList,
+                       SeriesIndexInfo: seriesIndexInfo,
+               }
+               segmentInfoList = append(segmentInfoList, segmentInfo)
+               segment.DecRef()
+       }
+       dataInfo := &databasev1.DataInfo{
+               Node:          node,
+               SegmentInfo:   segmentInfoList,
+               DataSizeBytes: totalDataSize,
+       }
+       return dataInfo, nil
+}
+
+func (sr *schemaRepo) collectSeriesIndexInfo(segment storage.Segment[*tsTable, 
option]) *databasev1.SeriesIndexInfo {
+       indexDB := segment.IndexDB()
+       if indexDB == nil {
+               return &databasev1.SeriesIndexInfo{
+                       DataCount:     0,
+                       DataSizeBytes: 0,
+               }
+       }
+       dataCount, dataSizeBytes := indexDB.Stats()
+       return &databasev1.SeriesIndexInfo{
+               DataCount:     dataCount,
+               DataSizeBytes: dataSizeBytes,
+       }
+}
+
+func (sr *schemaRepo) collectShardInfo(ctx context.Context, table any, shardID 
uint32) *databasev1.ShardInfo {
+       tst, ok := table.(*tsTable)
+       if !ok {
+               return &databasev1.ShardInfo{
+                       ShardId:       shardID,
+                       DataCount:     0,
+                       DataSizeBytes: 0,
+                       PartCount:     0,
+               }
+       }
+       snapshot := tst.currentSnapshot()
+       if snapshot == nil {
+               return &databasev1.ShardInfo{
+                       ShardId:       shardID,
+                       DataCount:     0,
+                       DataSizeBytes: 0,
+                       PartCount:     0,
+               }
+       }
+       defer snapshot.decRef()
+       var totalCount, compressedSize, uncompressedSize, partCount uint64
+       for _, pw := range snapshot.parts {
+               if pw.p != nil {
+                       totalCount += pw.p.partMetadata.TotalCount
+                       compressedSize += pw.p.partMetadata.CompressedSizeBytes
+                       uncompressedSize += 
pw.p.partMetadata.UncompressedSpanSizeBytes
+                       partCount++
+               } else if pw.mp != nil {
+                       totalCount += pw.mp.partMetadata.TotalCount
+                       compressedSize += pw.mp.partMetadata.CompressedSizeBytes
+                       uncompressedSize += 
pw.mp.partMetadata.UncompressedSpanSizeBytes
+                       partCount++
+               }
+       }
+       sidxInfo := sr.collectSidxInfo(ctx, tst)
+       return &databasev1.ShardInfo{
+               ShardId:           shardID,
+               DataCount:         int64(totalCount),
+               DataSizeBytes:     int64(compressedSize),
+               PartCount:         int64(partCount),
+               InvertedIndexInfo: &databasev1.InvertedIndexInfo{},
+               SidxInfo:          sidxInfo,
+       }
+}
+
+func (sr *schemaRepo) collectSidxInfo(ctx context.Context, tst *tsTable) 
*databasev1.SIDXInfo {
+       sidxMap := tst.sidxMap
+       if len(sidxMap) == 0 {
+               return &databasev1.SIDXInfo{
+                       DataCount:     0,
+                       DataSizeBytes: 0,
+                       PartCount:     0,
+               }
+       }
+       timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
+       defer cancel()
+       var totalDataCount, totalDataSize, totalPartCount int64
+       for _, sidxInstance := range sidxMap {
+               stats, statsErr := sidxInstance.Stats(timeoutCtx)
+               if statsErr != nil {
+                       continue
+               }
+               if stats != nil {
+                       totalDataCount += stats.ElementCount
+                       totalDataSize += stats.DiskUsageBytes
+                       totalPartCount += stats.PartCount
+               }
+       }
+       return &databasev1.SIDXInfo{
+               DataCount:     totalDataCount,
+               DataSizeBytes: totalDataSize,
+               PartCount:     totalPartCount,
+       }
+}
+
+func (sr *schemaRepo) collectPendingWriteInfo(groupName string) (int64, error) 
{
+       if sr == nil || sr.Repository == nil {
+               return 0, fmt.Errorf("schema repository is not initialized")
+       }
+       if sr.role == databasev1.Role_ROLE_LIAISON {
+               queue, queueErr := sr.loadQueue(groupName)
+               if queueErr != nil {
+                       return 0, fmt.Errorf("failed to load queue: %w", 
queueErr)
+               }
+               if queue == nil {
+                       return 0, nil
+               }
+               var pendingWriteCount int64
+               for _, sq := range queue.SubQueues() {
+                       if sq != nil {
+                               pendingWriteCount += sq.getPendingDataCount()
+                       }
+               }
+               return pendingWriteCount, nil
+       }
+       // Standalone mode
+       tsdb, tsdbErr := sr.loadTSDB(groupName)
+       if tsdbErr != nil {
+               return 0, fmt.Errorf("failed to load TSDB: %w", tsdbErr)
+       }
+       if tsdb == nil {
+               return 0, fmt.Errorf("TSDB is nil for group %s", groupName)
+       }
+       segments, segmentsErr := tsdb.SelectSegments(timestamp.TimeRange{
+               Start: time.Unix(0, 0),
+               End:   time.Unix(0, timestamp.MaxNanoTime),
+       })
+       if segmentsErr != nil {
+               return 0, fmt.Errorf("failed to select segments: %w", 
segmentsErr)
+       }
+       var pendingWriteCount int64
+       for _, segment := range segments {
+               tables, _ := segment.Tables()
+               for _, tst := range tables {
+                       pendingWriteCount += tst.getPendingDataCount()
+               }
+               segment.DecRef()
+       }
+       return pendingWriteCount, nil
+}
+
+func (sr *schemaRepo) collectPendingSyncInfo(groupName string) (partCount 
int64, totalSizeBytes int64, err error) {
+       if sr == nil || sr.Repository == nil {
+               return 0, 0, fmt.Errorf("schema repository is not initialized")
+       }
+       // Only liaison nodes collect pending sync info
+       if sr.role != databasev1.Role_ROLE_LIAISON {
+               return 0, 0, nil
+       }
+       queue, queueErr := sr.loadQueue(groupName)
+       if queueErr != nil {
+               return 0, 0, fmt.Errorf("failed to load queue: %w", queueErr)
+       }
+       if queue == nil {
+               return 0, 0, nil
+       }
+       for _, sq := range queue.SubQueues() {
+               if sq != nil {
+                       snapshot := sq.currentSnapshot()
+                       if snapshot != nil {
+                               for _, pw := range snapshot.parts {
+                                       if pw.mp == nil && pw.p != nil && 
pw.p.partMetadata.TotalCount > 0 {
+                                               partCount++
+                                               totalSizeBytes += 
int64(pw.p.partMetadata.CompressedSizeBytes)
+                                       }
+                               }
+                               snapshot.decRef()
+                       }
+               }
+       }
+       return partCount, totalSizeBytes, nil
+}
+
 func (sr *schemaRepo) loadQueue(groupName string) (*wqueue.Queue[*tsTable, 
option], error) {
        g, ok := sr.LoadGroup(groupName)
        if !ok {
diff --git a/banyand/trace/metrics.go b/banyand/trace/metrics.go
index 433a488d..d3c520af 100644
--- a/banyand/trace/metrics.go
+++ b/banyand/trace/metrics.go
@@ -31,7 +31,6 @@ var (
 )
 
 type metrics struct {
-       tbMetrics
        indexMetrics           *inverted.Metrics
        totalWritten           meter.Counter
        totalBatch             meter.Counter
@@ -65,6 +64,8 @@ type metrics struct {
        totalMergedParts  meter.Counter
        totalMergeLatency meter.Counter
        totalMerged       meter.Counter
+
+       tbMetrics
 }
 
 func (tst *tsTable) incTotalWritten(delta int) {
@@ -249,6 +250,18 @@ func (tst *tsTable) incTotalMerged(delta int, typ string) {
        tst.metrics.totalMerged.Inc(float64(delta), typ)
 }
 
+// addPendingDataCount adds delta (which may be negative) to the table's pending
+// data counter and, when metrics are attached, applies the same delta to the
+// pendingDataCount gauge under the table's shard label values.
+func (tst *tsTable) addPendingDataCount(delta int64) {
+       tst.pendingDataCount.Add(delta)
+       if m := tst.metrics; m != nil {
+               m.tbMetrics.pendingDataCount.Add(float64(delta), tst.p.ShardLabelValues()...)
+       }
+}
+
+// getPendingDataCount returns the current value of the table's pending data counter.
+func (tst *tsTable) getPendingDataCount() int64 {
+       pending := tst.pendingDataCount.Load()
+       return pending
+}
+
 func (m *metrics) DeleteAll() {
        if m == nil {
                return
@@ -333,6 +346,7 @@ func (s *supplier) newMetrics(p common.Position) 
storage.Metrics {
                        totalFileBlocks:                
factory.NewGauge("total_file_blocks", common.ShardLabelNames()...),
                        totalFilePartBytes:             
factory.NewGauge("total_file_part_bytes", common.ShardLabelNames()...),
                        totalFilePartUncompressedBytes: 
factory.NewGauge("total_file_part_uncompressed_bytes", 
common.ShardLabelNames()...),
+                       pendingDataCount:               
factory.NewGauge("pending_data_count", common.ShardLabelNames()...),
                },
                indexMetrics: inverted.NewMetrics(factory, 
common.SegLabelNames()...),
        }
@@ -378,6 +392,7 @@ func (qs *queueSupplier) newMetrics(p common.Position) 
storage.Metrics {
                        totalFileBlocks:                
factory.NewGauge("total_file_blocks", common.ShardLabelNames()...),
                        totalFilePartBytes:             
factory.NewGauge("total_file_part_bytes", common.ShardLabelNames()...),
                        totalFilePartUncompressedBytes: 
factory.NewGauge("total_file_part_uncompressed_bytes", 
common.ShardLabelNames()...),
+                       pendingDataCount:               
factory.NewGauge("pending_data_count", common.ShardLabelNames()...),
                },
                indexMetrics: inverted.NewMetrics(factory, 
common.SegLabelNames()...),
        }
@@ -438,6 +453,7 @@ func (tst *tsTable) deleteMetrics() {
        
tst.metrics.tbMetrics.totalFileBlocks.Delete(tst.p.ShardLabelValues()...)
        
tst.metrics.tbMetrics.totalFilePartBytes.Delete(tst.p.ShardLabelValues()...)
        
tst.metrics.tbMetrics.totalFilePartUncompressedBytes.Delete(tst.p.ShardLabelValues()...)
+       
tst.metrics.tbMetrics.pendingDataCount.Delete(tst.p.ShardLabelValues()...)
        tst.metrics.indexMetrics.DeleteAll(tst.p.SegLabelValues()...)
 }
 
@@ -453,4 +469,6 @@ type tbMetrics struct {
        totalFileBlocks                meter.Gauge
        totalFilePartBytes             meter.Gauge
        totalFilePartUncompressedBytes meter.Gauge
+
+       pendingDataCount meter.Gauge
 }
diff --git a/banyand/trace/svc_liaison.go b/banyand/trace/svc_liaison.go
index 2e7874e9..2ed967c1 100644
--- a/banyand/trace/svc_liaison.go
+++ b/banyand/trace/svc_liaison.go
@@ -86,6 +86,23 @@ func NewLiaison(metadata metadata.Repo, pipeline 
queue.Server, omr observability
        }, nil
 }
 
+// collectLiaisonInfoListener is the bus listener subscribed to the trace
+// collect-liaison-info topic; it answers requests through the wrapped liaison.
+type collectLiaisonInfoListener struct {
+       *bus.UnImplementedHealthyListener
+       // l is the liaison service whose CollectLiaisonInfo is invoked by Rev.
+       l *liaison
+}
+
+// Rev handles one collect-liaison-info message. The payload must be a
+// *databasev1.GroupRegistryServiceInspectRequest. The reply reuses the incoming
+// message ID and carries either the collected liaison info or a common error
+// describing why collection was not possible.
+func (cl *collectLiaisonInfoListener) Rev(ctx context.Context, message bus.Message) bus.Message {
+       inspectReq, isInspectReq := message.Data().(*databasev1.GroupRegistryServiceInspectRequest)
+       if !isInspectReq {
+               return bus.NewMessage(message.ID(), common.NewError("invalid data type for collect liaison info request"))
+       }
+       info, infoErr := cl.l.CollectLiaisonInfo(ctx, inspectReq.Group)
+       if infoErr == nil {
+               return bus.NewMessage(message.ID(), info)
+       }
+       return bus.NewMessage(message.ID(), common.NewError("failed to collect liaison info: %v", infoErr))
+}
+
 // LiaisonService returns a new liaison service (deprecated - use NewLiaison).
 func LiaisonService(_ context.Context) (Service, error) {
        return &liaison{}, nil
@@ -251,6 +268,15 @@ func (l *liaison) PreRun(ctx context.Context) error {
                Str("dataPath", l.dataPath).
                Msg("trace liaison service initialized")
 
+       if metaSvc, ok := l.metadata.(metadata.Service); ok {
+               
metaSvc.RegisterLiaisonCollector(commonv1.Catalog_CATALOG_TRACE, l)
+       }
+
+       collectLiaisonInfoListener := &collectLiaisonInfoListener{l: l}
+       if subscribeErr := 
l.pipeline.Subscribe(data.TopicTraceCollectLiaisonInfo, 
collectLiaisonInfoListener); subscribeErr != nil {
+               return fmt.Errorf("failed to subscribe to collect liaison info 
topic: %w", subscribeErr)
+       }
+
        return l.pipeline.Subscribe(data.TopicTraceWrite, l.writeListener)
 }
 
@@ -286,6 +312,38 @@ func (l *liaison) GetRemovalSegmentsTimeRange(group 
string) *timestamp.TimeRange
        return l.schemaRepo.GetRemovalSegmentsTimeRange(group)
 }
 
+// CollectDataInfo always fails on a liaison node: it returns a nil result and an
+// error stating that data info collection is not supported here.
+func (l *liaison) CollectDataInfo(_ context.Context, _ string) (*databasev1.DataInfo, error) {
+       unsupportedErr := errors.New("collect data info is not supported on liaison node")
+       return nil, unsupportedErr
+}
+
+// CollectLiaisonInfo collects liaison node info.
+//
+// The result holds the pending write count and the pending sync part count and
+// size reported by the schema repository for the group. When a handoff
+// controller is configured, its part count and total size are included as well;
+// otherwise the handoff fields stay zero. An error from either schema repository
+// call is wrapped and returned together with a nil result.
+func (l *liaison) CollectLiaisonInfo(_ context.Context, group string) (*databasev1.LiaisonInfo, error) {
+       writeCount, writeErr := l.schemaRepo.collectPendingWriteInfo(group)
+       if writeErr != nil {
+               return nil, fmt.Errorf("failed to collect pending write info: %w", writeErr)
+       }
+       syncParts, syncBytes, syncErr := l.schemaRepo.collectPendingSyncInfo(group)
+       if syncErr != nil {
+               return nil, fmt.Errorf("failed to collect pending sync info: %w", syncErr)
+       }
+       info := &databasev1.LiaisonInfo{
+               PendingWriteDataCount:    writeCount,
+               PendingSyncPartCount:     syncParts,
+               PendingSyncDataSizeBytes: syncBytes,
+       }
+       if l.handoffCtrl != nil {
+               info.PendingHandoffPartCount, info.PendingHandoffDataSizeBytes = l.handoffCtrl.stats()
+       }
+       return info, nil
+}
+
 // SetMetadata sets the metadata repository.
 func (l *liaison) SetMetadata(metadata metadata.Repo) {
        l.metadata = metadata
diff --git a/banyand/trace/svc_standalone.go b/banyand/trace/svc_standalone.go
index ec893bbf..fceaf43d 100644
--- a/banyand/trace/svc_standalone.go
+++ b/banyand/trace/svc_standalone.go
@@ -150,7 +150,15 @@ func (s *standalone) PreRun(ctx context.Context) error {
        if !strings.HasPrefix(filepath.VolumeName(s.dataPath), 
filepath.VolumeName(path)) {
                obsservice.UpdatePath(s.dataPath)
        }
-       s.schemaRepo = newSchemaRepo(s.dataPath, s, node.Labels)
+       s.schemaRepo = newSchemaRepo(s.dataPath, s, node.Labels, node.NodeID)
+       if metaSvc, ok := s.metadata.(metadata.Service); ok {
+               metaSvc.RegisterDataCollector(commonv1.Catalog_CATALOG_TRACE, 
&s.schemaRepo)
+               
metaSvc.RegisterLiaisonCollector(commonv1.Catalog_CATALOG_TRACE, s)
+       }
+       subErr := s.pipeline.Subscribe(data.TopicTraceCollectDataInfo, 
&collectDataInfoListener{s: s})
+       if subErr != nil {
+               return fmt.Errorf("failed to subscribe to 
TopicTraceCollectDataInfo: %w", subErr)
+       }
 
        // Initialize snapshot directory
        s.snapshotDir = filepath.Join(path, "snapshots")
@@ -505,6 +513,37 @@ func (s *standaloneDeleteTraceSegmentsListener) Rev(_ 
context.Context, message b
        return bus.NewMessage(bus.MessageID(time.Now().UnixNano()), deleted)
 }
 
+// CollectDataInfo returns the data info for the group by delegating to the
+// standalone service's schema repository.
+func (s *standalone) CollectDataInfo(ctx context.Context, group string) (*databasev1.DataInfo, error) {
+       dataInfo, collectErr := s.schemaRepo.CollectDataInfo(ctx, group)
+       return dataInfo, collectErr
+}
+
+// CollectLiaisonInfo returns liaison info for the group on a standalone node.
+// Only PendingWriteDataCount is populated, from the schema repository; every
+// other field keeps its zero value. A failure to collect the pending write
+// count is wrapped and returned with a nil result.
+func (s *standalone) CollectLiaisonInfo(_ context.Context, group string) (*databasev1.LiaisonInfo, error) {
+       pending, pendingErr := s.schemaRepo.collectPendingWriteInfo(group)
+       if pendingErr != nil {
+               return nil, fmt.Errorf("failed to collect pending write info: %w", pendingErr)
+       }
+       return &databasev1.LiaisonInfo{PendingWriteDataCount: pending}, nil
+}
+
+// collectDataInfoListener is the bus listener subscribed to the trace
+// collect-data-info topic; it answers requests through the standalone service.
+type collectDataInfoListener struct {
+       *bus.UnImplementedHealthyListener
+       // s is the standalone service whose schema repository is queried by Rev.
+       s *standalone
+}
+
+// Rev handles one collect-data-info message. The payload must be a
+// *databasev1.GroupRegistryServiceInspectRequest. The reply reuses the incoming
+// message ID and carries either the collected data info or a common error
+// describing why collection was not possible.
+func (l *collectDataInfoListener) Rev(ctx context.Context, message bus.Message) bus.Message {
+       inspectReq, isInspectReq := message.Data().(*databasev1.GroupRegistryServiceInspectRequest)
+       if !isInspectReq {
+               return bus.NewMessage(message.ID(), common.NewError("invalid data type for collect data info request"))
+       }
+       info, infoErr := l.s.schemaRepo.CollectDataInfo(ctx, inspectReq.Group)
+       if infoErr == nil {
+               return bus.NewMessage(message.ID(), info)
+       }
+       return bus.NewMessage(message.ID(), common.NewError("failed to collect data info: %v", infoErr))
+}
+
 // NewService returns a new service.
 func NewService(metadata metadata.Repo, pipeline queue.Server, omr 
observability.MetricsRegistry, pm protector.Memory) (Service, error) {
        return &standalone{
diff --git a/banyand/trace/trace.go b/banyand/trace/trace.go
index 4b16a4f0..53ff3959 100644
--- a/banyand/trace/trace.go
+++ b/banyand/trace/trace.go
@@ -66,6 +66,8 @@ type Service interface {
        run.Config
        run.Service
        Query
+       CollectDataInfo(context.Context, string) (*databasev1.DataInfo, error)
+       CollectLiaisonInfo(context.Context, string) (*databasev1.LiaisonInfo, 
error)
 }
 
 // Query allows retrieving traces.
diff --git a/banyand/trace/tstable.go b/banyand/trace/tstable.go
index 6d168568..cc47bae7 100644
--- a/banyand/trace/tstable.go
+++ b/banyand/trace/tstable.go
@@ -48,22 +48,23 @@ const (
 )
 
 type tsTable struct {
-       pm            protector.Memory
-       fileSystem    fs.FileSystem
-       handoffCtrl   *handoffController
-       metrics       *metrics
-       snapshot      *snapshot
-       loopCloser    *run.Closer
-       getNodes      func() []string
-       l             *logger.Logger
-       sidxMap       map[string]sidx.SIDX
-       introductions chan *introduction
-       p             common.Position
-       root          string
-       group         string
-       gc            garbageCleaner
-       option        option
-       curPartID     uint64
+       pm               protector.Memory
+       fileSystem       fs.FileSystem
+       handoffCtrl      *handoffController
+       metrics          *metrics
+       snapshot         *snapshot
+       loopCloser       *run.Closer
+       getNodes         func() []string
+       l                *logger.Logger
+       sidxMap          map[string]sidx.SIDX
+       introductions    chan *introduction
+       p                common.Position
+       root             string
+       group            string
+       gc               garbageCleaner
+       option           option
+       curPartID        uint64
+       pendingDataCount atomic.Int64
        sync.RWMutex
        shardID common.ShardID
 }
@@ -470,9 +471,11 @@ func (tst *tsTable) mustAddMemPart(mp *memPart, 
sidxReqsMap map[string]*sidx.Mem
        ind.sidxReqsMap = sidxReqsMap
        startTime := time.Now()
        totalCount := mp.partMetadata.TotalCount
+       tst.addPendingDataCount(int64(totalCount))
        select {
        case tst.introductions <- ind:
        case <-tst.loopCloser.CloseNotify():
+               tst.addPendingDataCount(-int64(totalCount))
                ind.memPart.decRef()
                return
        }
diff --git a/pkg/cmdsetup/liaison.go b/pkg/cmdsetup/liaison.go
index a3c19c2c..082de98e 100644
--- a/pkg/cmdsetup/liaison.go
+++ b/pkg/cmdsetup/liaison.go
@@ -55,6 +55,9 @@ func newLiaisonCmd(runners ...run.Unit) *cobra.Command {
        }
        tire1Client := pub.New(metaSvc, databasev1.Role_ROLE_LIAISON)
        tire2Client := pub.New(metaSvc, databasev1.Role_ROLE_DATA)
+       metaSvc.SetDataBroadcaster(tire2Client)
+       metaSvc.SetLiaisonBroadcaster(tire1Client)
+
        localPipeline := queue.Local()
 
        measureLiaisonNodeSel := 
node.NewRoundRobinSelector(data.TopicMeasureWrite.String(), metaSvc)
diff --git a/pkg/index/index.go b/pkg/index/index.go
index 6e77f821..2c6372e9 100644
--- a/pkg/index/index.go
+++ b/pkg/index/index.go
@@ -828,6 +828,7 @@ type Store interface {
        CollectMetrics(...string)
        Reset()
        TakeFileSnapshot(dst string) error
+       Stats() (dataCount int64, dataSizeBytes int64)
 }
 
 // Series represents a series in an index.
diff --git a/pkg/index/inverted/inverted.go b/pkg/index/inverted/inverted.go
index 1759d6e2..38779a8c 100644
--- a/pkg/index/inverted/inverted.go
+++ b/pkg/index/inverted/inverted.go
@@ -512,6 +512,21 @@ func (s *store) TakeFileSnapshot(dst string) error {
        return reader.Backup(dst, nil)
 }
 
+// Stats returns the index statistics including document count and disk size.
+//
+// dataCount is the count reported by a reader obtained from the writer and
+// dataSizeBytes is the writer status' CurOnDiskBytes. Both values are zero when
+// the reader cannot be opened or the count cannot be read.
+func (s *store) Stats() (dataCount int64, dataSizeBytes int64) {
+       rd, openErr := s.writer.Reader()
+       if openErr != nil {
+               return 0, 0
+       }
+       defer rd.Close()
+       docs, countErr := rd.Count()
+       if countErr != nil {
+               return 0, 0
+       }
+       dataCount = int64(docs)
+       dataSizeBytes = int64(s.writer.Status().CurOnDiskBytes)
+       return dataCount, dataSizeBytes
+}
+
 type blugeMatchIterator struct {
        delegated     search.DocumentMatchIterator
        err           error
diff --git a/test/integration/distributed/inspect/inspect_suite_test.go 
b/test/integration/distributed/inspect/inspect_suite_test.go
new file mode 100644
index 00000000..bee00eaa
--- /dev/null
+++ b/test/integration/distributed/inspect/inspect_suite_test.go
@@ -0,0 +1,613 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package integration_inspect_test provides integration tests for the inspect functionality in distributed mode.
+package integration_inspect_test
+
+import (
+       "context"
+       "fmt"
+       "io"
+       "strconv"
+       "strings"
+       "testing"
+       "time"
+
+       . "github.com/onsi/ginkgo/v2"
+       . "github.com/onsi/gomega"
+       "github.com/onsi/gomega/gleak"
+       "google.golang.org/grpc"
+       "google.golang.org/grpc/credentials/insecure"
+       "google.golang.org/protobuf/types/known/timestamppb"
+
+       commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       measurev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
+       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+       streamv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
+       tracev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/trace/v1"
+       "github.com/apache/skywalking-banyandb/banyand/metadata/embeddedetcd"
+       "github.com/apache/skywalking-banyandb/pkg/grpchelper"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/pool"
+       "github.com/apache/skywalking-banyandb/pkg/test"
+       "github.com/apache/skywalking-banyandb/pkg/test/flags"
+       "github.com/apache/skywalking-banyandb/pkg/test/gmatcher"
+       "github.com/apache/skywalking-banyandb/pkg/test/setup"
+)
+
+// TestInspect is the go test entry point: it registers Ginkgo's Fail as the
+// Gomega fail handler and runs the specs of this package.
+func TestInspect(t *testing.T) {
+       const suiteDescription = "Distributed Inspect Suite"
+       RegisterFailHandler(Fail)
+       RunSpecs(t, suiteDescription)
+}
+
+// Suite-wide state shared by the setup/teardown hooks and the specs.
+var (
+       // deferFunc stops the liaison node, both data nodes and the etcd server, then releases the test space.
+       deferFunc          func()
+       // goods is the goroutine snapshot taken before the cluster starts; it is the baseline for the leak check.
+       goods              []gleak.Goroutine
+       // connection is the gRPC connection to the liaison node used by every client below.
+       connection         *grpc.ClientConn
+       groupClient        databasev1.GroupRegistryServiceClient
+       measureRegClient   databasev1.MeasureRegistryServiceClient
+       streamRegClient    databasev1.StreamRegistryServiceClient
+       traceRegClient     databasev1.TraceRegistryServiceClient
+       measureWriteClient measurev1.MeasureServiceClient
+       streamWriteClient  streamv1.StreamServiceClient
+       traceWriteClient   tracev1.TraceServiceClient
+       // etcdEndpoint and liaisonGrpcAddr are parsed from the payload passed between the two suite setup functions.
+       etcdEndpoint       string
+       liaisonGrpcAddr    string
+)
+
+// Suite setup.
+//
+// The first function initializes logging, records the goroutine baseline, and
+// starts an embedded etcd server, two data nodes and one liaison node. It stores
+// the matching teardown in deferFunc and returns "<liaison address>,<etcd endpoint>".
+//
+// The second function receives that payload, splits it into liaisonGrpcAddr and
+// etcdEndpoint, dials the liaison node and creates the gRPC clients used by the
+// specs. It panics when the payload does not contain exactly two parts.
+var _ = SynchronizedBeforeSuite(func() []byte {
+       Expect(logger.Init(logger.Logging{
+               Env:   "dev",
+               Level: flags.LogLevel,
+       })).To(Succeed())
+       pool.EnableStackTracking(true)
+       goods = gleak.Goroutines()
+
+       By("Starting etcd server")
+       ports, err := test.AllocateFreePorts(2)
+       Expect(err).NotTo(HaveOccurred())
+       dir, spaceDef, err := test.NewSpace()
+       Expect(err).NotTo(HaveOccurred())
+       // ports[0] is the endpoint handed to the nodes; ports[1] is the second listener URL given to etcd.
+       ep := fmt.Sprintf("http://127.0.0.1:%d", ports[0])
+       server, err := embeddedetcd.NewServer(
+               embeddedetcd.ConfigureListener([]string{ep}, []string{fmt.Sprintf("http://127.0.0.1:%d", ports[1])}),
+               embeddedetcd.RootDir(dir),
+               embeddedetcd.AutoCompactionMode("periodic"),
+               embeddedetcd.AutoCompactionRetention("1h"),
+               embeddedetcd.QuotaBackendBytes(2*1024*1024*1024),
+       )
+       Expect(err).ShouldNot(HaveOccurred())
+       <-server.ReadyNotify()
+
+       By("Starting data node 0")
+       closeDataNode0 := setup.DataNode(ep)
+       By("Starting data node 1")
+       closeDataNode1 := setup.DataNode(ep)
+       By("Starting liaison node")
+       liaisonAddr, closerLiaisonNode := setup.LiaisonNode(ep)
+
+       // Teardown order: liaison first, then the data nodes, then etcd, then the test space.
+       deferFunc = func() {
+               closerLiaisonNode()
+               closeDataNode0()
+               closeDataNode1()
+               _ = server.Close()
+               <-server.StopNotify()
+               spaceDef()
+       }
+
+       return []byte(fmt.Sprintf("%s,%s", liaisonAddr, ep))
+}, func(address []byte) {
+       parts := strings.Split(string(address), ",")
+       if len(parts) != 2 {
+               panic(fmt.Sprintf("expected 2 parts, got %d", len(parts)))
+       }
+       liaisonGrpcAddr = parts[0]
+       etcdEndpoint = parts[1]
+
+       var err error
+       connection, err = grpchelper.Conn(liaisonGrpcAddr, 10*time.Second,
+               grpc.WithTransportCredentials(insecure.NewCredentials()))
+       Expect(err).NotTo(HaveOccurred())
+
+       groupClient = databasev1.NewGroupRegistryServiceClient(connection)
+       measureRegClient = databasev1.NewMeasureRegistryServiceClient(connection)
+       streamRegClient = databasev1.NewStreamRegistryServiceClient(connection)
+       traceRegClient = databasev1.NewTraceRegistryServiceClient(connection)
+       measureWriteClient = measurev1.NewMeasureServiceClient(connection)
+       streamWriteClient = streamv1.NewStreamServiceClient(connection)
+       traceWriteClient = tracev1.NewTraceServiceClient(connection)
+})
+
+// Suite teardown: closes the gRPC connection to the liaison node if one was
+// opened. The second function is intentionally empty.
+var _ = SynchronizedAfterSuite(func() {
+       if connection == nil {
+               return
+       }
+       Expect(connection.Close()).To(Succeed())
+}, func() {})
+
+// After a successful suite run, tear the cluster down through deferFunc and
+// verify that no goroutines leaked relative to the goods baseline and that the
+// pool reference count is zero. Nothing is done when the suite failed.
+var _ = ReportAfterSuite("Distributed Inspect Suite", func(report Report) {
+       if !report.SuiteSucceeded {
+               return
+       }
+       if deferFunc != nil {
+               deferFunc()
+       }
+       Eventually(gleak.Goroutines, flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods))
+       Eventually(pool.AllRefsCount, flags.EventuallyTimeout).Should(gmatcher.HaveZeroRef())
+})
+
+// writeMeasureData sends dataCount data points for groupName/measureName over a
+// measure write stream, closes the send side, and polls Recv until it returns
+// io.EOF. Point i carries the tag "id_<i>", the int field i*100 and a timestamp
+// i seconds after the millisecond-truncated start time.
+func writeMeasureData(ctx context.Context, groupName, measureName string, dataCount int) {
+       stream, openErr := measureWriteClient.Write(ctx)
+       Expect(openErr).ShouldNot(HaveOccurred())
+
+       target := &commonv1.Metadata{
+               Name:  measureName,
+               Group: groupName,
+       }
+       start := time.Now().Truncate(time.Millisecond)
+       for i := 0; i < dataCount; i++ {
+               idTag := &modelv1.TagValue{
+                       Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "id_" + strconv.Itoa(i)}},
+               }
+               valueField := &modelv1.FieldValue{
+                       Value: &modelv1.FieldValue_Int{Int: &modelv1.Int{Value: int64(i * 100)}},
+               }
+               point := &measurev1.DataPointValue{
+                       Timestamp:   timestamppb.New(start.Add(time.Duration(i) * time.Second)),
+                       TagFamilies: []*modelv1.TagFamilyForWrite{{Tags: []*modelv1.TagValue{idTag}}},
+                       Fields:      []*modelv1.FieldValue{valueField},
+               }
+               sendErr := stream.Send(&measurev1.WriteRequest{
+                       Metadata:  target,
+                       DataPoint: point,
+                       MessageId: uint64(time.Now().UnixNano()),
+               })
+               Expect(sendErr).ShouldNot(HaveOccurred())
+       }
+       Expect(stream.CloseSend()).To(Succeed())
+       Eventually(func() error {
+               _, recvErr := stream.Recv()
+               return recvErr
+       }, flags.EventuallyTimeout).Should(Equal(io.EOF))
+}
+
+// writeStreamData sends dataCount elements for groupName/streamName over a
+// stream write stream, closes the send side, and polls Recv until it returns
+// io.EOF. Element i uses the element ID "<i>", the tag "svc_<i>" and a timestamp
+// i seconds after the millisecond-truncated start time.
+func writeStreamData(ctx context.Context, groupName, streamName string, dataCount int) {
+       stream, openErr := streamWriteClient.Write(ctx)
+       Expect(openErr).ShouldNot(HaveOccurred())
+
+       target := &commonv1.Metadata{
+               Name:  streamName,
+               Group: groupName,
+       }
+       start := time.Now().Truncate(time.Millisecond)
+       for i := 0; i < dataCount; i++ {
+               svcTag := &modelv1.TagValue{
+                       Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "svc_" + strconv.Itoa(i)}},
+               }
+               element := &streamv1.ElementValue{
+                       ElementId:   strconv.Itoa(i),
+                       Timestamp:   timestamppb.New(start.Add(time.Duration(i) * time.Second)),
+                       TagFamilies: []*modelv1.TagFamilyForWrite{{Tags: []*modelv1.TagValue{svcTag}}},
+               }
+               sendErr := stream.Send(&streamv1.WriteRequest{
+                       Metadata:  target,
+                       Element:   element,
+                       MessageId: uint64(time.Now().UnixNano()),
+               })
+               Expect(sendErr).ShouldNot(HaveOccurred())
+       }
+       Expect(stream.CloseSend()).To(Succeed())
+       Eventually(func() error {
+               _, recvErr := stream.Recv()
+               return recvErr
+       }, flags.EventuallyTimeout).Should(Equal(io.EOF))
+}
+
+// writeTraceData sends dataCount spans for groupName/traceName over a trace
+// write stream, closes the send side, and polls Recv until it returns io.EOF.
+// Span i carries, in order, the tags "trace_<i>", "span_<i>", a timestamp i
+// seconds after the millisecond-truncated start time, "test_service" and the
+// int i*10; its payload is "span_data_<i>" and its version is i+1.
+func writeTraceData(ctx context.Context, groupName, traceName string, dataCount int) {
+       stream, openErr := traceWriteClient.Write(ctx)
+       Expect(openErr).ShouldNot(HaveOccurred())
+
+       target := &commonv1.Metadata{
+               Name:  traceName,
+               Group: groupName,
+       }
+       strTag := func(value string) *modelv1.TagValue {
+               return &modelv1.TagValue{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: value}}}
+       }
+       start := time.Now().Truncate(time.Millisecond)
+       for i := 0; i < dataCount; i++ {
+               suffix := strconv.Itoa(i)
+               spanTime := timestamppb.New(start.Add(time.Duration(i) * time.Second))
+               sendErr := stream.Send(&tracev1.WriteRequest{
+                       Metadata: target,
+                       Tags: []*modelv1.TagValue{
+                               strTag("trace_" + suffix),
+                               strTag("span_" + suffix),
+                               {Value: &modelv1.TagValue_Timestamp{Timestamp: spanTime}},
+                               strTag("test_service"),
+                               {Value: &modelv1.TagValue_Int{Int: &modelv1.Int{Value: int64(i * 10)}}},
+                       },
+                       Span:    []byte("span_data_" + suffix),
+                       Version: uint64(i + 1),
+               })
+               Expect(sendErr).ShouldNot(HaveOccurred())
+       }
+       Expect(stream.CloseSend()).To(Succeed())
+       Eventually(func() error {
+               _, recvErr := stream.Recv()
+               return recvErr
+       }, flags.EventuallyTimeout).Should(Equal(io.EOF))
+}
+
+// Specs covering GroupRegistryService.Inspect for a measure group in distributed
+// mode. BeforeEach creates a uniquely named group and a measure and writes
+// dataCount data points; AfterEach deletes the group on a best-effort basis.
+var _ = Describe("Inspect in distributed mode", func() {
+       const dataCount = 100
+       var (
+               ctx         context.Context
+               groupName   string
+               measureName string
+       )
+
+       BeforeEach(func() {
+               ctx = context.TODO()
+               groupName = fmt.Sprintf("inspect-test-%d", time.Now().UnixNano())
+               measureName = "test_measure"
+
+               By("Creating measure group")
+               group := &commonv1.Group{
+                       Metadata: &commonv1.Metadata{Name: groupName},
+                       Catalog:  commonv1.Catalog_CATALOG_MEASURE,
+                       ResourceOpts: &commonv1.ResourceOpts{
+                               ShardNum:        2,
+                               SegmentInterval: &commonv1.IntervalRule{Unit: commonv1.IntervalRule_UNIT_DAY, Num: 1},
+                               Ttl:             &commonv1.IntervalRule{Unit: commonv1.IntervalRule_UNIT_DAY, Num: 7},
+                       },
+               }
+               _, groupErr := groupClient.Create(ctx, &databasev1.GroupRegistryServiceCreateRequest{Group: group})
+               Expect(groupErr).ShouldNot(HaveOccurred())
+
+               By("Creating measure schema")
+               measure := &databasev1.Measure{
+                       Metadata: &commonv1.Metadata{Name: measureName, Group: groupName},
+                       Entity:   &databasev1.Entity{TagNames: []string{"id"}},
+                       TagFamilies: []*databasev1.TagFamilySpec{{
+                               Name: "default",
+                               Tags: []*databasev1.TagSpec{{Name: "id", Type: databasev1.TagType_TAG_TYPE_STRING}},
+                       }},
+                       Fields: []*databasev1.FieldSpec{{
+                               Name:              "value",
+                               FieldType:         databasev1.FieldType_FIELD_TYPE_INT,
+                               EncodingMethod:    databasev1.EncodingMethod_ENCODING_METHOD_GORILLA,
+                               CompressionMethod: databasev1.CompressionMethod_COMPRESSION_METHOD_ZSTD,
+                       }},
+               }
+               _, schemaErr := measureRegClient.Create(ctx, &databasev1.MeasureRegistryServiceCreateRequest{Measure: measure})
+               Expect(schemaErr).ShouldNot(HaveOccurred())
+               // Fixed pause before writing; presumably lets the new schema reach the nodes.
+               time.Sleep(2 * time.Second)
+
+               By("Writing measure data")
+               writeMeasureData(ctx, groupName, measureName, dataCount)
+               // Fixed pause after writing; presumably lets the written data reach the data nodes.
+               time.Sleep(5 * time.Second)
+       })
+
+       AfterEach(func() {
+               // Best-effort cleanup: the delete result is deliberately ignored.
+               _, _ = groupClient.Delete(ctx, &databasev1.GroupRegistryServiceDeleteRequest{Group: groupName})
+       })
+
+       It("should return schema info", func() {
+               By("Inspecting group")
+               inspectResp, inspectErr := groupClient.Inspect(ctx, &databasev1.GroupRegistryServiceInspectRequest{Group: groupName})
+               Expect(inspectErr).ShouldNot(HaveOccurred())
+               Expect(inspectResp).NotTo(BeNil())
+
+               By("Verifying schema info contains the measure")
+               schema := inspectResp.SchemaInfo
+               Expect(schema).NotTo(BeNil())
+               Expect(len(schema.Measures)).Should(BeNumerically(">=", 1))
+               Expect(schema.Measures).To(ContainElement(measureName))
+               GinkgoWriter.Printf("Schema info: measures=%d, streams=%d, traces=%d, indexRules=%d\n",
+                       len(schema.Measures),
+                       len(schema.Streams),
+                       len(schema.Traces),
+                       len(schema.IndexRules))
+       })
+
+       It("should return data info", func() {
+               By("Inspecting group")
+               inspectResp, inspectErr := groupClient.Inspect(ctx, &databasev1.GroupRegistryServiceInspectRequest{Group: groupName})
+               Expect(inspectErr).ShouldNot(HaveOccurred())
+               Expect(inspectResp).NotTo(BeNil())
+
+               By("Verifying data collected from multiple nodes")
+               Expect(len(inspectResp.DataInfo)).Should(BeNumerically(">=", 1), "should collect from at least one data node")
+
+               var totalDataSize int64
+               for i, nodeData := range inspectResp.DataInfo {
+                       Expect(nodeData.Node).NotTo(BeNil(), "data node %d should have node info", i)
+                       nodeName := nodeData.Node.Metadata.Name
+                       Expect(nodeData.DataSizeBytes).Should(BeNumerically(">", 0), "data node %d (%s) should have DataSizeBytes > 0", i, nodeName)
+                       totalDataSize += nodeData.DataSizeBytes
+                       GinkgoWriter.Printf("Data node %d: %s, segments: %d, size: %d bytes\n",
+                               i, nodeName, len(nodeData.SegmentInfo), nodeData.DataSizeBytes)
+               }
+               Expect(totalDataSize).Should(BeNumerically(">", 0), "total data size should be > 0")
+               GinkgoWriter.Printf("Total data size across all nodes: %d bytes\n", totalDataSize)
+       })
+
+       It("should return liaison info", func() {
+               By("Inspecting group")
+               inspectResp, inspectErr := groupClient.Inspect(ctx, &databasev1.GroupRegistryServiceInspectRequest{Group: groupName})
+               Expect(inspectErr).ShouldNot(HaveOccurred())
+               Expect(inspectResp).NotTo(BeNil())
+
+               By("Verifying liaison info collected")
+               Expect(len(inspectResp.LiaisonInfo)).Should(BeNumerically(">=", 1), "should collect from at least one liaison node")
+
+               // The per-liaison counters are only logged, not asserted.
+               for i, liaisonData := range inspectResp.LiaisonInfo {
+                       GinkgoWriter.Printf("Liaison node %d: PendingWrite=%d, PendingSync=%d parts (%d bytes)\n",
+                               i,
+                               liaisonData.PendingWriteDataCount,
+                               liaisonData.PendingSyncPartCount,
+                               liaisonData.PendingSyncDataSizeBytes)
+               }
+       })
+})
+
+var _ = Describe("Inspect stream in distributed mode", func() {
+       var groupName string
+       var streamName string
+       var ctx context.Context
+       const dataCount = 100
+
+       BeforeEach(func() {
+               ctx = context.TODO()
+               groupName = fmt.Sprintf("inspect-stream-test-%d", 
time.Now().UnixNano())
+               streamName = "test_stream"
+
+               By("Creating stream group")
+               _, createErr := groupClient.Create(ctx, 
&databasev1.GroupRegistryServiceCreateRequest{
+                       Group: &commonv1.Group{
+                               Metadata: &commonv1.Metadata{
+                                       Name: groupName,
+                               },
+                               Catalog: commonv1.Catalog_CATALOG_STREAM,
+                               ResourceOpts: &commonv1.ResourceOpts{
+                                       ShardNum: 2,
+                                       SegmentInterval: &commonv1.IntervalRule{
+                                               Unit: 
commonv1.IntervalRule_UNIT_DAY,
+                                               Num:  1,
+                                       },
+                                       Ttl: &commonv1.IntervalRule{
+                                               Unit: 
commonv1.IntervalRule_UNIT_DAY,
+                                               Num:  7,
+                                       },
+                               },
+                       },
+               })
+               Expect(createErr).ShouldNot(HaveOccurred())
+
+               By("Creating stream schema")
+               _, streamErr := streamRegClient.Create(ctx, 
&databasev1.StreamRegistryServiceCreateRequest{
+                       Stream: &databasev1.Stream{
+                               Metadata: &commonv1.Metadata{
+                                       Name:  streamName,
+                                       Group: groupName,
+                               },
+                               Entity: &databasev1.Entity{
+                                       TagNames: []string{"svc"},
+                               },
+                               TagFamilies: []*databasev1.TagFamilySpec{{
+                                       Name: "default",
+                                       Tags: []*databasev1.TagSpec{{
+                                               Name: "svc",
+                                               Type: 
databasev1.TagType_TAG_TYPE_STRING,
+                                       }},
+                               }},
+                       },
+               })
+               Expect(streamErr).ShouldNot(HaveOccurred())
+               time.Sleep(2 * time.Second)
+
+               By("Writing stream data")
+               writeStreamData(ctx, groupName, streamName, dataCount)
+               time.Sleep(5 * time.Second)
+       })
+
+       AfterEach(func() {
+               _, _ = groupClient.Delete(ctx, 
&databasev1.GroupRegistryServiceDeleteRequest{Group: groupName})
+       })
+
+       It("should return schema info", func() {
+               By("Inspecting stream group")
+               resp, err := groupClient.Inspect(ctx, 
&databasev1.GroupRegistryServiceInspectRequest{Group: groupName})
+               Expect(err).ShouldNot(HaveOccurred())
+               Expect(resp).NotTo(BeNil())
+
+               By("Verifying schema info contains the stream")
+               Expect(resp.SchemaInfo).NotTo(BeNil())
+               Expect(len(resp.SchemaInfo.Streams)).Should(BeNumerically(">=", 
1))
+               Expect(resp.SchemaInfo.Streams).To(ContainElement(streamName))
+               GinkgoWriter.Printf("Stream schema info: streams=%d, 
indexRules=%d\n",
+                       len(resp.SchemaInfo.Streams),
+                       len(resp.SchemaInfo.IndexRules))
+       })
+
+       It("should return data info", func() {
+               By("Inspecting stream group")
+               resp, err := groupClient.Inspect(ctx, 
&databasev1.GroupRegistryServiceInspectRequest{Group: groupName})
+               Expect(err).ShouldNot(HaveOccurred())
+               Expect(resp).NotTo(BeNil())
+
+               By("Verifying stream data collected from multiple nodes")
+               Expect(len(resp.DataInfo)).Should(BeNumerically(">=", 1), 
"should collect from at least one stream data node")
+
+               var totalDataSize int64
+               for idx, dataInfo := range resp.DataInfo {
+                       Expect(dataInfo.Node).NotTo(BeNil(), "stream data node 
%d should have node info", idx)
+                       
Expect(dataInfo.DataSizeBytes).Should(BeNumerically(">", 0), "stream data node 
%d (%s) should have DataSizeBytes > 0", idx, dataInfo.Node.Metadata.Name)
+                       totalDataSize += dataInfo.DataSizeBytes
+                       GinkgoWriter.Printf("Stream data node %d: %s, segments: 
%d, size: %d bytes\n",
+                               idx, dataInfo.Node.Metadata.Name, 
len(dataInfo.SegmentInfo), dataInfo.DataSizeBytes)
+               }
+               Expect(totalDataSize).Should(BeNumerically(">", 0), "total 
stream data size should be > 0")
+               GinkgoWriter.Printf("Total stream data size: %d bytes\n", 
totalDataSize)
+       })
+
+       It("should return liaison info", func() {
+               By("Inspecting stream group")
+               resp, err := groupClient.Inspect(ctx, 
&databasev1.GroupRegistryServiceInspectRequest{Group: groupName})
+               Expect(err).ShouldNot(HaveOccurred())
+               Expect(resp).NotTo(BeNil())
+
+               By("Verifying stream liaison info collected")
+               Expect(len(resp.LiaisonInfo)).Should(BeNumerically(">=", 1), 
"should collect from at least one liaison node")
+
+               for idx, liaisonInfo := range resp.LiaisonInfo {
+                       logger.Infof("Inspecting stream liaison node %d: 
PendingWrite=%d, PendingSync=%d parts",
+                               idx, liaisonInfo.PendingWriteDataCount, 
liaisonInfo.PendingSyncPartCount)
+                       GinkgoWriter.Printf("Stream liaison node %d: 
PendingWrite=%d, PendingSync=%d parts\n",
+                               idx, liaisonInfo.PendingWriteDataCount, 
liaisonInfo.PendingSyncPartCount)
+               }
+       })
+})
+
+var _ = Describe("Inspect trace in distributed mode", func() {
+       var groupName string
+       var traceName string
+       var ctx context.Context
+       const dataCount = 100
+
+       BeforeEach(func() {
+               ctx = context.TODO()
+               groupName = fmt.Sprintf("inspect-trace-test-%d", 
time.Now().UnixNano())
+               traceName = "test_trace"
+
+               By("Creating trace group")
+               _, createErr := groupClient.Create(ctx, 
&databasev1.GroupRegistryServiceCreateRequest{
+                       Group: &commonv1.Group{
+                               Metadata: &commonv1.Metadata{
+                                       Name: groupName,
+                               },
+                               Catalog: commonv1.Catalog_CATALOG_TRACE,
+                               ResourceOpts: &commonv1.ResourceOpts{
+                                       ShardNum: 2,
+                                       SegmentInterval: &commonv1.IntervalRule{
+                                               Unit: 
commonv1.IntervalRule_UNIT_DAY,
+                                               Num:  1,
+                                       },
+                                       Ttl: &commonv1.IntervalRule{
+                                               Unit: 
commonv1.IntervalRule_UNIT_DAY,
+                                               Num:  7,
+                                       },
+                               },
+                       },
+               })
+               Expect(createErr).ShouldNot(HaveOccurred())
+
+               By("Creating trace schema")
+               _, traceErr := traceRegClient.Create(ctx, 
&databasev1.TraceRegistryServiceCreateRequest{
+                       Trace: &databasev1.Trace{
+                               Metadata: &commonv1.Metadata{
+                                       Name:  traceName,
+                                       Group: groupName,
+                               },
+                               TraceIdTagName:   "trace_id",
+                               SpanIdTagName:    "span_id",
+                               TimestampTagName: "timestamp",
+                               Tags: []*databasev1.TraceTagSpec{
+                                       {Name: "trace_id", Type: 
databasev1.TagType_TAG_TYPE_STRING},
+                                       {Name: "span_id", Type: 
databasev1.TagType_TAG_TYPE_STRING},
+                                       {Name: "timestamp", Type: 
databasev1.TagType_TAG_TYPE_TIMESTAMP},
+                                       {Name: "service_id", Type: 
databasev1.TagType_TAG_TYPE_STRING},
+                                       {Name: "duration", Type: 
databasev1.TagType_TAG_TYPE_INT},
+                               },
+                       },
+               })
+               Expect(traceErr).ShouldNot(HaveOccurred())
+               time.Sleep(2 * time.Second)
+
+               By("Writing trace data")
+               writeTraceData(ctx, groupName, traceName, dataCount)
+               time.Sleep(5 * time.Second)
+       })
+
+       AfterEach(func() {
+               _, _ = groupClient.Delete(ctx, 
&databasev1.GroupRegistryServiceDeleteRequest{Group: groupName})
+       })
+
+       It("should return schema info", func() {
+               By("Inspecting trace group")
+               resp, err := groupClient.Inspect(ctx, 
&databasev1.GroupRegistryServiceInspectRequest{Group: groupName})
+               Expect(err).ShouldNot(HaveOccurred())
+               Expect(resp).NotTo(BeNil())
+
+               By("Verifying schema info contains the trace")
+               Expect(resp.SchemaInfo).NotTo(BeNil())
+               Expect(len(resp.SchemaInfo.Traces)).Should(BeNumerically(">=", 
1))
+               Expect(resp.SchemaInfo.Traces).To(ContainElement(traceName))
+               GinkgoWriter.Printf("Trace schema info: traces=%d, 
indexRules=%d\n",
+                       len(resp.SchemaInfo.Traces),
+                       len(resp.SchemaInfo.IndexRules))
+       })
+
+       It("should return data info", func() {
+               By("Inspecting trace group")
+               resp, err := groupClient.Inspect(ctx, 
&databasev1.GroupRegistryServiceInspectRequest{Group: groupName})
+               Expect(err).ShouldNot(HaveOccurred())
+               Expect(resp).NotTo(BeNil())
+
+               By("Verifying trace data collected from multiple nodes")
+               Expect(len(resp.DataInfo)).Should(BeNumerically(">=", 1), 
"should collect from at least one trace data node")
+
+               var totalDataSize int64
+               for idx, dataInfo := range resp.DataInfo {
+                       Expect(dataInfo.Node).NotTo(BeNil(), "trace data node 
%d should have node info", idx)
+                       
Expect(dataInfo.DataSizeBytes).Should(BeNumerically(">", 0), "trace data node 
%d (%s) should have DataSizeBytes > 0", idx, dataInfo.Node.Metadata.Name)
+                       totalDataSize += dataInfo.DataSizeBytes
+                       GinkgoWriter.Printf("Trace data node %d: %s, segments: 
%d, size: %d bytes\n",
+                               idx, dataInfo.Node.Metadata.Name, 
len(dataInfo.SegmentInfo), dataInfo.DataSizeBytes)
+               }
+               Expect(totalDataSize).Should(BeNumerically(">", 0), "total 
trace data size should be > 0")
+               GinkgoWriter.Printf("Total trace data size: %d bytes\n", 
totalDataSize)
+       })
+
+       It("should return liaison info", func() {
+               By("Inspecting trace group")
+               resp, err := groupClient.Inspect(ctx, 
&databasev1.GroupRegistryServiceInspectRequest{Group: groupName})
+               Expect(err).ShouldNot(HaveOccurred())
+               Expect(resp).NotTo(BeNil())
+
+               By("Verifying trace liaison info collected")
+               Expect(len(resp.LiaisonInfo)).Should(BeNumerically(">=", 1), 
"should collect from at least one liaison node")
+
+               for idx, liaisonInfo := range resp.LiaisonInfo {
+                       GinkgoWriter.Printf("Trace liaison node %d: 
PendingWrite=%d, PendingSync=%d parts\n",
+                               idx, liaisonInfo.PendingWriteDataCount, 
liaisonInfo.PendingSyncPartCount)
+               }
+       })
+})
diff --git a/test/integration/standalone/inspect/inspect_suite_test.go b/test/integration/standalone/inspect/inspect_suite_test.go
new file mode 100644
index 00000000..ef624ab5
--- /dev/null
+++ b/test/integration/standalone/inspect/inspect_suite_test.go
@@ -0,0 +1,558 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package integration_inspect_test provides integration tests for the inspect functionality in standalone mode.
+package integration_inspect_test
+
+import (
+       "context"
+       "fmt"
+       "io"
+       "strconv"
+       "testing"
+       "time"
+
+       . "github.com/onsi/ginkgo/v2"
+       . "github.com/onsi/gomega"
+       "github.com/onsi/gomega/gleak"
+       "google.golang.org/grpc"
+       "google.golang.org/grpc/credentials/insecure"
+       "google.golang.org/protobuf/types/known/timestamppb"
+
+       commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       measurev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
+       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+       streamv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
+       tracev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/trace/v1"
+       "github.com/apache/skywalking-banyandb/pkg/grpchelper"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/pool"
+       "github.com/apache/skywalking-banyandb/pkg/test/flags"
+       "github.com/apache/skywalking-banyandb/pkg/test/gmatcher"
+       "github.com/apache/skywalking-banyandb/pkg/test/setup"
+       integration_standalone 
"github.com/apache/skywalking-banyandb/test/integration/standalone"
+)
+
+func TestInspect(t *testing.T) {
+       RegisterFailHandler(Fail)
+       RunSpecs(t, "Standalone Inspect Suite", 
Label(integration_standalone.Labels...))
+}
+
+var (
+       deferFunc          func()
+       goods              []gleak.Goroutine
+       connection         *grpc.ClientConn
+       groupClient        databasev1.GroupRegistryServiceClient
+       measureRegClient   databasev1.MeasureRegistryServiceClient
+       streamRegClient    databasev1.StreamRegistryServiceClient
+       traceRegClient     databasev1.TraceRegistryServiceClient
+       measureWriteClient measurev1.MeasureServiceClient
+       streamWriteClient  streamv1.StreamServiceClient
+       traceWriteClient   tracev1.TraceServiceClient
+)
+
+var _ = SynchronizedBeforeSuite(func() []byte {
+       Expect(logger.Init(logger.Logging{
+               Env:   "dev",
+               Level: flags.LogLevel,
+       })).To(Succeed())
+       pool.EnableStackTracking(true)
+       goods = gleak.Goroutines()
+       By("Starting standalone server")
+       addr, _, closeFn := setup.EmptyStandalone()
+       deferFunc = closeFn
+       return []byte(addr)
+}, func(address []byte) {
+       var err error
+       connection, err = grpchelper.Conn(string(address), 10*time.Second,
+               grpc.WithTransportCredentials(insecure.NewCredentials()))
+       Expect(err).NotTo(HaveOccurred())
+       groupClient = databasev1.NewGroupRegistryServiceClient(connection)
+       measureRegClient = 
databasev1.NewMeasureRegistryServiceClient(connection)
+       streamRegClient = databasev1.NewStreamRegistryServiceClient(connection)
+       traceRegClient = databasev1.NewTraceRegistryServiceClient(connection)
+       measureWriteClient = measurev1.NewMeasureServiceClient(connection)
+       streamWriteClient = streamv1.NewStreamServiceClient(connection)
+       traceWriteClient = tracev1.NewTraceServiceClient(connection)
+})
+
+var _ = SynchronizedAfterSuite(func() {
+       if connection != nil {
+               Expect(connection.Close()).To(Succeed())
+       }
+}, func() {
+       if deferFunc != nil {
+               deferFunc()
+       }
+})
+
+var _ = ReportAfterSuite("Standalone Inspect Suite", func(report Report) {
+       if report.SuiteSucceeded {
+               Eventually(gleak.Goroutines, 
flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods))
+               Eventually(pool.AllRefsCount, 
flags.EventuallyTimeout).Should(gmatcher.HaveZeroRef())
+       }
+})
+
+func writeMeasureData(ctx context.Context, groupName, measureName string, 
dataCount int) {
+       writeClient, writeErr := measureWriteClient.Write(ctx)
+       Expect(writeErr).ShouldNot(HaveOccurred())
+
+       metadata := &commonv1.Metadata{
+               Name:  measureName,
+               Group: groupName,
+       }
+       baseTime := time.Now().Truncate(time.Millisecond)
+       for idx := 0; idx < dataCount; idx++ {
+               req := &measurev1.WriteRequest{
+                       Metadata: metadata,
+                       DataPoint: &measurev1.DataPointValue{
+                               Timestamp: 
timestamppb.New(baseTime.Add(time.Duration(idx) * time.Second)),
+                               TagFamilies: []*modelv1.TagFamilyForWrite{{
+                                       Tags: []*modelv1.TagValue{{
+                                               Value: 
&modelv1.TagValue_Str{Str: &modelv1.Str{Value: "id_" + strconv.Itoa(idx)}},
+                                       }},
+                               }},
+                               Fields: []*modelv1.FieldValue{{
+                                       Value: &modelv1.FieldValue_Int{Int: 
&modelv1.Int{Value: int64(idx * 100)}},
+                               }},
+                       },
+                       MessageId: uint64(time.Now().UnixNano()),
+               }
+               sendErr := writeClient.Send(req)
+               Expect(sendErr).ShouldNot(HaveOccurred())
+       }
+       Expect(writeClient.CloseSend()).To(Succeed())
+       Eventually(func() error {
+               _, recvErr := writeClient.Recv()
+               return recvErr
+       }, flags.EventuallyTimeout).Should(Equal(io.EOF))
+}
+
+func writeStreamData(ctx context.Context, groupName, streamName string, 
dataCount int) {
+       writeClient, writeErr := streamWriteClient.Write(ctx)
+       Expect(writeErr).ShouldNot(HaveOccurred())
+
+       metadata := &commonv1.Metadata{
+               Name:  streamName,
+               Group: groupName,
+       }
+       baseTime := time.Now().Truncate(time.Millisecond)
+       for idx := 0; idx < dataCount; idx++ {
+               req := &streamv1.WriteRequest{
+                       Metadata: metadata,
+                       Element: &streamv1.ElementValue{
+                               ElementId: strconv.Itoa(idx),
+                               Timestamp: 
timestamppb.New(baseTime.Add(time.Duration(idx) * time.Second)),
+                               TagFamilies: []*modelv1.TagFamilyForWrite{{
+                                       Tags: []*modelv1.TagValue{{
+                                               Value: 
&modelv1.TagValue_Str{Str: &modelv1.Str{Value: "svc_" + strconv.Itoa(idx)}},
+                                       }},
+                               }},
+                       },
+                       MessageId: uint64(time.Now().UnixNano()),
+               }
+               sendErr := writeClient.Send(req)
+               Expect(sendErr).ShouldNot(HaveOccurred())
+       }
+       Expect(writeClient.CloseSend()).To(Succeed())
+       Eventually(func() error {
+               _, recvErr := writeClient.Recv()
+               return recvErr
+       }, flags.EventuallyTimeout).Should(Equal(io.EOF))
+}
+
+func writeTraceData(ctx context.Context, groupName, traceName string, 
dataCount int) {
+       writeClient, writeErr := traceWriteClient.Write(ctx)
+       Expect(writeErr).ShouldNot(HaveOccurred())
+
+       metadata := &commonv1.Metadata{
+               Name:  traceName,
+               Group: groupName,
+       }
+       baseTime := time.Now().Truncate(time.Millisecond)
+       for idx := 0; idx < dataCount; idx++ {
+               traceID := fmt.Sprintf("trace_%d", idx)
+               spanID := fmt.Sprintf("span_%d", idx)
+               req := &tracev1.WriteRequest{
+                       Metadata: metadata,
+                       Tags: []*modelv1.TagValue{
+                               {Value: &modelv1.TagValue_Str{Str: 
&modelv1.Str{Value: traceID}}},
+                               {Value: &modelv1.TagValue_Str{Str: 
&modelv1.Str{Value: spanID}}},
+                               {Value: &modelv1.TagValue_Timestamp{Timestamp: 
timestamppb.New(baseTime.Add(time.Duration(idx) * time.Second))}},
+                               {Value: &modelv1.TagValue_Str{Str: 
&modelv1.Str{Value: "test_service"}}},
+                               {Value: &modelv1.TagValue_Int{Int: 
&modelv1.Int{Value: int64(idx * 10)}}},
+                       },
+                       Span:    []byte(fmt.Sprintf("span_data_%d", idx)),
+                       Version: uint64(idx + 1),
+               }
+               sendErr := writeClient.Send(req)
+               Expect(sendErr).ShouldNot(HaveOccurred())
+       }
+       Expect(writeClient.CloseSend()).To(Succeed())
+       Eventually(func() error {
+               _, recvErr := writeClient.Recv()
+               return recvErr
+       }, flags.EventuallyTimeout).Should(Equal(io.EOF))
+}
+
+var _ = Describe("Inspect measure in standalone mode", func() {
+       var groupName string
+       var measureName string
+       var ctx context.Context
+       const dataCount = 10
+
+       BeforeEach(func() {
+               ctx = context.TODO()
+               groupName = fmt.Sprintf("inspect-measure-test-%d", 
time.Now().UnixNano())
+               measureName = "test_measure"
+
+               By("Creating measure group")
+               _, createErr := groupClient.Create(ctx, 
&databasev1.GroupRegistryServiceCreateRequest{
+                       Group: &commonv1.Group{
+                               Metadata: &commonv1.Metadata{
+                                       Name: groupName,
+                               },
+                               Catalog: commonv1.Catalog_CATALOG_MEASURE,
+                               ResourceOpts: &commonv1.ResourceOpts{
+                                       ShardNum: 2,
+                                       SegmentInterval: &commonv1.IntervalRule{
+                                               Unit: 
commonv1.IntervalRule_UNIT_DAY,
+                                               Num:  1,
+                                       },
+                                       Ttl: &commonv1.IntervalRule{
+                                               Unit: 
commonv1.IntervalRule_UNIT_DAY,
+                                               Num:  7,
+                                       },
+                               },
+                       },
+               })
+               Expect(createErr).ShouldNot(HaveOccurred())
+
+               By("Creating measure schema")
+               _, measureErr := measureRegClient.Create(ctx, 
&databasev1.MeasureRegistryServiceCreateRequest{
+                       Measure: &databasev1.Measure{
+                               Metadata: &commonv1.Metadata{
+                                       Name:  measureName,
+                                       Group: groupName,
+                               },
+                               Entity: &databasev1.Entity{
+                                       TagNames: []string{"id"},
+                               },
+                               TagFamilies: []*databasev1.TagFamilySpec{{
+                                       Name: "default",
+                                       Tags: []*databasev1.TagSpec{{
+                                               Name: "id",
+                                               Type: 
databasev1.TagType_TAG_TYPE_STRING,
+                                       }},
+                               }},
+                               Fields: []*databasev1.FieldSpec{{
+                                       Name:              "value",
+                                       FieldType:         
databasev1.FieldType_FIELD_TYPE_INT,
+                                       EncodingMethod:    
databasev1.EncodingMethod_ENCODING_METHOD_GORILLA,
+                                       CompressionMethod: 
databasev1.CompressionMethod_COMPRESSION_METHOD_ZSTD,
+                               }},
+                       },
+               })
+               Expect(measureErr).ShouldNot(HaveOccurred())
+               time.Sleep(2 * time.Second)
+
+               By("Writing measure data")
+               writeMeasureData(ctx, groupName, measureName, dataCount)
+               time.Sleep(2 * time.Second)
+       })
+
+       AfterEach(func() {
+               _, _ = groupClient.Delete(ctx, 
&databasev1.GroupRegistryServiceDeleteRequest{Group: groupName})
+       })
+
+       It("should return group info", func() {
+               By("Inspecting group")
+               resp, err := groupClient.Inspect(ctx, 
&databasev1.GroupRegistryServiceInspectRequest{Group: groupName})
+               Expect(err).ShouldNot(HaveOccurred())
+               Expect(resp).NotTo(BeNil())
+               By("Verifying group info")
+               Expect(resp.Group).NotTo(BeNil())
+               Expect(resp.Group.Metadata.Name).To(Equal(groupName))
+               
Expect(resp.Group.Catalog).To(Equal(commonv1.Catalog_CATALOG_MEASURE))
+       })
+
+       It("should return schema info", func() {
+               By("Inspecting group")
+               resp, err := groupClient.Inspect(ctx, 
&databasev1.GroupRegistryServiceInspectRequest{Group: groupName})
+               Expect(err).ShouldNot(HaveOccurred())
+               Expect(resp).NotTo(BeNil())
+               By("Verifying schema info contains the measure")
+               Expect(resp.SchemaInfo).NotTo(BeNil())
+               
Expect(len(resp.SchemaInfo.Measures)).Should(BeNumerically(">=", 1))
+               Expect(resp.SchemaInfo.Measures).To(ContainElement(measureName))
+               GinkgoWriter.Printf("Schema info: measures=%d, streams=%d, 
traces=%d, indexRules=%d\n",
+                       len(resp.SchemaInfo.Measures),
+                       len(resp.SchemaInfo.Streams),
+                       len(resp.SchemaInfo.Traces),
+                       len(resp.SchemaInfo.IndexRules))
+       })
+
+       It("should return data info", func() {
+               By("Inspecting group")
+               resp, err := groupClient.Inspect(ctx, 
&databasev1.GroupRegistryServiceInspectRequest{Group: groupName})
+               Expect(err).ShouldNot(HaveOccurred())
+               Expect(resp).NotTo(BeNil())
+               By("Verifying data info collected")
+               Expect(len(resp.DataInfo)).Should(BeNumerically(">=", 1), 
"should collect from standalone node")
+               var totalDataSize int64
+               for idx, dataInfo := range resp.DataInfo {
+                       Expect(dataInfo.Node).NotTo(BeNil(), "data node %d 
should have node info", idx)
+                       
Expect(dataInfo.DataSizeBytes).Should(BeNumerically(">", 0), "data node %d 
should have data size > 0", idx)
+                       totalDataSize += dataInfo.DataSizeBytes
+                       GinkgoWriter.Printf("Data node %d: %s, segments: %d, 
size: %d bytes\n",
+                               idx, dataInfo.Node.Metadata.Name, 
len(dataInfo.SegmentInfo), dataInfo.DataSizeBytes)
+               }
+               Expect(totalDataSize).Should(BeNumerically(">", 0), "total data 
size should be > 0")
+               GinkgoWriter.Printf("Total data size: %d bytes\n", 
totalDataSize)
+       })
+
+       It("should return liaison info", func() {
+               By("Inspecting group")
+               resp, err := groupClient.Inspect(ctx, 
&databasev1.GroupRegistryServiceInspectRequest{Group: groupName})
+               Expect(err).ShouldNot(HaveOccurred())
+               Expect(resp).NotTo(BeNil())
+               By("Verifying liaison info in standalone mode")
+               Expect(len(resp.LiaisonInfo)).Should(BeNumerically(">=", 1), 
"should have liaison info in standalone mode")
+               for idx, liaisonInfo := range resp.LiaisonInfo {
+                       GinkgoWriter.Printf("Liaison node %d: 
PendingWrite=%d\n", idx, liaisonInfo.PendingWriteDataCount)
+               }
+       })
+})
+
+// Specs for inspecting a stream group. Every spec runs against a freshly
+// created group that holds one stream schema and whatever writeStreamData
+// wrote for it in BeforeEach.
+var _ = Describe("Inspect stream in standalone mode", func() {
+       var groupName string
+       var streamName string
+       var ctx context.Context
+       const dataCount = 10
+
+       BeforeEach(func() {
+               ctx = context.TODO()
+               // The nanosecond timestamp suffix gives every spec its own group name.
+               groupName = fmt.Sprintf("inspect-stream-test-%d", time.Now().UnixNano())
+               streamName = "test_stream"
+
+               By("Creating stream group")
+               _, createErr := groupClient.Create(ctx, &databasev1.GroupRegistryServiceCreateRequest{
+                       Group: &commonv1.Group{
+                               Metadata: &commonv1.Metadata{
+                                       Name: groupName,
+                               },
+                               Catalog: commonv1.Catalog_CATALOG_STREAM,
+                               ResourceOpts: &commonv1.ResourceOpts{
+                                       ShardNum: 2,
+                                       SegmentInterval: &commonv1.IntervalRule{
+                                               Unit: commonv1.IntervalRule_UNIT_DAY,
+                                               Num:  1,
+                                       },
+                                       Ttl: &commonv1.IntervalRule{
+                                               Unit: commonv1.IntervalRule_UNIT_DAY,
+                                               Num:  7,
+                                       },
+                               },
+                       },
+               })
+               Expect(createErr).ShouldNot(HaveOccurred())
+
+               By("Creating stream schema")
+               _, streamErr := streamRegClient.Create(ctx, &databasev1.StreamRegistryServiceCreateRequest{
+                       Stream: &databasev1.Stream{
+                               Metadata: &commonv1.Metadata{
+                                       Name:  streamName,
+                                       Group: groupName,
+                               },
+                               Entity: &databasev1.Entity{
+                                       TagNames: []string{"svc"},
+                               },
+                               TagFamilies: []*databasev1.TagFamilySpec{{
+                                       Name: "default",
+                                       Tags: []*databasev1.TagSpec{{
+                                               Name: "svc",
+                                               Type: databasev1.TagType_TAG_TYPE_STRING,
+                                       }},
+                               }},
+                       },
+               })
+               Expect(streamErr).ShouldNot(HaveOccurred())
+               // NOTE(review): fixed pause, presumably to let the new schema propagate
+               // before writing — confirm; polling would be less timing-sensitive.
+               time.Sleep(2 * time.Second)
+
+               By("Writing stream data")
+               writeStreamData(ctx, groupName, streamName, dataCount)
+               // NOTE(review): fixed pause, presumably to let the written data become
+               // visible to Inspect — confirm.
+               time.Sleep(2 * time.Second)
+       })
+
+       AfterEach(func() {
+               // Cleanup is best-effort: a failed delete is logged instead of failing the spec.
+               if _, deleteErr := groupClient.Delete(ctx, &databasev1.GroupRegistryServiceDeleteRequest{Group: groupName}); deleteErr != nil {
+                       GinkgoWriter.Printf("failed to delete group %s: %v\n", groupName, deleteErr)
+               }
+       })
+
+       // The inspected group must echo the name and catalog it was created with.
+       It("should return group info", func() {
+               By("Inspecting stream group")
+               resp, err := groupClient.Inspect(ctx, &databasev1.GroupRegistryServiceInspectRequest{Group: groupName})
+               Expect(err).ShouldNot(HaveOccurred())
+               Expect(resp).NotTo(BeNil())
+               By("Verifying group info")
+               Expect(resp.GetGroup()).NotTo(BeNil())
+               // Getters keep a missing Metadata message from panicking; the assertion fails instead.
+               Expect(resp.GetGroup().GetMetadata().GetName()).To(Equal(groupName))
+               Expect(resp.GetGroup().GetCatalog()).To(Equal(commonv1.Catalog_CATALOG_STREAM))
+       })
+
+       // Every reported data node must carry node info and a positive data size.
+       It("should return data info", func() {
+               By("Inspecting stream group")
+               resp, err := groupClient.Inspect(ctx, &databasev1.GroupRegistryServiceInspectRequest{Group: groupName})
+               Expect(err).ShouldNot(HaveOccurred())
+               Expect(resp).NotTo(BeNil())
+               By("Verifying stream data info collected")
+               Expect(resp.GetDataInfo()).NotTo(BeEmpty(), "should collect from standalone node")
+               var totalDataSize int64
+               for idx, dataInfo := range resp.GetDataInfo() {
+                       Expect(dataInfo.GetNode()).NotTo(BeNil(), "stream data node %d should have node info", idx)
+                       Expect(dataInfo.GetDataSizeBytes()).Should(BeNumerically(">", 0), "stream data node %d should have data size > 0", idx)
+                       totalDataSize += dataInfo.GetDataSizeBytes()
+                       // Node is checked above, but its Metadata is not; the getter chain is nil-safe.
+                       GinkgoWriter.Printf("Stream data node %d: %s, segments: %d, size: %d bytes\n",
+                               idx, dataInfo.GetNode().GetMetadata().GetName(), len(dataInfo.GetSegmentInfo()), dataInfo.GetDataSizeBytes())
+               }
+               Expect(totalDataSize).Should(BeNumerically(">", 0), "total stream data size should be > 0")
+               GinkgoWriter.Printf("Total stream data size: %d bytes\n", totalDataSize)
+       })
+
+       // The schema summary must list the stream created in BeforeEach.
+       It("should return schema info", func() {
+               By("Inspecting stream group")
+               resp, err := groupClient.Inspect(ctx, &databasev1.GroupRegistryServiceInspectRequest{Group: groupName})
+               Expect(err).ShouldNot(HaveOccurred())
+               Expect(resp).NotTo(BeNil())
+               By("Verifying schema info contains the stream")
+               Expect(resp.GetSchemaInfo()).NotTo(BeNil())
+               Expect(resp.GetSchemaInfo().GetStreams()).NotTo(BeEmpty())
+               Expect(resp.GetSchemaInfo().GetStreams()).To(ContainElement(streamName))
+               GinkgoWriter.Printf("Stream schema info: streams=%d, indexRules=%d\n",
+                       len(resp.GetSchemaInfo().GetStreams()),
+                       len(resp.GetSchemaInfo().GetIndexRules()))
+       })
+})
+
+// Specs for inspecting a trace group. Every spec runs against a freshly
+// created group that holds one trace schema and whatever writeTraceData
+// wrote for it in BeforeEach.
+var _ = Describe("Inspect trace in standalone mode", func() {
+       var groupName string
+       var traceName string
+       var ctx context.Context
+       const dataCount = 10
+
+       BeforeEach(func() {
+               ctx = context.TODO()
+               // The nanosecond timestamp suffix gives every spec its own group name.
+               groupName = fmt.Sprintf("inspect-trace-test-%d", time.Now().UnixNano())
+               traceName = "test_trace"
+
+               By("Creating trace group")
+               _, createErr := groupClient.Create(ctx, &databasev1.GroupRegistryServiceCreateRequest{
+                       Group: &commonv1.Group{
+                               Metadata: &commonv1.Metadata{
+                                       Name: groupName,
+                               },
+                               Catalog: commonv1.Catalog_CATALOG_TRACE,
+                               ResourceOpts: &commonv1.ResourceOpts{
+                                       ShardNum: 2,
+                                       SegmentInterval: &commonv1.IntervalRule{
+                                               Unit: commonv1.IntervalRule_UNIT_DAY,
+                                               Num:  1,
+                                       },
+                                       Ttl: &commonv1.IntervalRule{
+                                               Unit: commonv1.IntervalRule_UNIT_DAY,
+                                               Num:  7,
+                                       },
+                               },
+                       },
+               })
+               Expect(createErr).ShouldNot(HaveOccurred())
+
+               By("Creating trace schema")
+               _, traceErr := traceRegClient.Create(ctx, &databasev1.TraceRegistryServiceCreateRequest{
+                       Trace: &databasev1.Trace{
+                               Metadata: &commonv1.Metadata{
+                                       Name:  traceName,
+                                       Group: groupName,
+                               },
+                               Tags: []*databasev1.TraceTagSpec{
+                                       {Name: "trace_id", Type: databasev1.TagType_TAG_TYPE_STRING},
+                                       {Name: "span_id", Type: databasev1.TagType_TAG_TYPE_STRING},
+                                       {Name: "timestamp", Type: databasev1.TagType_TAG_TYPE_TIMESTAMP},
+                                       {Name: "service_id", Type: databasev1.TagType_TAG_TYPE_STRING},
+                                       {Name: "duration", Type: databasev1.TagType_TAG_TYPE_INT},
+                               },
+                               TraceIdTagName:   "trace_id",
+                               SpanIdTagName:    "span_id",
+                               TimestampTagName: "timestamp",
+                       },
+               })
+               Expect(traceErr).ShouldNot(HaveOccurred())
+               // NOTE(review): fixed pause, presumably to let the new schema propagate
+               // before writing — confirm; polling would be less timing-sensitive.
+               time.Sleep(2 * time.Second)
+
+               By("Writing trace data")
+               writeTraceData(ctx, groupName, traceName, dataCount)
+               // NOTE(review): fixed pause, presumably to let the written data become
+               // visible to Inspect — confirm.
+               time.Sleep(2 * time.Second)
+       })
+
+       AfterEach(func() {
+               // Cleanup is best-effort: a failed delete is logged instead of failing the spec.
+               if _, deleteErr := groupClient.Delete(ctx, &databasev1.GroupRegistryServiceDeleteRequest{Group: groupName}); deleteErr != nil {
+                       GinkgoWriter.Printf("failed to delete group %s: %v\n", groupName, deleteErr)
+               }
+       })
+
+       // The inspected group must echo the name and catalog it was created with.
+       It("should return group info", func() {
+               By("Inspecting trace group")
+               resp, err := groupClient.Inspect(ctx, &databasev1.GroupRegistryServiceInspectRequest{Group: groupName})
+               Expect(err).ShouldNot(HaveOccurred())
+               Expect(resp).NotTo(BeNil())
+               By("Verifying group info")
+               Expect(resp.GetGroup()).NotTo(BeNil())
+               // Getters keep a missing Metadata message from panicking; the assertion fails instead.
+               Expect(resp.GetGroup().GetMetadata().GetName()).To(Equal(groupName))
+               Expect(resp.GetGroup().GetCatalog()).To(Equal(commonv1.Catalog_CATALOG_TRACE))
+       })
+
+       // Every reported data node must carry node info and a positive data size.
+       It("should return data info", func() {
+               By("Inspecting trace group")
+               resp, err := groupClient.Inspect(ctx, &databasev1.GroupRegistryServiceInspectRequest{Group: groupName})
+               Expect(err).ShouldNot(HaveOccurred())
+               Expect(resp).NotTo(BeNil())
+               By("Verifying trace data info collected")
+               Expect(resp.GetDataInfo()).NotTo(BeEmpty(), "should collect from standalone node")
+               var totalDataSize int64
+               for idx, dataInfo := range resp.GetDataInfo() {
+                       Expect(dataInfo.GetNode()).NotTo(BeNil(), "trace data node %d should have node info", idx)
+                       Expect(dataInfo.GetDataSizeBytes()).Should(BeNumerically(">", 0), "trace data node %d should have data size > 0", idx)
+                       totalDataSize += dataInfo.GetDataSizeBytes()
+                       // Node is checked above, but its Metadata is not; the getter chain is nil-safe.
+                       GinkgoWriter.Printf("Trace data node %d: %s, segments: %d, size: %d bytes\n",
+                               idx, dataInfo.GetNode().GetMetadata().GetName(), len(dataInfo.GetSegmentInfo()), dataInfo.GetDataSizeBytes())
+               }
+               Expect(totalDataSize).Should(BeNumerically(">", 0), "total trace data size should be > 0")
+               GinkgoWriter.Printf("Total trace data size: %d bytes\n", totalDataSize)
+       })
+
+       // The schema summary must list the trace created in BeforeEach.
+       It("should return schema info", func() {
+               By("Inspecting trace group")
+               resp, err := groupClient.Inspect(ctx, &databasev1.GroupRegistryServiceInspectRequest{Group: groupName})
+               Expect(err).ShouldNot(HaveOccurred())
+               Expect(resp).NotTo(BeNil())
+               By("Verifying schema info contains the trace")
+               Expect(resp.GetSchemaInfo()).NotTo(BeNil())
+               Expect(resp.GetSchemaInfo().GetTraces()).NotTo(BeEmpty())
+               Expect(resp.GetSchemaInfo().GetTraces()).To(ContainElement(traceName))
+               GinkgoWriter.Printf("Trace schema info: traces=%d, indexRules=%d\n",
+                       len(resp.GetSchemaInfo().GetTraces()),
+                       len(resp.GetSchemaInfo().GetIndexRules()))
+       })
+})


Reply via email to