This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new e8453c7d Implement group inspection to display deletion details (#947)
e8453c7d is described below
commit e8453c7d1f65730422f73e7390a7d3ed7e29b5e4
Author: Huang Youliang <[email protected]>
AuthorDate: Sun Feb 1 16:20:19 2026 +0800
Implement group inspection to display deletion details (#947)
* Implement group inspection to display deletion details
---------
Co-authored-by: Gao Hongtao <[email protected]>
---
api/data/data.go | 91 ++-
api/data/measure.go | 6 +
api/data/stream.go | 6 +
api/data/trace.go | 6 +
banyand/internal/storage/index.go | 4 +
banyand/internal/storage/storage.go | 1 +
banyand/internal/wqueue/wqueue.go | 11 +
banyand/liaison/grpc/registry.go | 113 ++++
banyand/measure/introducer.go | 1 +
banyand/measure/metadata.go | 184 +++++++
banyand/measure/metrics.go | 19 +-
banyand/measure/svc_data.go | 36 +-
banyand/measure/svc_liaison.go | 49 ++
banyand/measure/svc_standalone.go | 27 +
banyand/measure/tstable.go | 19 +-
banyand/metadata/client.go | 36 ++
banyand/metadata/metadata.go | 7 +
banyand/metadata/schema/collector.go | 228 ++++++++
banyand/metadata/schema/etcd.go | 5 +-
banyand/metadata/schema/schema.go | 10 +
banyand/stream/introducer.go | 1 +
banyand/stream/metadata.go | 218 +++++++-
banyand/stream/metrics.go | 23 +-
banyand/stream/svc_liaison.go | 54 ++
banyand/stream/svc_standalone.go | 45 +-
banyand/stream/tstable.go | 33 +-
banyand/trace/handoff_controller.go | 15 +
banyand/trace/introducer.go | 1 +
banyand/trace/merger.go | 1 +
banyand/trace/metadata.go | 234 +++++++-
banyand/trace/metrics.go | 20 +-
banyand/trace/svc_liaison.go | 58 ++
banyand/trace/svc_standalone.go | 41 +-
banyand/trace/trace.go | 2 +
banyand/trace/tstable.go | 35 +-
pkg/cmdsetup/liaison.go | 3 +
pkg/index/index.go | 1 +
pkg/index/inverted/inverted.go | 15 +
.../distributed/inspect/inspect_suite_test.go | 613 +++++++++++++++++++++
.../standalone/inspect/inspect_suite_test.go | 558 +++++++++++++++++++
40 files changed, 2754 insertions(+), 76 deletions(-)
diff --git a/api/data/data.go b/api/data/data.go
index 61c97897..9857bac4 100644
--- a/api/data/data.go
+++ b/api/data/data.go
@@ -21,6 +21,7 @@ package data
import (
"google.golang.org/protobuf/proto"
+ databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
propertyv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
@@ -31,30 +32,36 @@ import (
var (
// TopicMap is the map of topic name to topic.
TopicMap = map[string]bus.Topic{
- TopicStreamWrite.String(): TopicStreamWrite,
- TopicStreamQuery.String(): TopicStreamQuery,
- TopicMeasureWrite.String(): TopicMeasureWrite,
- TopicMeasureQuery.String(): TopicMeasureQuery,
- TopicInternalMeasureQuery.String(): TopicInternalMeasureQuery,
- TopicTopNQuery.String(): TopicTopNQuery,
- TopicPropertyDelete.String(): TopicPropertyDelete,
- TopicPropertyQuery.String(): TopicPropertyQuery,
- TopicPropertyUpdate.String(): TopicPropertyUpdate,
- TopicStreamPartSync.String(): TopicStreamPartSync,
- TopicMeasurePartSync.String(): TopicMeasurePartSync,
- TopicMeasureSeriesIndexInsert.String(): TopicMeasureSeriesIndexInsert,
- TopicMeasureSeriesIndexUpdate.String(): TopicMeasureSeriesIndexUpdate,
- TopicMeasureSeriesSync.String(): TopicMeasureSeriesSync,
- TopicPropertyRepair.String(): TopicPropertyRepair,
- TopicStreamSeriesIndexWrite.String(): TopicStreamSeriesIndexWrite,
- TopicStreamLocalIndexWrite.String(): TopicStreamLocalIndexWrite,
- TopicStreamSeriesSync.String(): TopicStreamSeriesSync,
- TopicStreamElementIndexSync.String(): TopicStreamElementIndexSync,
- TopicTraceWrite.String(): TopicTraceWrite,
- TopicTraceQuery.String(): TopicTraceQuery,
- TopicTracePartSync.String(): TopicTracePartSync,
- TopicTraceSeriesSync.String(): TopicTraceSeriesSync,
- TopicTraceSidxSeriesWrite.String(): TopicTraceSidxSeriesWrite,
+ TopicStreamWrite.String(): TopicStreamWrite,
+ TopicStreamQuery.String(): TopicStreamQuery,
+ TopicMeasureWrite.String(): TopicMeasureWrite,
+ TopicMeasureQuery.String(): TopicMeasureQuery,
+ TopicInternalMeasureQuery.String(): TopicInternalMeasureQuery,
+ TopicTopNQuery.String(): TopicTopNQuery,
+ TopicPropertyDelete.String(): TopicPropertyDelete,
+ TopicPropertyQuery.String(): TopicPropertyQuery,
+ TopicPropertyUpdate.String(): TopicPropertyUpdate,
+ TopicStreamPartSync.String(): TopicStreamPartSync,
+ TopicMeasurePartSync.String(): TopicMeasurePartSync,
+ TopicMeasureSeriesIndexInsert.String(): TopicMeasureSeriesIndexInsert,
+ TopicMeasureSeriesIndexUpdate.String(): TopicMeasureSeriesIndexUpdate,
+ TopicMeasureSeriesSync.String(): TopicMeasureSeriesSync,
+ TopicPropertyRepair.String(): TopicPropertyRepair,
+ TopicStreamSeriesIndexWrite.String(): TopicStreamSeriesIndexWrite,
+ TopicStreamLocalIndexWrite.String(): TopicStreamLocalIndexWrite,
+ TopicStreamSeriesSync.String(): TopicStreamSeriesSync,
+ TopicStreamElementIndexSync.String(): TopicStreamElementIndexSync,
+ TopicTraceWrite.String(): TopicTraceWrite,
+ TopicTraceQuery.String(): TopicTraceQuery,
+ TopicTracePartSync.String(): TopicTracePartSync,
+ TopicTraceSeriesSync.String(): TopicTraceSeriesSync,
+ TopicTraceSidxSeriesWrite.String(): TopicTraceSidxSeriesWrite,
+ TopicMeasureCollectDataInfo.String(): TopicMeasureCollectDataInfo,
+ TopicMeasureCollectLiaisonInfo.String(): TopicMeasureCollectLiaisonInfo,
+ TopicStreamCollectDataInfo.String(): TopicStreamCollectDataInfo,
+ TopicStreamCollectLiaisonInfo.String(): TopicStreamCollectLiaisonInfo,
+ TopicTraceCollectDataInfo.String(): TopicTraceCollectDataInfo,
+ TopicTraceCollectLiaisonInfo.String(): TopicTraceCollectLiaisonInfo,
}
// TopicRequestMap is the map of topic name to request message.
@@ -132,6 +139,24 @@ var (
TopicTraceSidxSeriesWrite: func() proto.Message {
return nil
},
+ TopicMeasureCollectDataInfo: func() proto.Message {
+ return &databasev1.GroupRegistryServiceInspectRequest{}
+ },
+ TopicMeasureCollectLiaisonInfo: func() proto.Message {
+ return &databasev1.GroupRegistryServiceInspectRequest{}
+ },
+ TopicStreamCollectDataInfo: func() proto.Message {
+ return &databasev1.GroupRegistryServiceInspectRequest{}
+ },
+ TopicStreamCollectLiaisonInfo: func() proto.Message {
+ return &databasev1.GroupRegistryServiceInspectRequest{}
+ },
+ TopicTraceCollectDataInfo: func() proto.Message {
+ return &databasev1.GroupRegistryServiceInspectRequest{}
+ },
+ TopicTraceCollectLiaisonInfo: func() proto.Message {
+ return &databasev1.GroupRegistryServiceInspectRequest{}
+ },
}
// TopicResponseMap is the map of topic name to response message.
@@ -164,6 +189,24 @@ var (
TopicTraceQuery: func() proto.Message {
return &tracev1.InternalQueryResponse{}
},
+ TopicMeasureCollectDataInfo: func() proto.Message {
+ return &databasev1.DataInfo{}
+ },
+ TopicStreamCollectDataInfo: func() proto.Message {
+ return &databasev1.DataInfo{}
+ },
+ TopicTraceCollectDataInfo: func() proto.Message {
+ return &databasev1.DataInfo{}
+ },
+ TopicMeasureCollectLiaisonInfo: func() proto.Message {
+ return &databasev1.LiaisonInfo{}
+ },
+ TopicStreamCollectLiaisonInfo: func() proto.Message {
+ return &databasev1.LiaisonInfo{}
+ },
+ TopicTraceCollectLiaisonInfo: func() proto.Message {
+ return &databasev1.LiaisonInfo{}
+ },
}
// TopicCommon is the common topic for data transmission.
diff --git a/api/data/measure.go b/api/data/measure.go
index 2bf2bc4d..d414abeb 100644
--- a/api/data/measure.go
+++ b/api/data/measure.go
@@ -103,3 +103,9 @@ var MeasureSeriesSyncKindVersion = common.KindVersion{
// TopicMeasureSeriesSync is the measure series sync topic.
var TopicMeasureSeriesSync = bus.BiTopic(MeasureSeriesSyncKindVersion.String())
+
+// TopicMeasureCollectDataInfo is the topic for collecting data info from data nodes.
+var TopicMeasureCollectDataInfo = bus.BiTopic("measure-collect-data-info")
+
+// TopicMeasureCollectLiaisonInfo is the topic for collecting liaison info from liaison nodes.
+var TopicMeasureCollectLiaisonInfo = bus.BiTopic("measure-collect-liaison-info")
diff --git a/api/data/stream.go b/api/data/stream.go
index 43be0dc0..560931a3 100644
--- a/api/data/stream.go
+++ b/api/data/stream.go
@@ -93,3 +93,9 @@ var StreamElementIndexSyncKindVersion = common.KindVersion{
// TopicStreamElementIndexSync is the element index sync topic.
var TopicStreamElementIndexSync = bus.BiTopic(StreamElementIndexSyncKindVersion.String())
+
+// TopicStreamCollectDataInfo is the topic for collecting data info from data nodes.
+var TopicStreamCollectDataInfo = bus.BiTopic("stream-collect-data-info")
+
+// TopicStreamCollectLiaisonInfo is the topic for collecting liaison info from liaison nodes.
+var TopicStreamCollectLiaisonInfo = bus.BiTopic("stream-collect-liaison-info")
diff --git a/api/data/trace.go b/api/data/trace.go
index 3918e3f7..0c7d5579 100644
--- a/api/data/trace.go
+++ b/api/data/trace.go
@@ -75,3 +75,9 @@ var TraceSidxSeriesWriteKindVersion = common.KindVersion{
// TopicTraceSidxSeriesWrite is the trace sidx series write topic.
var TopicTraceSidxSeriesWrite = bus.BiTopic(TraceSidxSeriesWriteKindVersion.String())
+
+// TopicTraceCollectDataInfo is the topic for collecting data info from data nodes.
+var TopicTraceCollectDataInfo = bus.BiTopic("trace-collect-data-info")
+
+// TopicTraceCollectLiaisonInfo is the topic for collecting liaison info from liaison nodes.
+var TopicTraceCollectLiaisonInfo = bus.BiTopic("trace-collect-liaison-info")
diff --git a/banyand/internal/storage/index.go b/banyand/internal/storage/index.go
index bc033dc7..ad8afc1c 100644
--- a/banyand/internal/storage/index.go
+++ b/banyand/internal/storage/index.go
@@ -94,6 +94,10 @@ func (s *seriesIndex) EnableExternalSegments() (index.ExternalSegmentStreamer, e
return s.store.EnableExternalSegments()
}
+func (s *seriesIndex) Stats() (dataCount int64, dataSizeBytes int64) {
+ return s.store.Stats()
+}
+
func (s *seriesIndex) filter(ctx context.Context, series []*pbv1.Series,
projection []index.FieldKey, secondaryQuery index.Query, timeRange *timestamp.TimeRange,
) (data SeriesData, err error) {
diff --git a/banyand/internal/storage/storage.go b/banyand/internal/storage/storage.go
index 5ac1b883..41f7280a 100644
--- a/banyand/internal/storage/storage.go
+++ b/banyand/internal/storage/storage.go
@@ -102,6 +102,7 @@ type IndexDB interface {
Search(ctx context.Context, series []*pbv1.Series, opts IndexSearchOpts) (SeriesData, [][]byte, error)
SearchWithoutSeries(ctx context.Context, opts IndexSearchOpts) (sd SeriesData, sortedValues [][]byte, err error)
EnableExternalSegments() (index.ExternalSegmentStreamer, error)
+ Stats() (dataCount int64, dataSizeBytes int64)
}
// TSDB allows listing and getting shard details.
diff --git a/banyand/internal/wqueue/wqueue.go b/banyand/internal/wqueue/wqueue.go
index 099664ae..357d5a95 100644
--- a/banyand/internal/wqueue/wqueue.go
+++ b/banyand/internal/wqueue/wqueue.go
@@ -220,3 +220,14 @@ func (q *Queue[S, O]) GetTimeRange(ts time.Time) timestamp.TimeRange {
func (q *Queue[S, O]) GetNodes(shardID common.ShardID) []string {
return q.opts.GetNodes(shardID)
}
+
+// SubQueues returns sub-queues from all shards in the queue.
+func (q *Queue[S, O]) SubQueues() []S {
+ q.RLock()
+ defer q.RUnlock()
+ result := make([]S, 0, len(q.sLst))
+ for _, shard := range q.sLst {
+ result = append(result, shard.sq)
+ }
+ return result
+}
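Illustration only, not part of this patch: the new SubQueues accessor is what the liaison-side pending-count collection further down relies on. A minimal, self-contained sketch of that aggregation pattern follows; the PendingDataCount accessor is a hypothetical stand-in for the unexported getPendingDataCount on the real sub-queue type.

    package main

    import "fmt"

    // subQueue stands in for the per-shard sub-queue type (S in wqueue.Queue[S, O]).
    type subQueue interface {
        PendingDataCount() int64
    }

    type queue struct{ shards []subQueue }

    // SubQueues mirrors wqueue.Queue.SubQueues: one entry per shard, copied under the read lock.
    func (q *queue) SubQueues() []subQueue { return q.shards }

    // totalPending sums the per-shard counters, the same aggregation the
    // collectPendingWriteInfo helpers in this change perform on liaison nodes.
    func totalPending(q *queue) int64 {
        var total int64
        for _, sq := range q.SubQueues() {
            total += sq.PendingDataCount()
        }
        return total
    }

    type fakeShard int64

    func (f fakeShard) PendingDataCount() int64 { return int64(f) }

    func main() {
        q := &queue{shards: []subQueue{fakeShard(3), fakeShard(7)}}
        fmt.Println(totalPending(q)) // 10
    }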
diff --git a/banyand/liaison/grpc/registry.go b/banyand/liaison/grpc/registry.go
index 671b6948..8c0a2dc4 100644
--- a/banyand/liaison/grpc/registry.go
+++ b/banyand/liaison/grpc/registry.go
@@ -700,6 +700,119 @@ func (rs *groupRegistryServer) Exist(ctx context.Context, req *databasev1.GroupR
return nil, err
}
+func (rs *groupRegistryServer) Inspect(ctx context.Context, req *databasev1.GroupRegistryServiceInspectRequest) (
+ *databasev1.GroupRegistryServiceInspectResponse, error,
+) {
+ g := req.GetGroup()
+ rs.metrics.totalRegistryStarted.Inc(1, g, "group", "inspect")
+ start := time.Now()
+ defer func() {
+ rs.metrics.totalRegistryFinished.Inc(1, g, "group", "inspect")
+ rs.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g, "group", "inspect")
+ }()
+ group, err := rs.schemaRegistry.GroupRegistry().GetGroup(ctx, g)
+ if err != nil {
+ rs.metrics.totalRegistryErr.Inc(1, g, "group", "inspect")
+ return nil, err
+ }
+ schemaInfo, schemaErr := rs.collectSchemaInfo(ctx, g)
+ if schemaErr != nil {
+ rs.metrics.totalRegistryErr.Inc(1, g, "group", "inspect")
+ return nil, schemaErr
+ }
+ dataInfo, dataErr := rs.schemaRegistry.CollectDataInfo(ctx, g)
+ if dataErr != nil {
+ rs.metrics.totalRegistryErr.Inc(1, g, "group", "inspect")
+ return nil, dataErr
+ }
+ liaisonInfo, liaisonErr := rs.schemaRegistry.CollectLiaisonInfo(ctx, g)
+ if liaisonErr != nil {
+ rs.metrics.totalRegistryErr.Inc(1, g, "group", "inspect")
+ return nil, liaisonErr
+ }
+ return &databasev1.GroupRegistryServiceInspectResponse{
+ Group: group,
+ SchemaInfo: schemaInfo,
+ DataInfo: dataInfo,
+ LiaisonInfo: liaisonInfo,
+ }, nil
+}
+
+func (rs *groupRegistryServer) Query(_ context.Context, _ *databasev1.GroupRegistryServiceQueryRequest) (
+ *databasev1.GroupRegistryServiceQueryResponse, error,
+) {
+ return nil, status.Error(codes.Unimplemented, "Query method not implemented yet")
+}
+
+func (rs *groupRegistryServer) collectSchemaInfo(ctx context.Context, group string) (*databasev1.SchemaInfo, error) {
+ opt := schema.ListOpt{Group: group}
+ streams, streamsErr := rs.schemaRegistry.StreamRegistry().ListStream(ctx, opt)
+ if streamsErr != nil {
+ return nil, streamsErr
+ }
+ streamNames := make([]string, 0, len(streams))
+ for _, s := range streams {
+ streamNames = append(streamNames, s.GetMetadata().GetName())
+ }
+ measures, measuresErr := rs.schemaRegistry.MeasureRegistry().ListMeasure(ctx, opt)
+ if measuresErr != nil {
+ return nil, measuresErr
+ }
+ measureNames := make([]string, 0, len(measures))
+ for _, m := range measures {
+ measureNames = append(measureNames, m.GetMetadata().GetName())
+ }
+ traces, tracesErr := rs.schemaRegistry.TraceRegistry().ListTrace(ctx, opt)
+ if tracesErr != nil {
+ return nil, tracesErr
+ }
+ traceNames := make([]string, 0, len(traces))
+ for _, t := range traces {
+ traceNames = append(traceNames, t.GetMetadata().GetName())
+ }
+ properties, propertiesErr := rs.schemaRegistry.PropertyRegistry().ListProperty(ctx, opt)
+ if propertiesErr != nil {
+ return nil, propertiesErr
+ }
+ propertyNames := make([]string, 0, len(properties))
+ for _, p := range properties {
+ propertyNames = append(propertyNames, p.GetMetadata().GetName())
+ }
+ indexRules, indexRulesErr := rs.schemaRegistry.IndexRuleRegistry().ListIndexRule(ctx, opt)
+ if indexRulesErr != nil {
+ return nil, indexRulesErr
+ }
+ indexRuleNames := make([]string, 0, len(indexRules))
+ for _, ir := range indexRules {
+ indexRuleNames = append(indexRuleNames, ir.GetMetadata().GetName())
+ }
+ indexRuleBindings, indexRuleBindingsErr := rs.schemaRegistry.IndexRuleBindingRegistry().ListIndexRuleBinding(ctx, opt)
+ if indexRuleBindingsErr != nil {
+ return nil, indexRuleBindingsErr
+ }
+ bindingNames := make([]string, 0, len(indexRuleBindings))
+ for _, irb := range indexRuleBindings {
+ bindingNames = append(bindingNames, irb.GetMetadata().GetName())
+ }
+ topNAggs, topNAggsErr := rs.schemaRegistry.TopNAggregationRegistry().ListTopNAggregation(ctx, opt)
+ if topNAggsErr != nil {
+ return nil, topNAggsErr
+ }
+ topNNames := make([]string, 0, len(topNAggs))
+ for _, tn := range topNAggs {
+ topNNames = append(topNNames, tn.GetMetadata().GetName())
+ }
+ return &databasev1.SchemaInfo{
+ Streams: streamNames,
+ Measures: measureNames,
+ Traces: traceNames,
+ Properties: propertyNames,
+ IndexRules: indexRuleNames,
+ IndexRuleBindings: bindingNames,
+ TopnAggregations: topNNames,
+ }, nil
+}
+
type topNAggregationRegistryServer struct {
databasev1.UnimplementedTopNAggregationRegistryServiceServer
schemaRegistry metadata.Repo
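A usage sketch, not part of this patch: it assumes the generated GroupRegistryService gRPC stub exposes an Inspect method matching the handler above, uses the default liaison gRPC address, and uses a sample group name purely for illustration.

    package main

    import (
        "context"
        "fmt"
        "log"
        "time"

        "google.golang.org/grpc"
        "google.golang.org/grpc/credentials/insecure"

        databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
    )

    func main() {
        // 127.0.0.1:17912 is the default gRPC address; adjust for your deployment.
        conn, err := grpc.NewClient("127.0.0.1:17912", grpc.WithTransportCredentials(insecure.NewCredentials()))
        if err != nil {
            log.Fatal(err)
        }
        defer conn.Close()
        ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
        defer cancel()
        resp, err := databasev1.NewGroupRegistryServiceClient(conn).Inspect(ctx,
            &databasev1.GroupRegistryServiceInspectRequest{Group: "sw_metric"})
        if err != nil {
            log.Fatal(err)
        }
        // The response carries the group's schema names plus per-node data and liaison statistics.
        fmt.Println(resp.GetSchemaInfo(), len(resp.GetDataInfo()), len(resp.GetLiaisonInfo()))
    }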
diff --git a/banyand/measure/introducer.go b/banyand/measure/introducer.go
index e60d071a..8aed38c3 100644
--- a/banyand/measure/introducer.go
+++ b/banyand/measure/introducer.go
@@ -192,6 +192,7 @@ func (tst *tsTable) introduceMemPart(nextIntroduction *introduction, epoch uint6
}
next := nextIntroduction.memPart
+ tst.addPendingDataCount(-int64(next.mp.partMetadata.TotalCount))
nextSnp := cur.copyAllTo(epoch)
nextSnp.parts = append(nextSnp.parts, next)
nextSnp.creator = snapshotCreatorMemPart
diff --git a/banyand/measure/metadata.go b/banyand/measure/metadata.go
index b9da0e7a..3482b096 100644
--- a/banyand/measure/metadata.go
+++ b/banyand/measure/metadata.go
@@ -85,6 +85,7 @@ type schemaRepo struct {
nodeID string
path string
closingGroupsMu sync.RWMutex
+ role databasev1.Role
}
func newSchemaRepo(path string, svc *standalone, nodeLabels map[string]string, nodeID string) *schemaRepo {
@@ -95,6 +96,7 @@ func newSchemaRepo(path string, svc *standalone, nodeLabels map[string]string, n
pipeline: svc.localPipeline,
nodeID: nodeID,
closingGroups: make(map[string]struct{}),
+ role: databasev1.Role_ROLE_DATA,
}
sr.Repository = resourceSchema.NewRepository(
svc.metadata,
@@ -113,6 +115,7 @@ func newLiaisonSchemaRepo(path string, svc *liaison, measureDataNodeRegistry grp
metadata: svc.metadata,
pipeline: pipeline,
closingGroups: make(map[string]struct{}),
+ role: databasev1.Role_ROLE_LIAISON,
}
sr.Repository = resourceSchema.NewRepository(
svc.metadata,
@@ -370,6 +373,186 @@ func (sr *schemaRepo) loadQueue(groupName string) (*wqueue.Queue[*tsTable, optio
return db.(*wqueue.Queue[*tsTable, option]), nil
}
+// CollectDataInfo collects data info for a specific group.
+func (sr *schemaRepo) CollectDataInfo(ctx context.Context, group string) (*databasev1.DataInfo, error) {
+ if sr.nodeID == "" {
+ return nil, nil
+ }
+ node, nodeErr := sr.metadata.NodeRegistry().GetNode(ctx, sr.nodeID)
+ if nodeErr != nil {
+ return nil, nodeErr
+ }
+ tsdb, tsdbErr := sr.loadTSDB(group)
+ if tsdbErr != nil {
+ return nil, tsdbErr
+ }
+ if tsdb == nil {
+ return nil, nil
+ }
+ segments, segmentsErr := tsdb.SelectSegments(timestamp.TimeRange{
+ Start: time.Unix(0, 0),
+ End: time.Unix(0, timestamp.MaxNanoTime),
+ })
+ if segmentsErr != nil {
+ return nil, segmentsErr
+ }
+ var segmentInfoList []*databasev1.SegmentInfo
+ var totalDataSize int64
+ for _, segment := range segments {
+ timeRange := segment.GetTimeRange()
+ tables, _ := segment.Tables()
+ var shardInfoList []*databasev1.ShardInfo
+ for shardIdx, table := range tables {
+ shardInfo := sr.collectShardInfo(table, uint32(shardIdx))
+ shardInfoList = append(shardInfoList, shardInfo)
+ totalDataSize += shardInfo.DataSizeBytes
+ }
+ seriesIndexInfo := sr.collectSeriesIndexInfo(segment)
+ totalDataSize += seriesIndexInfo.DataSizeBytes
+ segmentInfo := &databasev1.SegmentInfo{
+ SegmentId: fmt.Sprintf("%d-%d", timeRange.Start.UnixNano(), timeRange.End.UnixNano()),
+ TimeRangeStart: timeRange.Start.Format(time.RFC3339Nano),
+ TimeRangeEnd: timeRange.End.Format(time.RFC3339Nano),
+ ShardInfo: shardInfoList,
+ SeriesIndexInfo: seriesIndexInfo,
+ }
+ segmentInfoList = append(segmentInfoList, segmentInfo)
+ segment.DecRef()
+ }
+ dataInfo := &databasev1.DataInfo{
+ Node: node,
+ SegmentInfo: segmentInfoList,
+ DataSizeBytes: totalDataSize,
+ }
+ return dataInfo, nil
+}
+
+func (sr *schemaRepo) collectSeriesIndexInfo(segment storage.Segment[*tsTable, option]) *databasev1.SeriesIndexInfo {
+ indexDB := segment.IndexDB()
+ if indexDB == nil {
+ return &databasev1.SeriesIndexInfo{}
+ }
+ dataCount, dataSizeBytes := indexDB.Stats()
+ return &databasev1.SeriesIndexInfo{
+ DataCount: dataCount,
+ DataSizeBytes: dataSizeBytes,
+ }
+}
+
+func (sr *schemaRepo) collectShardInfo(table any, shardID uint32) *databasev1.ShardInfo {
+ tst, ok := table.(*tsTable)
+ if !ok {
+ return &databasev1.ShardInfo{
+ ShardId: shardID,
+ }
+ }
+ snapshot := tst.currentSnapshot()
+ if snapshot == nil {
+ return &databasev1.ShardInfo{
+ ShardId: shardID,
+ }
+ }
+ defer snapshot.decRef()
+ var totalCount, compressedSize, partCount uint64
+ for _, pw := range snapshot.parts {
+ if pw.p != nil {
+ totalCount += pw.p.partMetadata.TotalCount
+ compressedSize += pw.p.partMetadata.CompressedSizeBytes
+ partCount++
+ } else if pw.mp != nil {
+ totalCount += pw.mp.partMetadata.TotalCount
+ compressedSize += pw.mp.partMetadata.CompressedSizeBytes
+ partCount++
+ }
+ }
+ return &databasev1.ShardInfo{
+ ShardId: shardID,
+ DataCount: int64(totalCount),
+ DataSizeBytes: int64(compressedSize),
+ PartCount: int64(partCount),
+ InvertedIndexInfo: &databasev1.InvertedIndexInfo{},
+ SidxInfo: &databasev1.SIDXInfo{},
+ }
+}
+
+func (sr *schemaRepo) collectPendingWriteInfo(groupName string) (int64, error) {
+ if sr == nil || sr.Repository == nil {
+ return 0, fmt.Errorf("schema repository is not initialized")
+ }
+ if sr.role == databasev1.Role_ROLE_LIAISON {
+ queue, queueErr := sr.loadQueue(groupName)
+ if queueErr != nil {
+ return 0, fmt.Errorf("failed to load queue: %w", queueErr)
+ }
+ if queue == nil {
+ return 0, nil
+ }
+ var pendingWriteCount int64
+ for _, sq := range queue.SubQueues() {
+ if sq != nil {
+ pendingWriteCount += sq.getPendingDataCount()
+ }
+ }
+ return pendingWriteCount, nil
+ }
+ // Standalone mode
+ tsdb, tsdbErr := sr.loadTSDB(groupName)
+ if tsdbErr != nil {
+ return 0, fmt.Errorf("failed to load TSDB: %w", tsdbErr)
+ }
+ if tsdb == nil {
+ return 0, fmt.Errorf("TSDB is nil for group %s", groupName)
+ }
+ segments, segmentsErr := tsdb.SelectSegments(timestamp.TimeRange{
+ Start: time.Unix(0, 0),
+ End: time.Unix(0, timestamp.MaxNanoTime),
+ })
+ if segmentsErr != nil {
+ return 0, fmt.Errorf("failed to select segments: %w", segmentsErr)
+ }
+ var pendingWriteCount int64
+ for _, segment := range segments {
+ tables, _ := segment.Tables()
+ for _, tst := range tables {
+ pendingWriteCount += tst.getPendingDataCount()
+ }
+ segment.DecRef()
+ }
+ return pendingWriteCount, nil
+}
+
+func (sr *schemaRepo) collectPendingSyncInfo(groupName string) (partCount int64, totalSizeBytes int64, err error) {
+ if sr == nil || sr.Repository == nil {
+ return 0, 0, fmt.Errorf("schema repository is not initialized")
+ }
+ // Only liaison nodes collect pending sync info
+ if sr.role != databasev1.Role_ROLE_LIAISON {
+ return 0, 0, nil
+ }
+ queue, queueErr := sr.loadQueue(groupName)
+ if queueErr != nil {
+ return 0, 0, fmt.Errorf("failed to load queue: %w", queueErr)
+ }
+ if queue == nil {
+ return 0, 0, nil
+ }
+ for _, sq := range queue.SubQueues() {
+ if sq != nil {
+ snapshot := sq.currentSnapshot()
+ if snapshot != nil {
+ for _, pw := range snapshot.parts {
+ if pw.mp == nil && pw.p != nil && pw.p.partMetadata.TotalCount > 0 {
+ partCount++
+ totalSizeBytes += int64(pw.p.partMetadata.CompressedSizeBytes)
+ }
+ }
+ snapshot.decRef()
+ }
+ }
+ }
+ return partCount, totalSizeBytes, nil
+}
+
func (sr *schemaRepo) createTopNResultMeasure(ctx context.Context, measureSchemaRegistry schema.Measure, group string) {
md := GetTopNSchemaMetadata(group)
operation := func() error {
@@ -675,6 +858,7 @@ func (s *queueSupplier) newMetrics(p common.Position) storage.Metrics {
totalFileBlocks: factory.NewGauge("total_file_blocks", common.ShardLabelNames()...),
totalFilePartBytes: factory.NewGauge("total_file_part_bytes", common.ShardLabelNames()...),
totalFilePartUncompressedBytes: factory.NewGauge("total_file_part_uncompressed_bytes", common.ShardLabelNames()...),
+ pendingDataCount: factory.NewGauge("pending_data_count", common.ShardLabelNames()...),
},
}
}
diff --git a/banyand/measure/metrics.go b/banyand/measure/metrics.go
index 8cf4e57e..243522a9 100644
--- a/banyand/measure/metrics.go
+++ b/banyand/measure/metrics.go
@@ -34,7 +34,6 @@ import (
var measureScope = observability.RootScope.SubScope("measure")
type metrics struct {
- tbMetrics
totalWritten meter.Counter
totalBatch meter.Counter
totalBatchIntroLatency meter.Counter
@@ -67,6 +66,8 @@ type metrics struct {
totalMergedParts meter.Counter
totalMergeLatency meter.Counter
totalMerged meter.Counter
+
+ tbMetrics
}
func (tst *tsTable) incTotalWritten(delta int) {
@@ -251,6 +252,18 @@ func (tst *tsTable) incTotalMerged(delta int, typ string) {
tst.metrics.totalMerged.Inc(float64(delta), typ)
}
+func (tst *tsTable) addPendingDataCount(delta int64) {
+ tst.pendingDataCount.Add(delta)
+ if tst.metrics == nil {
+ return
+ }
+ tst.metrics.tbMetrics.pendingDataCount.Add(float64(delta), tst.p.ShardLabelValues()...)
+}
+
+func (tst *tsTable) getPendingDataCount() int64 {
+ return tst.pendingDataCount.Load()
+}
+
func (m *metrics) DeleteAll() {
if m == nil {
return
@@ -335,6 +348,7 @@ func (s *supplier) newMetrics(p common.Position) (storage.Metrics, observability
totalFileBlocks: factory.NewGauge("total_file_blocks", common.ShardLabelNames()...),
totalFilePartBytes: factory.NewGauge("total_file_part_bytes", common.ShardLabelNames()...),
totalFilePartUncompressedBytes: factory.NewGauge("total_file_part_uncompressed_bytes", common.ShardLabelNames()...),
+ pendingDataCount: factory.NewGauge("pending_data_count", common.ShardLabelNames()...),
},
}, factory
}
@@ -393,6 +407,7 @@ func (tst *tsTable) deleteMetrics() {
tst.metrics.tbMetrics.totalFileBlocks.Delete(tst.p.ShardLabelValues()...)
tst.metrics.tbMetrics.totalFilePartBytes.Delete(tst.p.ShardLabelValues()...)
tst.metrics.tbMetrics.totalFilePartUncompressedBytes.Delete(tst.p.ShardLabelValues()...)
+ tst.metrics.tbMetrics.pendingDataCount.Delete(tst.p.ShardLabelValues()...)
}
type tbMetrics struct {
@@ -407,6 +422,8 @@ type tbMetrics struct {
totalFileBlocks meter.Gauge
totalFilePartBytes meter.Gauge
totalFilePartUncompressedBytes meter.Gauge
+
+ pendingDataCount meter.Gauge
}
func (s *standalone) createNativeObservabilityGroup(ctx context.Context) error {
diff --git a/banyand/measure/svc_data.go b/banyand/measure/svc_data.go
index fee99941..a447240e 100644
--- a/banyand/measure/svc_data.go
+++ b/banyand/measure/svc_data.go
@@ -252,7 +252,7 @@ func (s *dataSVC) PreRun(ctx context.Context) error {
s.c = storage.NewServiceCacheWithConfig(s.cc)
}
node := val.(common.Node)
- s.schemaRepo = newDataSchemaRepo(s.dataPath, s, node.Labels)
+ s.schemaRepo = newDataSchemaRepo(s.dataPath, s, node.Labels, node.NodeID)
s.cm = newCacheMetrics(s.omr)
obsservice.MetricsCollector.Register("measure_cache", s.collectCacheMetrics)
@@ -261,6 +261,11 @@ func (s *dataSVC) PreRun(ctx context.Context) error {
return nil
}
+ collectDataInfoListener := &collectDataInfoListener{s: s}
+ if subscribeErr := s.pipeline.Subscribe(data.TopicMeasureCollectDataInfo, collectDataInfoListener); subscribeErr != nil {
+ return fmt.Errorf("failed to subscribe to collect data info topic: %w", subscribeErr)
+ }
+
if err := s.createDataNativeObservabilityGroup(ctx); err != nil {
return err
}
@@ -405,11 +410,13 @@ func NewReadonlyDataSVC(metadata metadata.Repo, omr observability.MetricsRegistr
}, nil
}
-func newDataSchemaRepo(path string, svc *dataSVC, nodeLabels map[string]string) *schemaRepo {
+func newDataSchemaRepo(path string, svc *dataSVC, nodeLabels map[string]string, nodeID string) *schemaRepo {
sr := &schemaRepo{
path: path,
l: svc.l,
metadata: svc.metadata,
+ nodeID: nodeID,
+ role: databasev1.Role_ROLE_DATA,
}
sr.Repository = resourceSchema.NewRepository(
svc.metadata,
@@ -533,3 +540,28 @@ func (d *dataDeleteStreamSegmentsListener) Rev(_ context.Context, message bus.Me
deleted := db.DeleteExpiredSegments(req.SegmentSuffixes)
return bus.NewMessage(bus.MessageID(time.Now().UnixNano()), deleted)
}
+
+func (s *dataSVC) CollectDataInfo(ctx context.Context, group string) (*databasev1.DataInfo, error) {
+ return s.schemaRepo.CollectDataInfo(ctx, group)
+}
+
+func (s *dataSVC) CollectLiaisonInfo(_ context.Context, _ string) (*databasev1.LiaisonInfo, error) {
+ return nil, errors.New("collect liaison info is not supported on data node")
+}
+
+type collectDataInfoListener struct {
+ *bus.UnImplementedHealthyListener
+ s *dataSVC
+}
+
+func (l *collectDataInfoListener) Rev(ctx context.Context, message bus.Message) bus.Message {
+ req, ok := message.Data().(*databasev1.GroupRegistryServiceInspectRequest)
+ if !ok {
+ return bus.NewMessage(message.ID(), common.NewError("invalid data type for collect data info request"))
+ }
+ dataInfo, collectErr := l.s.schemaRepo.CollectDataInfo(ctx, req.Group)
+ if collectErr != nil {
+ return bus.NewMessage(message.ID(), common.NewError("failed to collect data info: %v", collectErr))
+ }
+ return bus.NewMessage(message.ID(), dataInfo)
+}
diff --git a/banyand/measure/svc_liaison.go b/banyand/measure/svc_liaison.go
index 453e087e..aa6dbea8 100644
--- a/banyand/measure/svc_liaison.go
+++ b/banyand/measure/svc_liaison.go
@@ -19,6 +19,7 @@ package measure
import (
"context"
+ "fmt"
"path"
"path/filepath"
"strings"
@@ -36,6 +37,7 @@ import (
obsservice "github.com/apache/skywalking-banyandb/banyand/observability/services"
"github.com/apache/skywalking-banyandb/banyand/protector"
"github.com/apache/skywalking-banyandb/banyand/queue"
+ "github.com/apache/skywalking-banyandb/pkg/bus"
"github.com/apache/skywalking-banyandb/pkg/fs"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/node"
@@ -76,6 +78,27 @@ func (s *liaison) GetRemovalSegmentsTimeRange(group string) *timestamp.TimeRange
return s.schemaRepo.GetRemovalSegmentsTimeRange(group)
}
+func (s *liaison) CollectDataInfo(_ context.Context, _ string) (*databasev1.DataInfo, error) {
+ return nil, errors.New("collect data info is not supported on liaison node")
+}
+
+// CollectLiaisonInfo collects liaison node statistics.
+func (s *liaison) CollectLiaisonInfo(_ context.Context, group string) (*databasev1.LiaisonInfo, error) {
+ info := &databasev1.LiaisonInfo{}
+ pendingWriteCount, writeErr := s.schemaRepo.collectPendingWriteInfo(group)
+ if writeErr != nil {
+ return nil, fmt.Errorf("failed to collect pending write info: %w", writeErr)
+ }
+ info.PendingWriteDataCount = pendingWriteCount
+ pendingSyncPartCount, pendingSyncDataSizeBytes, syncErr := s.schemaRepo.collectPendingSyncInfo(group)
+ if syncErr != nil {
+ return nil, fmt.Errorf("failed to collect pending sync info: %w", syncErr)
+ }
+ info.PendingSyncPartCount = pendingSyncPartCount
+ info.PendingSyncDataSizeBytes = pendingSyncDataSizeBytes
+ return info, nil
+}
+
func (s *liaison) FlagSet() *run.FlagSet {
flagS := run.NewFlagSet("storage")
flagS.StringVar(&s.root, "measure-root-path", "/tmp", "the root path of measure")
@@ -153,6 +176,15 @@ func (s *liaison) PreRun(ctx context.Context) error {
if err := s.pipeline.Subscribe(data.TopicMeasureWrite, writeListener); err != nil {
return err
}
+ if metaSvc, ok := s.metadata.(metadata.Service); ok {
+ metaSvc.RegisterLiaisonCollector(commonv1.Catalog_CATALOG_MEASURE, s)
+ }
+
+ collectLiaisonInfoListener := &collectLiaisonInfoListener{s: s}
+ if subscribeErr := s.pipeline.Subscribe(data.TopicMeasureCollectLiaisonInfo, collectLiaisonInfoListener); subscribeErr != nil {
+ return fmt.Errorf("failed to subscribe to collect liaison info topic: %w", subscribeErr)
+ }
+
return topNResultPipeline.Subscribe(data.TopicMeasureWrite, writeListener)
}
@@ -179,3 +211,20 @@ func NewLiaison(metadata metadata.Repo, pipeline queue.Server, omr observability
},
}, nil
}
+
+type collectLiaisonInfoListener struct {
+ *bus.UnImplementedHealthyListener
+ s *liaison
+}
+
+func (l *collectLiaisonInfoListener) Rev(ctx context.Context, message bus.Message) bus.Message {
+ req, ok := message.Data().(*databasev1.GroupRegistryServiceInspectRequest)
+ if !ok {
+ return bus.NewMessage(message.ID(), common.NewError("invalid data type for collect liaison info request"))
+ }
+ liaisonInfo, collectErr := l.s.CollectLiaisonInfo(ctx, req.Group)
+ if collectErr != nil {
+ return bus.NewMessage(message.ID(), common.NewError("failed to collect liaison info: %v", collectErr))
+ }
+ return bus.NewMessage(message.ID(), liaisonInfo)
+}
diff --git a/banyand/measure/svc_standalone.go b/banyand/measure/svc_standalone.go
index 1c444380..06f6a8a6 100644
--- a/banyand/measure/svc_standalone.go
+++ b/banyand/measure/svc_standalone.go
@@ -19,6 +19,7 @@ package measure
import (
"context"
+ "fmt"
"path"
"path/filepath"
"strings"
@@ -55,6 +56,8 @@ type Service interface {
run.Config
run.Service
Query
+ CollectDataInfo(context.Context, string) (*databasev1.DataInfo, error)
+ CollectLiaisonInfo(context.Context, string) (*databasev1.LiaisonInfo, error)
}
var _ Service = (*standalone)(nil)
@@ -258,6 +261,10 @@ func (s *standalone) PreRun(ctx context.Context) error {
}
node := val.(common.Node)
s.schemaRepo = newSchemaRepo(s.dataPath, s, node.Labels, node.NodeID)
+ if metaSvc, ok := s.metadata.(metadata.Service); ok {
+ metaSvc.RegisterDataCollector(commonv1.Catalog_CATALOG_MEASURE, s.schemaRepo)
+ metaSvc.RegisterLiaisonCollector(commonv1.Catalog_CATALOG_MEASURE, s)
+ }
s.cm = newCacheMetrics(s.omr)
obsservice.MetricsCollector.Register("measure_cache", s.collectCacheMetrics)
@@ -346,3 +353,23 @@ func NewStandalone(metadata metadata.Repo, pipeline queue.Server, metricPipeline
pm: pm,
}, nil
}
+
+func (s *standalone) CollectDataInfo(ctx context.Context, group string) (*databasev1.DataInfo, error) {
+ return s.schemaRepo.CollectDataInfo(ctx, group)
+}
+
+func (s *standalone) CollectLiaisonInfo(_ context.Context, group string) (*databasev1.LiaisonInfo, error) {
+ info := &databasev1.LiaisonInfo{}
+ pendingWriteCount, writeErr := s.schemaRepo.collectPendingWriteInfo(group)
+ if writeErr != nil {
+ return nil, fmt.Errorf("failed to collect pending write info: %w", writeErr)
+ }
+ info.PendingWriteDataCount = pendingWriteCount
+ pendingSyncPartCount, pendingSyncDataSizeBytes, syncErr := s.schemaRepo.collectPendingSyncInfo(group)
+ if syncErr != nil {
+ return nil, fmt.Errorf("failed to collect pending sync info: %w", syncErr)
+ }
+ info.PendingSyncPartCount = pendingSyncPartCount
+ info.PendingSyncDataSizeBytes = pendingSyncDataSizeBytes
+ return info, nil
+}
diff --git a/banyand/measure/tstable.go b/banyand/measure/tstable.go
index 683327e7..d213cc98 100644
--- a/banyand/measure/tstable.go
+++ b/banyand/measure/tstable.go
@@ -152,14 +152,15 @@ type tsTable struct {
introductions chan *introduction
snapshot *snapshot
*metrics
- getNodes func() []string
- l *logger.Logger
- p common.Position
- root string
- group string
- gc garbageCleaner
- option option
- curPartID uint64
+ getNodes func() []string
+ l *logger.Logger
+ p common.Position
+ root string
+ group string
+ gc garbageCleaner
+ option option
+ curPartID uint64
+ pendingDataCount atomic.Int64
sync.RWMutex
shardID common.ShardID
}
@@ -316,9 +317,11 @@ func (tst *tsTable) mustAddMemPart(mp *memPart) {
ind.memPart.p.partMetadata.ID = atomic.AddUint64(&tst.curPartID, 1)
startTime := time.Now()
totalCount := mp.partMetadata.TotalCount
+ tst.addPendingDataCount(int64(totalCount))
select {
case tst.introductions <- ind:
case <-tst.loopCloser.CloseNotify():
+ tst.addPendingDataCount(-int64(totalCount))
return
}
select {
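Illustration only, not part of this patch: the pendingDataCount bookkeeping above counts rows when a mem part enters the introduction channel and subtracts them once the introducer (or the shutdown path) takes them, so the gauge reflects rows accepted but not yet visible in a snapshot. A stripped-down sketch of the same pattern:

    package main

    import (
        "fmt"
        "sync/atomic"
    )

    type table struct {
        pending atomic.Int64
        intro   chan int64 // stands in for the introduction channel; carries row counts
    }

    func (t *table) addMemPart(rows int64) {
        t.pending.Add(rows) // accepted but not yet introduced
        t.intro <- rows
    }

    func (t *table) introducer() {
        for rows := range t.intro {
            // ... install the part into the next snapshot ...
            t.pending.Add(-rows) // now visible to readers
        }
    }

    func main() {
        t := &table{intro: make(chan int64, 1)}
        t.addMemPart(42)
        fmt.Println(t.pending.Load()) // 42: queued, not yet introduced
        close(t.intro)
        t.introducer()
        fmt.Println(t.pending.Load()) // 0: introduced
    }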
diff --git a/banyand/metadata/client.go b/banyand/metadata/client.go
index ae5a9d30..54666d67 100644
--- a/banyand/metadata/client.go
+++ b/banyand/metadata/client.go
@@ -37,6 +37,7 @@ import (
"github.com/apache/skywalking-banyandb/banyand/metadata/discovery/file"
"github.com/apache/skywalking-banyandb/banyand/metadata/schema"
"github.com/apache/skywalking-banyandb/banyand/observability"
+ "github.com/apache/skywalking-banyandb/pkg/bus"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/run"
)
@@ -78,10 +79,13 @@ func NewClient(toRegisterNode, forceRegisterNode bool) (Service, error) {
type clientService struct {
schemaRegistry schema.Registry
+ infoCollectorRegistry *schema.InfoCollectorRegistry
dnsDiscovery *dns.Service
fileDiscovery *file.Service
closer *run.Closer
nodeInfo *databasev1.Node
+ dataBroadcaster bus.Broadcaster
+ liaisonBroadcaster bus.Broadcaster
etcdTLSCertFile string
dnsCACertPaths []string
etcdPassword string
@@ -243,6 +247,14 @@ func (s *clientService) PreRun(ctx context.Context) error {
return err
}
+ s.infoCollectorRegistry = schema.NewInfoCollectorRegistry(l, s.schemaRegistry)
+ if s.dataBroadcaster != nil {
+ s.infoCollectorRegistry.SetDataBroadcaster(s.dataBroadcaster)
+ }
+ if s.liaisonBroadcaster != nil {
+ s.infoCollectorRegistry.SetLiaisonBroadcaster(s.liaisonBroadcaster)
+ }
+
if s.nodeDiscoveryMode == NodeDiscoveryModeDNS {
l.Info().Strs("srv-addresses", s.dnsSRVAddresses).Msg("Initializing DNS-based node discovery")
@@ -543,6 +555,30 @@ func (s *clientService) Subjects(ctx context.Context, indexRule *databasev1.Inde
return foundSubjects, subjectErr
}
+func (s *clientService) CollectDataInfo(ctx context.Context, group string) ([]*databasev1.DataInfo, error) {
+ return s.infoCollectorRegistry.CollectDataInfo(ctx, group)
+}
+
+func (s *clientService) CollectLiaisonInfo(ctx context.Context, group string) ([]*databasev1.LiaisonInfo, error) {
+ return s.infoCollectorRegistry.CollectLiaisonInfo(ctx, group)
+}
+
+func (s *clientService) RegisterDataCollector(catalog commonv1.Catalog, collector schema.DataInfoCollector) {
+ s.infoCollectorRegistry.RegisterDataCollector(catalog, collector)
+}
+
+func (s *clientService) RegisterLiaisonCollector(catalog commonv1.Catalog, collector schema.LiaisonInfoCollector) {
+ s.infoCollectorRegistry.RegisterLiaisonCollector(catalog, collector)
+}
+
+func (s *clientService) SetDataBroadcaster(broadcaster bus.Broadcaster) {
+ s.dataBroadcaster = broadcaster
+}
+
+func (s *clientService) SetLiaisonBroadcaster(broadcaster bus.Broadcaster) {
+ s.liaisonBroadcaster = broadcaster
+}
+
func contains(s []string, e string) bool {
for _, a := range s {
if a == e {
diff --git a/banyand/metadata/metadata.go b/banyand/metadata/metadata.go
index 085cdfed..2e21ffbc 100644
--- a/banyand/metadata/metadata.go
+++ b/banyand/metadata/metadata.go
@@ -26,6 +26,7 @@ import (
databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
"github.com/apache/skywalking-banyandb/banyand/metadata/schema"
"github.com/apache/skywalking-banyandb/banyand/observability"
+ "github.com/apache/skywalking-banyandb/pkg/bus"
"github.com/apache/skywalking-banyandb/pkg/run"
)
@@ -52,6 +53,8 @@ type Repo interface {
RegisterHandler(string, schema.Kind, schema.EventHandler)
NodeRegistry() schema.Node
PropertyRegistry() schema.Property
+ CollectDataInfo(context.Context, string) ([]*databasev1.DataInfo, error)
+ CollectLiaisonInfo(context.Context, string) ([]*databasev1.LiaisonInfo, error)
}
// Service is the metadata repository.
@@ -62,4 +65,8 @@ type Service interface {
run.Config
SchemaRegistry() schema.Registry
SetMetricsRegistry(omr observability.MetricsRegistry)
+ SetDataBroadcaster(broadcaster bus.Broadcaster)
+ SetLiaisonBroadcaster(broadcaster bus.Broadcaster)
+ RegisterDataCollector(catalog commonv1.Catalog, collector schema.DataInfoCollector)
+ RegisterLiaisonCollector(catalog commonv1.Catalog, collector schema.LiaisonInfoCollector)
}
diff --git a/banyand/metadata/schema/collector.go b/banyand/metadata/schema/collector.go
new file mode 100644
index 00000000..6f348d91
--- /dev/null
+++ b/banyand/metadata/schema/collector.go
@@ -0,0 +1,228 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package schema
+
+import (
+ "context"
+ "fmt"
+ "sync"
+ "time"
+
+ "github.com/apache/skywalking-banyandb/api/common"
+ "github.com/apache/skywalking-banyandb/api/data"
+ commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+ databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+ "github.com/apache/skywalking-banyandb/pkg/bus"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+// GroupGetter provides method to get group metadata.
+type GroupGetter interface {
+ GetGroup(ctx context.Context, group string) (*commonv1.Group, error)
+}
+
+// InfoCollectorRegistry manages data and liaison info collectors.
+type InfoCollectorRegistry struct {
+ groupGetter GroupGetter
+ dataCollectors map[commonv1.Catalog]DataInfoCollector
+ liaisonCollectors map[commonv1.Catalog]LiaisonInfoCollector
+ dataBroadcaster bus.Broadcaster
+ liaisonBroadcaster bus.Broadcaster
+ l *logger.Logger
+ mux sync.RWMutex
+}
+
+// NewInfoCollectorRegistry creates a new InfoCollectorRegistry.
+func NewInfoCollectorRegistry(l *logger.Logger, groupGetter GroupGetter) *InfoCollectorRegistry {
+ return &InfoCollectorRegistry{
+ groupGetter: groupGetter,
+ dataCollectors: make(map[commonv1.Catalog]DataInfoCollector),
+ liaisonCollectors: make(map[commonv1.Catalog]LiaisonInfoCollector),
+ l: l,
+ }
+}
+
+// CollectDataInfo collects data information from both local and remote data nodes.
+func (icr *InfoCollectorRegistry) CollectDataInfo(ctx context.Context, group string) ([]*databasev1.DataInfo, error) {
+ g, getErr := icr.groupGetter.GetGroup(ctx, group)
+ if getErr != nil {
+ return nil, getErr
+ }
+ localInfo, localErr := icr.collectDataInfoLocal(ctx, g.Catalog, group)
+ if localErr != nil {
+ return nil, localErr
+ }
+ localInfoList := []*databasev1.DataInfo{}
+ if localInfo != nil {
+ localInfoList = []*databasev1.DataInfo{localInfo}
+ }
+ if icr.dataBroadcaster == nil {
+ return localInfoList, nil
+ }
+
+ var topic bus.Topic
+ switch g.Catalog {
+ case commonv1.Catalog_CATALOG_MEASURE:
+ topic = data.TopicMeasureCollectDataInfo
+ case commonv1.Catalog_CATALOG_STREAM:
+ topic = data.TopicStreamCollectDataInfo
+ case commonv1.Catalog_CATALOG_TRACE:
+ topic = data.TopicTraceCollectDataInfo
+ default:
+ return nil, fmt.Errorf("unsupported catalog type: %v", g.Catalog)
+ }
+ remoteInfo := icr.broadcastCollectDataInfo(topic, group)
+ return append(localInfoList, remoteInfo...), nil
+}
+
+func (icr *InfoCollectorRegistry) broadcastCollectDataInfo(topic bus.Topic, group string) []*databasev1.DataInfo {
+ message := bus.NewMessage(bus.MessageID(time.Now().UnixNano()), &databasev1.GroupRegistryServiceInspectRequest{Group: group})
+ futures, broadcastErr := icr.dataBroadcaster.Broadcast(5*time.Second, topic, message)
+ if broadcastErr != nil {
+ icr.l.Warn().Err(broadcastErr).Str("group", group).Msg("failed to broadcast collect data info request")
+ return []*databasev1.DataInfo{}
+ }
+
+ dataInfoList := make([]*databasev1.DataInfo, 0, len(futures))
+ for _, future := range futures {
+ msg, getErr := future.Get()
+ if getErr != nil {
+ icr.l.Warn().Err(getErr).Str("group", group).Msg("failed to get collect data info response")
+ continue
+ }
+ msgData := msg.Data()
+ switch d := msgData.(type) {
+ case *databasev1.DataInfo:
+ if d != nil {
+ dataInfoList = append(dataInfoList, d)
+ }
+ case *common.Error:
+ icr.l.Warn().Str("error", d.Error()).Str("group", group).Msg("error collecting data info from node")
+ }
+ }
+ return dataInfoList
+}
+
+func (icr *InfoCollectorRegistry) collectDataInfoLocal(ctx context.Context, catalog commonv1.Catalog, group string) (*databasev1.DataInfo, error) {
+ icr.mux.RLock()
+ collector, hasCollector := icr.dataCollectors[catalog]
+ icr.mux.RUnlock()
+ if hasCollector && collector != nil {
+ return collector.CollectDataInfo(ctx, group)
+ }
+ return nil, nil
+}
+
+// CollectLiaisonInfo collects liaison information from both local and remote liaison nodes.
+func (icr *InfoCollectorRegistry) CollectLiaisonInfo(ctx context.Context, group string) ([]*databasev1.LiaisonInfo, error) {
+ g, getErr := icr.groupGetter.GetGroup(ctx, group)
+ if getErr != nil {
+ return nil, getErr
+ }
+ localInfo, localErr := icr.collectLiaisonInfoLocal(ctx, g.Catalog, group)
+ if localErr != nil {
+ return nil, localErr
+ }
+ localInfoList := []*databasev1.LiaisonInfo{}
+ if localInfo != nil {
+ localInfoList = []*databasev1.LiaisonInfo{localInfo}
+ }
+ if icr.liaisonBroadcaster == nil {
+ return localInfoList, nil
+ }
+
+ var topic bus.Topic
+ switch g.Catalog {
+ case commonv1.Catalog_CATALOG_MEASURE:
+ topic = data.TopicMeasureCollectLiaisonInfo
+ case commonv1.Catalog_CATALOG_STREAM:
+ topic = data.TopicStreamCollectLiaisonInfo
+ case commonv1.Catalog_CATALOG_TRACE:
+ topic = data.TopicTraceCollectLiaisonInfo
+ default:
+ return nil, fmt.Errorf("unsupported catalog type: %v", g.Catalog)
+ }
+ remoteInfo := icr.broadcastCollectLiaisonInfo(topic, group)
+ return append(localInfoList, remoteInfo...), nil
+}
+
+func (icr *InfoCollectorRegistry) broadcastCollectLiaisonInfo(topic bus.Topic, group string) []*databasev1.LiaisonInfo {
+ message := bus.NewMessage(bus.MessageID(time.Now().UnixNano()), &databasev1.GroupRegistryServiceInspectRequest{Group: group})
+ futures, broadcastErr := icr.liaisonBroadcaster.Broadcast(5*time.Second, topic, message)
+ if broadcastErr != nil {
+ icr.l.Warn().Err(broadcastErr).Str("group", group).Msg("failed to broadcast collect liaison info request")
+ return []*databasev1.LiaisonInfo{}
+ }
+
+ liaisonInfoList := make([]*databasev1.LiaisonInfo, 0, len(futures))
+ for _, future := range futures {
+ msg, getErr := future.Get()
+ if getErr != nil {
+ icr.l.Warn().Err(getErr).Str("group", group).Msg("failed to get collect liaison info response")
+ continue
+ }
+ msgData := msg.Data()
+ switch d := msgData.(type) {
+ case *databasev1.LiaisonInfo:
+ if d != nil {
+ liaisonInfoList = append(liaisonInfoList, d)
+ }
+ case *common.Error:
+ icr.l.Warn().Str("error", d.Error()).Str("group", group).Msg("error collecting liaison info from node")
+ }
+ }
+ return liaisonInfoList
+}
+
+func (icr *InfoCollectorRegistry) collectLiaisonInfoLocal(ctx context.Context, catalog commonv1.Catalog, group string) (*databasev1.LiaisonInfo, error) {
+ icr.mux.RLock()
+ collector, hasCollector := icr.liaisonCollectors[catalog]
+ icr.mux.RUnlock()
+ if hasCollector && collector != nil {
+ return collector.CollectLiaisonInfo(ctx, group)
+ }
+ return nil, nil
+}
+
+// RegisterDataCollector registers a data info collector for a specific catalog.
+func (icr *InfoCollectorRegistry) RegisterDataCollector(catalog commonv1.Catalog, collector DataInfoCollector) {
+ icr.mux.Lock()
+ defer icr.mux.Unlock()
+ icr.dataCollectors[catalog] = collector
+}
+
+// RegisterLiaisonCollector registers a liaison info collector for a specific catalog.
+func (icr *InfoCollectorRegistry) RegisterLiaisonCollector(catalog commonv1.Catalog, collector LiaisonInfoCollector) {
+ icr.mux.Lock()
+ defer icr.mux.Unlock()
+ icr.liaisonCollectors[catalog] = collector
+}
+
+// SetDataBroadcaster sets the broadcaster for data info collection.
+func (icr *InfoCollectorRegistry) SetDataBroadcaster(broadcaster bus.Broadcaster) {
+ icr.mux.Lock()
+ defer icr.mux.Unlock()
+ icr.dataBroadcaster = broadcaster
+}
+
+// SetLiaisonBroadcaster sets the broadcaster for liaison info collection.
+func (icr *InfoCollectorRegistry) SetLiaisonBroadcaster(broadcaster bus.Broadcaster) {
+ icr.mux.Lock()
+ defer icr.mux.Unlock()
+ icr.liaisonBroadcaster = broadcaster
+}
diff --git a/banyand/metadata/schema/etcd.go b/banyand/metadata/schema/etcd.go
index d5283e53..7be2b159 100644
--- a/banyand/metadata/schema/etcd.go
+++ b/banyand/metadata/schema/etcd.go
@@ -128,6 +128,8 @@ func CheckInterval(d time.Duration) WatcherOption {
}
}
+var _ Registry = (*etcdSchemaRegistry)(nil)
+
type etcdSchemaRegistry struct {
client *clientv3.Client
closer *run.Closer
@@ -256,11 +258,12 @@ func NewEtcdSchemaRegistry(options ...RegistryOption) (Registry, error) {
if err != nil {
return nil, err
}
+ schemaLogger := logger.GetLogger("schema-registry")
reg := &etcdSchemaRegistry{
namespace: registryConfig.namespace,
client: client,
closer: run.NewCloser(1),
- l: logger.GetLogger("schema-registry"),
+ l: schemaLogger,
checkInterval: registryConfig.checkInterval,
watchers: make(map[Kind]*watcher),
}
diff --git a/banyand/metadata/schema/schema.go b/banyand/metadata/schema/schema.go
index 014c9ac8..49cdf8d6 100644
--- a/banyand/metadata/schema/schema.go
+++ b/banyand/metadata/schema/schema.go
@@ -38,6 +38,16 @@ type EventHandler interface {
OnDelete(Metadata)
}
+// DataInfoCollector provides methods to collect data node info.
+type DataInfoCollector interface {
+ CollectDataInfo(ctx context.Context, group string) (*databasev1.DataInfo, error)
+}
+
+// LiaisonInfoCollector provides methods to collect liaison node info.
+type LiaisonInfoCollector interface {
+ CollectLiaisonInfo(ctx context.Context, group string) (*databasev1.LiaisonInfo, error)
+}
+
// UnimplementedOnInitHandler is a placeholder for unimplemented OnInitHandler.
type UnimplementedOnInitHandler struct{}
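Illustration only, not part of this patch: a module satisfies the new DataInfoCollector contract with a single method. The stub below is hypothetical and simply reports zero usage, whereas the real collectors in this change walk TSDB segments, shards, and series indexes.

    package example

    import (
        "context"

        databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
    )

    // noopCollector is a hypothetical schema.DataInfoCollector implementation.
    type noopCollector struct{}

    func (noopCollector) CollectDataInfo(_ context.Context, _ string) (*databasev1.DataInfo, error) {
        // A real collector reports segment, shard, and series-index statistics for the group.
        return &databasev1.DataInfo{DataSizeBytes: 0}, nil
    }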
diff --git a/banyand/stream/introducer.go b/banyand/stream/introducer.go
index fa07bd37..a9dda746 100644
--- a/banyand/stream/introducer.go
+++ b/banyand/stream/introducer.go
@@ -221,6 +221,7 @@ func (tst *tsTable) introduceMemPart(nextIntroduction *introduction, epoch uint6
}
next := nextIntroduction.memPart
+ tst.addPendingDataCount(-int64(next.mp.partMetadata.TotalCount))
nextSnp := cur.copyAllTo(epoch)
nextSnp.parts = append(nextSnp.parts, next)
nextSnp.creator = snapshotCreatorMemPart
diff --git a/banyand/stream/metadata.go b/banyand/stream/metadata.go
index 849498a9..dca00f73 100644
--- a/banyand/stream/metadata.go
+++ b/banyand/stream/metadata.go
@@ -55,13 +55,17 @@ type schemaRepo struct {
l *logger.Logger
metadata metadata.Repo
path string
+ nodeID string
+ role databasev1.Role
}
-func newSchemaRepo(path string, svc *standalone, nodeLabels map[string]string) schemaRepo {
+func newSchemaRepo(path string, svc *standalone, nodeLabels map[string]string, nodeID string) schemaRepo {
sr := schemaRepo{
l: svc.l,
path: path,
metadata: svc.metadata,
+ nodeID: nodeID,
+ role: databasev1.Role_ROLE_DATA,
Repository: resourceSchema.NewRepository(
svc.metadata,
svc.l,
@@ -78,6 +82,7 @@ func newLiaisonSchemaRepo(path string, svc *liaison, streamDataNodeRegistry grpc
l: svc.l,
path: path,
metadata: svc.metadata,
+ role: databasev1.Role_ROLE_LIAISON,
Repository: resourceSchema.NewRepository(
svc.metadata,
svc.l,
@@ -231,17 +236,226 @@ func (sr *schemaRepo) loadStream(metadata *commonv1.Metadata) (*stream, bool) {
}
func (sr *schemaRepo) loadTSDB(groupName string) (storage.TSDB[*tsTable, option], error) {
+ if sr == nil {
+ return nil, fmt.Errorf("schemaRepo is nil")
+ }
g, ok := sr.LoadGroup(groupName)
if !ok {
return nil, fmt.Errorf("group %s not found", groupName)
}
db := g.SupplyTSDB()
if db == nil {
- return nil, fmt.Errorf("tsdb for group %s not found", groupName)
+ return nil, fmt.Errorf("group %s not found", groupName)
}
return db.(storage.TSDB[*tsTable, option]), nil
}
+// CollectDataInfo collects data info for a specific group.
+func (sr *schemaRepo) CollectDataInfo(ctx context.Context, group string) (*databasev1.DataInfo, error) {
+ if sr.nodeID == "" {
+ return nil, fmt.Errorf("node ID is empty")
+ }
+ node, nodeErr := sr.metadata.NodeRegistry().GetNode(ctx, sr.nodeID)
+ if nodeErr != nil {
+ return nil, fmt.Errorf("failed to get current node info: %w", nodeErr)
+ }
+ tsdb, tsdbErr := sr.loadTSDB(group)
+ if tsdbErr != nil {
+ return nil, tsdbErr
+ }
+ if tsdb == nil {
+ return nil, nil
+ }
+ segments, segmentsErr := tsdb.SelectSegments(timestamp.TimeRange{
+ Start: time.Unix(0, 0),
+ End: time.Unix(0, timestamp.MaxNanoTime),
+ })
+ if segmentsErr != nil {
+ return nil, segmentsErr
+ }
+ var segmentInfoList []*databasev1.SegmentInfo
+ var totalDataSize int64
+ for _, segment := range segments {
+ timeRange := segment.GetTimeRange()
+ tables, _ := segment.Tables()
+ var shardInfoList []*databasev1.ShardInfo
+ for shardIdx, table := range tables {
+ shardInfo := sr.collectShardInfo(table, uint32(shardIdx))
+ shardInfoList = append(shardInfoList, shardInfo)
+ totalDataSize += shardInfo.DataSizeBytes
+ }
+ seriesIndexInfo := sr.collectSeriesIndexInfo(segment)
+ totalDataSize += seriesIndexInfo.DataSizeBytes
+ segmentInfo := &databasev1.SegmentInfo{
+ SegmentId: fmt.Sprintf("%d-%d", timeRange.Start.UnixNano(), timeRange.End.UnixNano()),
+ TimeRangeStart: timeRange.Start.Format(time.RFC3339Nano),
+ TimeRangeEnd: timeRange.End.Format(time.RFC3339Nano),
+ ShardInfo: shardInfoList,
+ SeriesIndexInfo: seriesIndexInfo,
+ }
+ segmentInfoList = append(segmentInfoList, segmentInfo)
+ segment.DecRef()
+ }
+ dataInfo := &databasev1.DataInfo{
+ Node: node,
+ SegmentInfo: segmentInfoList,
+ DataSizeBytes: totalDataSize,
+ }
+ return dataInfo, nil
+}
+
+func (sr *schemaRepo) collectSeriesIndexInfo(segment storage.Segment[*tsTable, option]) *databasev1.SeriesIndexInfo {
+ indexDB := segment.IndexDB()
+ if indexDB == nil {
+ return &databasev1.SeriesIndexInfo{
+ DataCount: 0,
+ DataSizeBytes: 0,
+ }
+ }
+ dataCount, dataSizeBytes := indexDB.Stats()
+ return &databasev1.SeriesIndexInfo{
+ DataCount: dataCount,
+ DataSizeBytes: dataSizeBytes,
+ }
+}
+
+func (sr *schemaRepo) collectShardInfo(table any, shardID uint32) *databasev1.ShardInfo {
+ tst, ok := table.(*tsTable)
+ if !ok {
+ return &databasev1.ShardInfo{
+ ShardId: shardID,
+ DataCount: 0,
+ DataSizeBytes: 0,
+ PartCount: 0,
+ }
+ }
+ snapshot := tst.currentSnapshot()
+ if snapshot == nil {
+ return &databasev1.ShardInfo{
+ ShardId: shardID,
+ DataCount: 0,
+ DataSizeBytes: 0,
+ PartCount: 0,
+ }
+ }
+ defer snapshot.decRef()
+ var totalCount, compressedSize, uncompressedSize, partCount uint64
+ for _, pw := range snapshot.parts {
+ if pw.p != nil {
+ totalCount += pw.p.partMetadata.TotalCount
+ compressedSize += pw.p.partMetadata.CompressedSizeBytes
+ uncompressedSize += pw.p.partMetadata.UncompressedSizeBytes
+ partCount++
+ } else if pw.mp != nil {
+ totalCount += pw.mp.partMetadata.TotalCount
+ compressedSize += pw.mp.partMetadata.CompressedSizeBytes
+ uncompressedSize += pw.mp.partMetadata.UncompressedSizeBytes
+ partCount++
+ }
+ }
+ invertedIndexInfo := sr.collectInvertedIndexInfo(tst)
+ return &databasev1.ShardInfo{
+ ShardId: shardID,
+ DataCount: int64(totalCount),
+ DataSizeBytes: int64(compressedSize),
+ PartCount: int64(partCount),
+ InvertedIndexInfo: invertedIndexInfo,
+ SidxInfo: &databasev1.SIDXInfo{},
+ }
+}
+
+func (sr *schemaRepo) collectInvertedIndexInfo(tst *tsTable) *databasev1.InvertedIndexInfo {
+ if tst.index == nil {
+ return &databasev1.InvertedIndexInfo{
+ DataCount: 0,
+ DataSizeBytes: 0,
+ }
+ }
+ dataCount, dataSizeBytes := tst.index.store.Stats()
+ return &databasev1.InvertedIndexInfo{
+ DataCount: dataCount,
+ DataSizeBytes: dataSizeBytes,
+ }
+}
+
+func (sr *schemaRepo) collectPendingWriteInfo(groupName string) (int64, error) {
+ if sr == nil || sr.Repository == nil {
+ return 0, fmt.Errorf("schema repository is not initialized")
+ }
+ if sr.role == databasev1.Role_ROLE_LIAISON {
+ queue, queueErr := sr.loadQueue(groupName)
+ if queueErr != nil {
+ return 0, fmt.Errorf("failed to load queue: %w", queueErr)
+ }
+ if queue == nil {
+ return 0, nil
+ }
+ var pendingWriteCount int64
+ for _, sq := range queue.SubQueues() {
+ if sq != nil {
+ pendingWriteCount += sq.getPendingDataCount()
+ }
+ }
+ return pendingWriteCount, nil
+ }
+ // Standalone mode
+ tsdb, tsdbErr := sr.loadTSDB(groupName)
+ if tsdbErr != nil {
+ return 0, fmt.Errorf("failed to load TSDB: %w", tsdbErr)
+ }
+ if tsdb == nil {
+ return 0, fmt.Errorf("TSDB is nil for group %s", groupName)
+ }
+ segments, segmentsErr := tsdb.SelectSegments(timestamp.TimeRange{
+ Start: time.Unix(0, 0),
+ End: time.Unix(0, timestamp.MaxNanoTime),
+ })
+ if segmentsErr != nil {
+ return 0, fmt.Errorf("failed to select segments: %w", segmentsErr)
+ }
+ var pendingWriteCount int64
+ for _, segment := range segments {
+ tables, _ := segment.Tables()
+ for _, tst := range tables {
+ pendingWriteCount += tst.getPendingDataCount()
+ }
+ segment.DecRef()
+ }
+ return pendingWriteCount, nil
+}
+
+func (sr *schemaRepo) collectPendingSyncInfo(groupName string) (partCount int64, totalSizeBytes int64, err error) {
+ if sr == nil || sr.Repository == nil {
+ return 0, 0, fmt.Errorf("schema repository is not initialized")
+ }
+ // Only liaison nodes collect pending sync info
+ if sr.role != databasev1.Role_ROLE_LIAISON {
+ return 0, 0, nil
+ }
+ queue, queueErr := sr.loadQueue(groupName)
+ if queueErr != nil {
+ return 0, 0, fmt.Errorf("failed to load queue: %w", queueErr)
+ }
+ if queue == nil {
+ return 0, 0, nil
+ }
+ for _, sq := range queue.SubQueues() {
+ if sq != nil {
+ snapshot := sq.currentSnapshot()
+ if snapshot != nil {
+ for _, pw := range snapshot.parts {
+ if pw.mp == nil && pw.p != nil &&
pw.p.partMetadata.TotalCount > 0 {
+ partCount++
+ totalSizeBytes +=
int64(pw.p.partMetadata.CompressedSizeBytes)
+ }
+ }
+ snapshot.decRef()
+ }
+ }
+ }
+ return partCount, totalSizeBytes, nil
+}
+
func (sr *schemaRepo) loadQueue(groupName string) (*wqueue.Queue[*tsTable,
option], error) {
g, ok := sr.LoadGroup(groupName)
if !ok {
diff --git a/banyand/stream/metrics.go b/banyand/stream/metrics.go
index 91686b49..3a757027 100644
--- a/banyand/stream/metrics.go
+++ b/banyand/stream/metrics.go
@@ -32,8 +32,8 @@ var (
)
type metrics struct {
- tbMetrics
- indexMetrics *inverted.Metrics
+ indexMetrics *inverted.Metrics
+
totalWritten meter.Counter
totalBatch meter.Counter
totalBatchIntroLatency meter.Counter
@@ -66,6 +66,8 @@ type metrics struct {
totalMergedParts meter.Counter
totalMergeLatency meter.Counter
totalMerged meter.Counter
+
+ tbMetrics
}
func (tst *tsTable) incTotalWritten(delta int) {
@@ -250,6 +252,18 @@ func (tst *tsTable) incTotalMerged(delta int, typ string) {
tst.metrics.totalMerged.Inc(float64(delta), typ)
}
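+// addPendingDataCount adjusts the table's pending data counter and mirrors it to the pending_data_count gauge.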
+func (tst *tsTable) addPendingDataCount(delta int64) {
+ tst.pendingDataCount.Add(delta)
+ if tst.metrics == nil {
+ return
+ }
+ tst.metrics.tbMetrics.pendingDataCount.Add(float64(delta),
tst.p.ShardLabelValues()...)
+}
+
+func (tst *tsTable) getPendingDataCount() int64 {
+ return tst.pendingDataCount.Load()
+}
+
func (m *metrics) DeleteAll() {
if m == nil {
return
@@ -334,6 +348,7 @@ func (s *supplier) newMetrics(p common.Position)
storage.Metrics {
totalFileBlocks:
factory.NewGauge("total_file_blocks", common.ShardLabelNames()...),
totalFilePartBytes:
factory.NewGauge("total_file_part_bytes", common.ShardLabelNames()...),
totalFilePartUncompressedBytes:
factory.NewGauge("total_file_part_uncompressed_bytes",
common.ShardLabelNames()...),
+ pendingDataCount:
factory.NewGauge("pending_data_count", common.ShardLabelNames()...),
},
indexMetrics: inverted.NewMetrics(factory,
common.SegLabelNames()...),
}
@@ -379,6 +394,7 @@ func (s *queueSupplier) newMetrics(p common.Position)
storage.Metrics {
totalFileBlocks:
factory.NewGauge("total_file_blocks", common.ShardLabelNames()...),
totalFilePartBytes:
factory.NewGauge("total_file_part_bytes", common.ShardLabelNames()...),
totalFilePartUncompressedBytes:
factory.NewGauge("total_file_part_uncompressed_bytes",
common.ShardLabelNames()...),
+ pendingDataCount:
factory.NewGauge("pending_data_count", common.ShardLabelNames()...),
},
indexMetrics: inverted.NewMetrics(factory,
common.SegLabelNames()...),
}
@@ -442,6 +458,7 @@ func (tst *tsTable) deleteMetrics() {
tst.metrics.tbMetrics.totalFileBlocks.Delete(tst.p.ShardLabelValues()...)
tst.metrics.tbMetrics.totalFilePartBytes.Delete(tst.p.ShardLabelValues()...)
tst.metrics.tbMetrics.totalFilePartUncompressedBytes.Delete(tst.p.ShardLabelValues()...)
+
tst.metrics.tbMetrics.pendingDataCount.Delete(tst.p.ShardLabelValues()...)
tst.metrics.indexMetrics.DeleteAll(tst.p.SegLabelValues()...)
}
@@ -457,4 +474,6 @@ type tbMetrics struct {
totalFileBlocks meter.Gauge
totalFilePartBytes meter.Gauge
totalFilePartUncompressedBytes meter.Gauge
+
+ pendingDataCount meter.Gauge
}
diff --git a/banyand/stream/svc_liaison.go b/banyand/stream/svc_liaison.go
index e3e2b5de..25a3ec86 100644
--- a/banyand/stream/svc_liaison.go
+++ b/banyand/stream/svc_liaison.go
@@ -19,6 +19,7 @@ package stream
import (
"context"
+ "fmt"
"path"
"path/filepath"
"strings"
@@ -78,6 +79,33 @@ func (s *liaison) GetRemovalSegmentsTimeRange(group string)
*timestamp.TimeRange
return s.schemaRepo.GetRemovalSegmentsTimeRange(group)
}
+func (s *liaison) CollectDataInfo(_ context.Context, _ string)
(*databasev1.DataInfo, error) {
+ return nil, errors.New("collect data info is not supported on liaison
node")
+}
+
+// CollectLiaisonInfo collects pending write and pending sync statistics for the group on this liaison node.
+func (s *liaison) CollectLiaisonInfo(_ context.Context, group string)
(*databasev1.LiaisonInfo, error) {
+ info := &databasev1.LiaisonInfo{
+ PendingWriteDataCount: 0,
+ PendingSyncPartCount: 0,
+ PendingSyncDataSizeBytes: 0,
+ PendingHandoffPartCount: 0,
+ PendingHandoffDataSizeBytes: 0,
+ }
+ pendingWriteCount, writeErr :=
s.schemaRepo.collectPendingWriteInfo(group)
+ if writeErr != nil {
+ return nil, fmt.Errorf("failed to collect pending write info:
%w", writeErr)
+ }
+ info.PendingWriteDataCount = pendingWriteCount
+ pendingSyncPartCount, pendingSyncDataSizeBytes, syncErr :=
s.schemaRepo.collectPendingSyncInfo(group)
+ if syncErr != nil {
+ return nil, fmt.Errorf("failed to collect pending sync info:
%w", syncErr)
+ }
+ info.PendingSyncPartCount = pendingSyncPartCount
+ info.PendingSyncDataSizeBytes = pendingSyncDataSizeBytes
+ return info, nil
+}
+
func (s *liaison) FlagSet() *run.FlagSet {
flagS := run.NewFlagSet("storage")
flagS.StringVar(&s.root, "stream-root-path", "/tmp", "the root path of
stream")
@@ -155,6 +183,15 @@ func (s *liaison) PreRun(ctx context.Context) error {
// Register chunked sync handler for stream data
s.pipeline.RegisterChunkedSyncHandler(data.TopicStreamPartSync,
setUpChunkedSyncCallback(s.l, &s.schemaRepo))
+ if metaSvc, ok := s.metadata.(metadata.Service); ok {
+
metaSvc.RegisterLiaisonCollector(commonv1.Catalog_CATALOG_STREAM, s)
+ }
+
+ collectLiaisonInfoListener := &collectLiaisonInfoListener{s: s}
+ if subscribeErr :=
s.pipeline.Subscribe(data.TopicStreamCollectLiaisonInfo,
collectLiaisonInfoListener); subscribeErr != nil {
+ return fmt.Errorf("failed to subscribe to collect liaison info
topic: %w", subscribeErr)
+ }
+
return s.pipeline.Subscribe(data.TopicStreamWrite, s.writeListener)
}
@@ -181,3 +218,20 @@ func NewLiaison(metadata metadata.Repo, pipeline
queue.Server, omr observability
},
}, nil
}
+
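+// collectLiaisonInfoListener serves TopicStreamCollectLiaisonInfo requests by delegating to CollectLiaisonInfo.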
+type collectLiaisonInfoListener struct {
+ *bus.UnImplementedHealthyListener
+ s *liaison
+}
+
+func (l *collectLiaisonInfoListener) Rev(ctx context.Context, message
bus.Message) bus.Message {
+ req, ok :=
message.Data().(*databasev1.GroupRegistryServiceInspectRequest)
+ if !ok {
+ return bus.NewMessage(message.ID(), common.NewError("invalid
data type for collect liaison info request"))
+ }
+ liaisonInfo, collectErr := l.s.CollectLiaisonInfo(ctx, req.Group)
+ if collectErr != nil {
+ return bus.NewMessage(message.ID(), common.NewError("failed to
collect liaison info: %v", collectErr))
+ }
+ return bus.NewMessage(message.ID(), liaisonInfo)
+}
diff --git a/banyand/stream/svc_standalone.go b/banyand/stream/svc_standalone.go
index 1be24bf7..eea34da8 100644
--- a/banyand/stream/svc_standalone.go
+++ b/banyand/stream/svc_standalone.go
@@ -19,6 +19,7 @@ package stream
import (
"context"
+ "fmt"
"path"
"path/filepath"
"strings"
@@ -57,6 +58,8 @@ type Service interface {
run.Config
run.Service
Query
+ CollectDataInfo(context.Context, string) (*databasev1.DataInfo, error)
+ CollectLiaisonInfo(context.Context, string) (*databasev1.LiaisonInfo,
error)
}
var _ Service = (*standalone)(nil)
@@ -238,11 +241,20 @@ func (s *standalone) PreRun(ctx context.Context) error {
if !strings.HasPrefix(filepath.VolumeName(s.dataPath),
filepath.VolumeName(path)) {
obsservice.UpdatePath(s.dataPath)
}
- s.schemaRepo = newSchemaRepo(s.dataPath, s, node.Labels)
+ s.schemaRepo = newSchemaRepo(s.dataPath, s, node.Labels, node.NodeID)
+ if metaSvc, ok := s.metadata.(metadata.Service); ok {
+ metaSvc.RegisterDataCollector(commonv1.Catalog_CATALOG_STREAM,
&s.schemaRepo)
+
metaSvc.RegisterLiaisonCollector(commonv1.Catalog_CATALOG_STREAM, s)
+ }
if s.pipeline == nil {
return nil
}
+ collectDataInfoListener := &collectDataInfoListener{s: s}
+ if subscribeErr :=
s.pipeline.Subscribe(data.TopicStreamCollectDataInfo, collectDataInfoListener);
subscribeErr != nil {
+ return fmt.Errorf("failed to subscribe to collect data info
topic: %w", subscribeErr)
+ }
+
s.localPipeline = queue.Local()
if err := s.pipeline.Subscribe(data.TopicSnapshot, &snapshotListener{s:
s}); err != nil {
return err
@@ -347,3 +359,34 @@ func (d *deleteStreamSegmentsListener) Rev(_
context.Context, message bus.Messag
deleted := db.DeleteExpiredSegments(req.SegmentSuffixes)
return bus.NewMessage(bus.MessageID(time.Now().UnixNano()), deleted)
}
+
+func (s *standalone) CollectDataInfo(ctx context.Context, group string)
(*databasev1.DataInfo, error) {
+ return s.schemaRepo.CollectDataInfo(ctx, group)
+}
+
+func (s *standalone) CollectLiaisonInfo(_ context.Context, group string)
(*databasev1.LiaisonInfo, error) {
+ info := &databasev1.LiaisonInfo{}
+ pendingWriteCount, writeErr :=
s.schemaRepo.collectPendingWriteInfo(group)
+ if writeErr != nil {
+ return nil, fmt.Errorf("failed to collect pending write info:
%w", writeErr)
+ }
+ info.PendingWriteDataCount = pendingWriteCount
+ return info, nil
+}
+
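+// collectDataInfoListener serves TopicStreamCollectDataInfo requests by delegating to the schema repository's CollectDataInfo.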
+type collectDataInfoListener struct {
+ *bus.UnImplementedHealthyListener
+ s *standalone
+}
+
+func (l *collectDataInfoListener) Rev(ctx context.Context, message
bus.Message) bus.Message {
+ req, ok :=
message.Data().(*databasev1.GroupRegistryServiceInspectRequest)
+ if !ok {
+ return bus.NewMessage(message.ID(), common.NewError("invalid
data type for collect data info request"))
+ }
+ dataInfo, collectErr := l.s.schemaRepo.CollectDataInfo(ctx, req.Group)
+ if collectErr != nil {
+ return bus.NewMessage(message.ID(), common.NewError("failed to
collect data info: %v", collectErr))
+ }
+ return bus.NewMessage(message.ID(), dataInfo)
+}
diff --git a/banyand/stream/tstable.go b/banyand/stream/tstable.go
index 26f345f0..cbd4ce81 100644
--- a/banyand/stream/tstable.go
+++ b/banyand/stream/tstable.go
@@ -49,21 +49,22 @@ const (
)
type tsTable struct {
- fileSystem fs.FileSystem
- pm protector.Memory
- metrics *metrics
- index *elementIndex
- snapshot *snapshot
- loopCloser *run.Closer
- getNodes func() []string
- l *logger.Logger
- introductions chan *introduction
- p common.Position
- group string
- root string
- gc garbageCleaner
- option option
- curPartID uint64
+ fileSystem fs.FileSystem
+ pm protector.Memory
+ metrics *metrics
+ index *elementIndex
+ snapshot *snapshot
+ loopCloser *run.Closer
+ getNodes func() []string
+ l *logger.Logger
+ introductions chan *introduction
+ p common.Position
+ group string
+ root string
+ gc garbageCleaner
+ option option
+ curPartID uint64
+ pendingDataCount atomic.Int64
sync.RWMutex
shardID common.ShardID
}
@@ -327,9 +328,11 @@ func (tst *tsTable) mustAddMemPart(mp *memPart) {
ind.memPart.p.partMetadata.ID = atomic.AddUint64(&tst.curPartID, 1)
startTime := time.Now()
totalCount := mp.partMetadata.TotalCount
+ tst.addPendingDataCount(int64(totalCount))
select {
case tst.introductions <- ind:
case <-tst.loopCloser.CloseNotify():
+ tst.addPendingDataCount(-int64(totalCount))
return
}
select {
diff --git a/banyand/trace/handoff_controller.go
b/banyand/trace/handoff_controller.go
index c3d1cfea..18f8d66c 100644
--- a/banyand/trace/handoff_controller.go
+++ b/banyand/trace/handoff_controller.go
@@ -653,6 +653,21 @@ func (hc *handoffController) getTotalSize() uint64 {
return hc.currentTotalSize
}
+// stats returns handoff statistics including part count and total size.
+func (hc *handoffController) stats() (partCount int64, totalSize int64) {
+ hc.mu.RLock()
+ defer hc.mu.RUnlock()
+ var count int64
+ for _, nodeQueue := range hc.nodeQueues {
+ pending, listErr := nodeQueue.listPending()
+ if listErr != nil {
+ continue
+ }
+ count += int64(len(pending))
+ }
+ return count, int64(hc.getTotalSize())
+}
+
// canEnqueue checks if adding a part of the given size would exceed the total
size limit.
func (hc *handoffController) canEnqueue(partSize uint64) bool {
if hc.maxTotalSizeBytes == 0 {
diff --git a/banyand/trace/introducer.go b/banyand/trace/introducer.go
index be2940c2..1b7e3e9a 100644
--- a/banyand/trace/introducer.go
+++ b/banyand/trace/introducer.go
@@ -231,6 +231,7 @@ func (tst *tsTable) introduceMemPart(nextIntroduction
*introduction, epoch uint6
}
next := nextIntroduction.memPart
+ tst.addPendingDataCount(-int64(next.mp.partMetadata.TotalCount))
nextSnp := cur.copyAllTo(epoch)
nextSnp.parts = append(nextSnp.parts, next)
nextSnp.creator = snapshotCreatorMemPart
diff --git a/banyand/trace/merger.go b/banyand/trace/merger.go
index 6ddc1182..55a9d2c8 100644
--- a/banyand/trace/merger.go
+++ b/banyand/trace/merger.go
@@ -320,6 +320,7 @@ func (tst *tsTable) mergeParts(fileSystem fs.FileSystem,
closeCh <-chan struct{}
pm.MaxTimestamp = maxTimestamp
pm.mustWriteMetadata(fileSystem, dstPath)
tf.mustWriteTraceIDFilter(fileSystem, dstPath)
+ tf.reset()
tt.mustWriteTagType(fileSystem, dstPath)
fileSystem.SyncPath(dstPath)
p := mustOpenFilePart(partID, root, fileSystem)
diff --git a/banyand/trace/metadata.go b/banyand/trace/metadata.go
index 82040fb5..26f2e0ea 100644
--- a/banyand/trace/metadata.go
+++ b/banyand/trace/metadata.go
@@ -61,13 +61,17 @@ type schemaRepo struct {
l *logger.Logger
metadata metadata.Repo
path string
+ nodeID string
+ role databasev1.Role
}
-func newSchemaRepo(path string, svc *standalone, nodeLabels map[string]string)
schemaRepo {
+func newSchemaRepo(path string, svc *standalone, nodeLabels map[string]string,
nodeID string) schemaRepo {
sr := schemaRepo{
l: svc.l,
path: path,
metadata: svc.metadata,
+ nodeID: nodeID,
+ role: databasev1.Role_ROLE_DATA,
Repository: resourceSchema.NewRepository(
svc.metadata,
svc.l,
@@ -84,6 +88,7 @@ func newLiaisonSchemaRepo(path string, svc *liaison,
traceDataNodeRegistry grpc.
l: svc.l,
path: path,
metadata: svc.metadata,
+ role: databasev1.Role_ROLE_LIAISON,
Repository: resourceSchema.NewRepository(
svc.metadata,
svc.l,
@@ -238,17 +243,242 @@ func (sr *schemaRepo) loadTrace(metadata
*commonv1.Metadata) (*trace, bool) {
}
func (sr *schemaRepo) loadTSDB(groupName string) (storage.TSDB[*tsTable,
option], error) {
+ if sr == nil {
+ return nil, fmt.Errorf("schemaRepo is nil")
+ }
g, ok := sr.LoadGroup(groupName)
if !ok {
return nil, fmt.Errorf("group %s not found", groupName)
}
db := g.SupplyTSDB()
if db == nil {
- return nil, fmt.Errorf("tsdb for group %s not found", groupName)
+ return nil, fmt.Errorf("group %s not found", groupName)
}
return db.(storage.TSDB[*tsTable, option]), nil
}
+// CollectDataInfo collects per-segment, per-shard, and series index statistics for the given group on this data node.
+func (sr *schemaRepo) CollectDataInfo(ctx context.Context, group string)
(*databasev1.DataInfo, error) {
+ if sr.nodeID == "" {
+ return nil, fmt.Errorf("node ID is empty")
+ }
+ node, nodeErr := sr.metadata.NodeRegistry().GetNode(ctx, sr.nodeID)
+ if nodeErr != nil {
+ return nil, fmt.Errorf("failed to get current node info: %w",
nodeErr)
+ }
+ tsdb, tsdbErr := sr.loadTSDB(group)
+ if tsdbErr != nil {
+ return nil, tsdbErr
+ }
+ if tsdb == nil {
+ return nil, nil
+ }
+ segments, segmentsErr := tsdb.SelectSegments(timestamp.TimeRange{
+ Start: time.Unix(0, 0),
+ End: time.Unix(0, timestamp.MaxNanoTime),
+ })
+ if segmentsErr != nil {
+ return nil, segmentsErr
+ }
+ var segmentInfoList []*databasev1.SegmentInfo
+ var totalDataSize int64
+ for _, segment := range segments {
+ timeRange := segment.GetTimeRange()
+ tables, _ := segment.Tables()
+ var shardInfoList []*databasev1.ShardInfo
+ for shardIdx, table := range tables {
+ shardInfo := sr.collectShardInfo(ctx, table,
uint32(shardIdx))
+ shardInfoList = append(shardInfoList, shardInfo)
+ totalDataSize += shardInfo.DataSizeBytes
+ }
+ seriesIndexInfo := sr.collectSeriesIndexInfo(segment)
+ totalDataSize += seriesIndexInfo.DataSizeBytes
+ segmentInfo := &databasev1.SegmentInfo{
+ SegmentId: fmt.Sprintf("%d-%d",
timeRange.Start.UnixNano(), timeRange.End.UnixNano()),
+ TimeRangeStart:
timeRange.Start.Format(time.RFC3339Nano),
+ TimeRangeEnd: timeRange.End.Format(time.RFC3339Nano),
+ ShardInfo: shardInfoList,
+ SeriesIndexInfo: seriesIndexInfo,
+ }
+ segmentInfoList = append(segmentInfoList, segmentInfo)
+ segment.DecRef()
+ }
+ dataInfo := &databasev1.DataInfo{
+ Node: node,
+ SegmentInfo: segmentInfoList,
+ DataSizeBytes: totalDataSize,
+ }
+ return dataInfo, nil
+}
+
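+// collectSeriesIndexInfo reports the document count and on-disk size of the segment's series index.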
+func (sr *schemaRepo) collectSeriesIndexInfo(segment storage.Segment[*tsTable,
option]) *databasev1.SeriesIndexInfo {
+ indexDB := segment.IndexDB()
+ if indexDB == nil {
+ return &databasev1.SeriesIndexInfo{
+ DataCount: 0,
+ DataSizeBytes: 0,
+ }
+ }
+ dataCount, dataSizeBytes := indexDB.Stats()
+ return &databasev1.SeriesIndexInfo{
+ DataCount: dataCount,
+ DataSizeBytes: dataSizeBytes,
+ }
+}
+
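+// collectShardInfo sums part count, data count, and compressed size from the shard's current snapshot, including its sidx statistics.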
+func (sr *schemaRepo) collectShardInfo(ctx context.Context, table any, shardID
uint32) *databasev1.ShardInfo {
+ tst, ok := table.(*tsTable)
+ if !ok {
+ return &databasev1.ShardInfo{
+ ShardId: shardID,
+ DataCount: 0,
+ DataSizeBytes: 0,
+ PartCount: 0,
+ }
+ }
+ snapshot := tst.currentSnapshot()
+ if snapshot == nil {
+ return &databasev1.ShardInfo{
+ ShardId: shardID,
+ DataCount: 0,
+ DataSizeBytes: 0,
+ PartCount: 0,
+ }
+ }
+ defer snapshot.decRef()
+ var totalCount, compressedSize, uncompressedSize, partCount uint64
+ for _, pw := range snapshot.parts {
+ if pw.p != nil {
+ totalCount += pw.p.partMetadata.TotalCount
+ compressedSize += pw.p.partMetadata.CompressedSizeBytes
+ uncompressedSize +=
pw.p.partMetadata.UncompressedSpanSizeBytes
+ partCount++
+ } else if pw.mp != nil {
+ totalCount += pw.mp.partMetadata.TotalCount
+ compressedSize += pw.mp.partMetadata.CompressedSizeBytes
+ uncompressedSize +=
pw.mp.partMetadata.UncompressedSpanSizeBytes
+ partCount++
+ }
+ }
+ sidxInfo := sr.collectSidxInfo(ctx, tst)
+ return &databasev1.ShardInfo{
+ ShardId: shardID,
+ DataCount: int64(totalCount),
+ DataSizeBytes: int64(compressedSize),
+ PartCount: int64(partCount),
+ InvertedIndexInfo: &databasev1.InvertedIndexInfo{},
+ SidxInfo: sidxInfo,
+ }
+}
+
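+// collectSidxInfo aggregates element count, disk usage, and part count across the
+// table's sidx instances, using a shared five-second timeout.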
+func (sr *schemaRepo) collectSidxInfo(ctx context.Context, tst *tsTable)
*databasev1.SIDXInfo {
+ sidxMap := tst.sidxMap
+ if len(sidxMap) == 0 {
+ return &databasev1.SIDXInfo{
+ DataCount: 0,
+ DataSizeBytes: 0,
+ PartCount: 0,
+ }
+ }
+ timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
+ defer cancel()
+ var totalDataCount, totalDataSize, totalPartCount int64
+ for _, sidxInstance := range sidxMap {
+ stats, statsErr := sidxInstance.Stats(timeoutCtx)
+ if statsErr != nil {
+ continue
+ }
+ if stats != nil {
+ totalDataCount += stats.ElementCount
+ totalDataSize += stats.DiskUsageBytes
+ totalPartCount += stats.PartCount
+ }
+ }
+ return &databasev1.SIDXInfo{
+ DataCount: totalDataCount,
+ DataSizeBytes: totalDataSize,
+ PartCount: totalPartCount,
+ }
+}
+
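+// collectPendingWriteInfo sums pending (not yet introduced) data counts across
+// write-queue sub-queues on liaison nodes, or across shard tables in standalone mode.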
+func (sr *schemaRepo) collectPendingWriteInfo(groupName string) (int64, error)
{
+ if sr == nil || sr.Repository == nil {
+ return 0, fmt.Errorf("schema repository is not initialized")
+ }
+ if sr.role == databasev1.Role_ROLE_LIAISON {
+ queue, queueErr := sr.loadQueue(groupName)
+ if queueErr != nil {
+ return 0, fmt.Errorf("failed to load queue: %w",
queueErr)
+ }
+ if queue == nil {
+ return 0, nil
+ }
+ var pendingWriteCount int64
+ for _, sq := range queue.SubQueues() {
+ if sq != nil {
+ pendingWriteCount += sq.getPendingDataCount()
+ }
+ }
+ return pendingWriteCount, nil
+ }
+ // Standalone mode
+ tsdb, tsdbErr := sr.loadTSDB(groupName)
+ if tsdbErr != nil {
+ return 0, fmt.Errorf("failed to load TSDB: %w", tsdbErr)
+ }
+ if tsdb == nil {
+ return 0, fmt.Errorf("TSDB is nil for group %s", groupName)
+ }
+ segments, segmentsErr := tsdb.SelectSegments(timestamp.TimeRange{
+ Start: time.Unix(0, 0),
+ End: time.Unix(0, timestamp.MaxNanoTime),
+ })
+ if segmentsErr != nil {
+ return 0, fmt.Errorf("failed to select segments: %w",
segmentsErr)
+ }
+ var pendingWriteCount int64
+ for _, segment := range segments {
+ tables, _ := segment.Tables()
+ for _, tst := range tables {
+ pendingWriteCount += tst.getPendingDataCount()
+ }
+ segment.DecRef()
+ }
+ return pendingWriteCount, nil
+}
+
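+// collectPendingSyncInfo counts file parts that are persisted locally but not yet
+// synced to data nodes, together with their compressed size.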
+func (sr *schemaRepo) collectPendingSyncInfo(groupName string) (partCount
int64, totalSizeBytes int64, err error) {
+ if sr == nil || sr.Repository == nil {
+ return 0, 0, fmt.Errorf("schema repository is not initialized")
+ }
+ // Only liaison nodes collect pending sync info
+ if sr.role != databasev1.Role_ROLE_LIAISON {
+ return 0, 0, nil
+ }
+ queue, queueErr := sr.loadQueue(groupName)
+ if queueErr != nil {
+ return 0, 0, fmt.Errorf("failed to load queue: %w", queueErr)
+ }
+ if queue == nil {
+ return 0, 0, nil
+ }
+ for _, sq := range queue.SubQueues() {
+ if sq != nil {
+ snapshot := sq.currentSnapshot()
+ if snapshot != nil {
+ for _, pw := range snapshot.parts {
+ if pw.mp == nil && pw.p != nil &&
pw.p.partMetadata.TotalCount > 0 {
+ partCount++
+ totalSizeBytes +=
int64(pw.p.partMetadata.CompressedSizeBytes)
+ }
+ }
+ snapshot.decRef()
+ }
+ }
+ }
+ return partCount, totalSizeBytes, nil
+}
+
func (sr *schemaRepo) loadQueue(groupName string) (*wqueue.Queue[*tsTable,
option], error) {
g, ok := sr.LoadGroup(groupName)
if !ok {
diff --git a/banyand/trace/metrics.go b/banyand/trace/metrics.go
index 433a488d..d3c520af 100644
--- a/banyand/trace/metrics.go
+++ b/banyand/trace/metrics.go
@@ -31,7 +31,6 @@ var (
)
type metrics struct {
- tbMetrics
indexMetrics *inverted.Metrics
totalWritten meter.Counter
totalBatch meter.Counter
@@ -65,6 +64,8 @@ type metrics struct {
totalMergedParts meter.Counter
totalMergeLatency meter.Counter
totalMerged meter.Counter
+
+ tbMetrics
}
func (tst *tsTable) incTotalWritten(delta int) {
@@ -249,6 +250,18 @@ func (tst *tsTable) incTotalMerged(delta int, typ string) {
tst.metrics.totalMerged.Inc(float64(delta), typ)
}
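+// addPendingDataCount adjusts the table's pending data counter and mirrors it to the pending_data_count gauge.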
+func (tst *tsTable) addPendingDataCount(delta int64) {
+ tst.pendingDataCount.Add(delta)
+ if tst.metrics == nil {
+ return
+ }
+ tst.metrics.tbMetrics.pendingDataCount.Add(float64(delta),
tst.p.ShardLabelValues()...)
+}
+
+func (tst *tsTable) getPendingDataCount() int64 {
+ return tst.pendingDataCount.Load()
+}
+
func (m *metrics) DeleteAll() {
if m == nil {
return
@@ -333,6 +346,7 @@ func (s *supplier) newMetrics(p common.Position)
storage.Metrics {
totalFileBlocks:
factory.NewGauge("total_file_blocks", common.ShardLabelNames()...),
totalFilePartBytes:
factory.NewGauge("total_file_part_bytes", common.ShardLabelNames()...),
totalFilePartUncompressedBytes:
factory.NewGauge("total_file_part_uncompressed_bytes",
common.ShardLabelNames()...),
+ pendingDataCount:
factory.NewGauge("pending_data_count", common.ShardLabelNames()...),
},
indexMetrics: inverted.NewMetrics(factory,
common.SegLabelNames()...),
}
@@ -378,6 +392,7 @@ func (qs *queueSupplier) newMetrics(p common.Position)
storage.Metrics {
totalFileBlocks:
factory.NewGauge("total_file_blocks", common.ShardLabelNames()...),
totalFilePartBytes:
factory.NewGauge("total_file_part_bytes", common.ShardLabelNames()...),
totalFilePartUncompressedBytes:
factory.NewGauge("total_file_part_uncompressed_bytes",
common.ShardLabelNames()...),
+ pendingDataCount:
factory.NewGauge("pending_data_count", common.ShardLabelNames()...),
},
indexMetrics: inverted.NewMetrics(factory,
common.SegLabelNames()...),
}
@@ -438,6 +453,7 @@ func (tst *tsTable) deleteMetrics() {
tst.metrics.tbMetrics.totalFileBlocks.Delete(tst.p.ShardLabelValues()...)
tst.metrics.tbMetrics.totalFilePartBytes.Delete(tst.p.ShardLabelValues()...)
tst.metrics.tbMetrics.totalFilePartUncompressedBytes.Delete(tst.p.ShardLabelValues()...)
+
tst.metrics.tbMetrics.pendingDataCount.Delete(tst.p.ShardLabelValues()...)
tst.metrics.indexMetrics.DeleteAll(tst.p.SegLabelValues()...)
}
@@ -453,4 +469,6 @@ type tbMetrics struct {
totalFileBlocks meter.Gauge
totalFilePartBytes meter.Gauge
totalFilePartUncompressedBytes meter.Gauge
+
+ pendingDataCount meter.Gauge
}
diff --git a/banyand/trace/svc_liaison.go b/banyand/trace/svc_liaison.go
index 2e7874e9..2ed967c1 100644
--- a/banyand/trace/svc_liaison.go
+++ b/banyand/trace/svc_liaison.go
@@ -86,6 +86,23 @@ func NewLiaison(metadata metadata.Repo, pipeline
queue.Server, omr observability
}, nil
}
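+// collectLiaisonInfoListener serves TopicTraceCollectLiaisonInfo requests by delegating to CollectLiaisonInfo.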
+type collectLiaisonInfoListener struct {
+ *bus.UnImplementedHealthyListener
+ l *liaison
+}
+
+func (cl *collectLiaisonInfoListener) Rev(ctx context.Context, message
bus.Message) bus.Message {
+ req, ok :=
message.Data().(*databasev1.GroupRegistryServiceInspectRequest)
+ if !ok {
+ return bus.NewMessage(message.ID(), common.NewError("invalid
data type for collect liaison info request"))
+ }
+ liaisonInfo, collectErr := cl.l.CollectLiaisonInfo(ctx, req.Group)
+ if collectErr != nil {
+ return bus.NewMessage(message.ID(), common.NewError("failed to
collect liaison info: %v", collectErr))
+ }
+ return bus.NewMessage(message.ID(), liaisonInfo)
+}
+
// LiaisonService returns a new liaison service (deprecated - use NewLiaison).
func LiaisonService(_ context.Context) (Service, error) {
return &liaison{}, nil
@@ -251,6 +268,15 @@ func (l *liaison) PreRun(ctx context.Context) error {
Str("dataPath", l.dataPath).
Msg("trace liaison service initialized")
+ if metaSvc, ok := l.metadata.(metadata.Service); ok {
+
metaSvc.RegisterLiaisonCollector(commonv1.Catalog_CATALOG_TRACE, l)
+ }
+
+ collectLiaisonInfoListener := &collectLiaisonInfoListener{l: l}
+ if subscribeErr :=
l.pipeline.Subscribe(data.TopicTraceCollectLiaisonInfo,
collectLiaisonInfoListener); subscribeErr != nil {
+ return fmt.Errorf("failed to subscribe to collect liaison info
topic: %w", subscribeErr)
+ }
+
return l.pipeline.Subscribe(data.TopicTraceWrite, l.writeListener)
}
@@ -286,6 +312,38 @@ func (l *liaison) GetRemovalSegmentsTimeRange(group
string) *timestamp.TimeRange
return l.schemaRepo.GetRemovalSegmentsTimeRange(group)
}
+func (l *liaison) CollectDataInfo(_ context.Context, _ string)
(*databasev1.DataInfo, error) {
+ return nil, errors.New("collect data info is not supported on liaison
node")
+}
+
+// CollectLiaisonInfo collects pending write, pending sync, and pending handoff statistics for the group on this liaison node.
+func (l *liaison) CollectLiaisonInfo(_ context.Context, group string)
(*databasev1.LiaisonInfo, error) {
+ info := &databasev1.LiaisonInfo{
+ PendingWriteDataCount: 0,
+ PendingSyncPartCount: 0,
+ PendingSyncDataSizeBytes: 0,
+ PendingHandoffPartCount: 0,
+ PendingHandoffDataSizeBytes: 0,
+ }
+ pendingWriteCount, writeErr :=
l.schemaRepo.collectPendingWriteInfo(group)
+ if writeErr != nil {
+ return nil, fmt.Errorf("failed to collect pending write info:
%w", writeErr)
+ }
+ info.PendingWriteDataCount = pendingWriteCount
+ pendingSyncPartCount, pendingSyncDataSizeBytes, syncErr :=
l.schemaRepo.collectPendingSyncInfo(group)
+ if syncErr != nil {
+ return nil, fmt.Errorf("failed to collect pending sync info:
%w", syncErr)
+ }
+ info.PendingSyncPartCount = pendingSyncPartCount
+ info.PendingSyncDataSizeBytes = pendingSyncDataSizeBytes
+ if l.handoffCtrl != nil {
+ partCount, totalSize := l.handoffCtrl.stats()
+ info.PendingHandoffPartCount = partCount
+ info.PendingHandoffDataSizeBytes = totalSize
+ }
+ return info, nil
+}
+
// SetMetadata sets the metadata repository.
func (l *liaison) SetMetadata(metadata metadata.Repo) {
l.metadata = metadata
diff --git a/banyand/trace/svc_standalone.go b/banyand/trace/svc_standalone.go
index ec893bbf..fceaf43d 100644
--- a/banyand/trace/svc_standalone.go
+++ b/banyand/trace/svc_standalone.go
@@ -150,7 +150,15 @@ func (s *standalone) PreRun(ctx context.Context) error {
if !strings.HasPrefix(filepath.VolumeName(s.dataPath),
filepath.VolumeName(path)) {
obsservice.UpdatePath(s.dataPath)
}
- s.schemaRepo = newSchemaRepo(s.dataPath, s, node.Labels)
+ s.schemaRepo = newSchemaRepo(s.dataPath, s, node.Labels, node.NodeID)
+ if metaSvc, ok := s.metadata.(metadata.Service); ok {
+ metaSvc.RegisterDataCollector(commonv1.Catalog_CATALOG_TRACE,
&s.schemaRepo)
+
metaSvc.RegisterLiaisonCollector(commonv1.Catalog_CATALOG_TRACE, s)
+ }
+ subErr := s.pipeline.Subscribe(data.TopicTraceCollectDataInfo,
&collectDataInfoListener{s: s})
+ if subErr != nil {
+ return fmt.Errorf("failed to subscribe to
TopicTraceCollectDataInfo: %w", subErr)
+ }
// Initialize snapshot directory
s.snapshotDir = filepath.Join(path, "snapshots")
@@ -505,6 +513,37 @@ func (s *standaloneDeleteTraceSegmentsListener) Rev(_
context.Context, message b
return bus.NewMessage(bus.MessageID(time.Now().UnixNano()), deleted)
}
+func (s *standalone) CollectDataInfo(ctx context.Context, group string)
(*databasev1.DataInfo, error) {
+ return s.schemaRepo.CollectDataInfo(ctx, group)
+}
+
+func (s *standalone) CollectLiaisonInfo(_ context.Context, group string)
(*databasev1.LiaisonInfo, error) {
+ info := &databasev1.LiaisonInfo{}
+ pendingWriteCount, writeErr :=
s.schemaRepo.collectPendingWriteInfo(group)
+ if writeErr != nil {
+ return nil, fmt.Errorf("failed to collect pending write info:
%w", writeErr)
+ }
+ info.PendingWriteDataCount = pendingWriteCount
+ return info, nil
+}
+
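+// collectDataInfoListener serves TopicTraceCollectDataInfo requests by delegating to the schema repository's CollectDataInfo.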
+type collectDataInfoListener struct {
+ *bus.UnImplementedHealthyListener
+ s *standalone
+}
+
+func (l *collectDataInfoListener) Rev(ctx context.Context, message
bus.Message) bus.Message {
+ req, ok :=
message.Data().(*databasev1.GroupRegistryServiceInspectRequest)
+ if !ok {
+ return bus.NewMessage(message.ID(), common.NewError("invalid
data type for collect data info request"))
+ }
+ dataInfo, collectErr := l.s.schemaRepo.CollectDataInfo(ctx, req.Group)
+ if collectErr != nil {
+ return bus.NewMessage(message.ID(), common.NewError("failed to
collect data info: %v", collectErr))
+ }
+ return bus.NewMessage(message.ID(), dataInfo)
+}
+
// NewService returns a new service.
func NewService(metadata metadata.Repo, pipeline queue.Server, omr
observability.MetricsRegistry, pm protector.Memory) (Service, error) {
return &standalone{
diff --git a/banyand/trace/trace.go b/banyand/trace/trace.go
index 4b16a4f0..53ff3959 100644
--- a/banyand/trace/trace.go
+++ b/banyand/trace/trace.go
@@ -66,6 +66,8 @@ type Service interface {
run.Config
run.Service
Query
+ CollectDataInfo(context.Context, string) (*databasev1.DataInfo, error)
+ CollectLiaisonInfo(context.Context, string) (*databasev1.LiaisonInfo,
error)
}
// Query allows retrieving traces.
diff --git a/banyand/trace/tstable.go b/banyand/trace/tstable.go
index 6d168568..cc47bae7 100644
--- a/banyand/trace/tstable.go
+++ b/banyand/trace/tstable.go
@@ -48,22 +48,23 @@ const (
)
type tsTable struct {
- pm protector.Memory
- fileSystem fs.FileSystem
- handoffCtrl *handoffController
- metrics *metrics
- snapshot *snapshot
- loopCloser *run.Closer
- getNodes func() []string
- l *logger.Logger
- sidxMap map[string]sidx.SIDX
- introductions chan *introduction
- p common.Position
- root string
- group string
- gc garbageCleaner
- option option
- curPartID uint64
+ pm protector.Memory
+ fileSystem fs.FileSystem
+ handoffCtrl *handoffController
+ metrics *metrics
+ snapshot *snapshot
+ loopCloser *run.Closer
+ getNodes func() []string
+ l *logger.Logger
+ sidxMap map[string]sidx.SIDX
+ introductions chan *introduction
+ p common.Position
+ root string
+ group string
+ gc garbageCleaner
+ option option
+ curPartID uint64
+ pendingDataCount atomic.Int64
sync.RWMutex
shardID common.ShardID
}
@@ -470,9 +471,11 @@ func (tst *tsTable) mustAddMemPart(mp *memPart,
sidxReqsMap map[string]*sidx.Mem
ind.sidxReqsMap = sidxReqsMap
startTime := time.Now()
totalCount := mp.partMetadata.TotalCount
+ tst.addPendingDataCount(int64(totalCount))
select {
case tst.introductions <- ind:
case <-tst.loopCloser.CloseNotify():
+ tst.addPendingDataCount(-int64(totalCount))
ind.memPart.decRef()
return
}
diff --git a/pkg/cmdsetup/liaison.go b/pkg/cmdsetup/liaison.go
index a3c19c2c..082de98e 100644
--- a/pkg/cmdsetup/liaison.go
+++ b/pkg/cmdsetup/liaison.go
@@ -55,6 +55,9 @@ func newLiaisonCmd(runners ...run.Unit) *cobra.Command {
}
tire1Client := pub.New(metaSvc, databasev1.Role_ROLE_LIAISON)
tire2Client := pub.New(metaSvc, databasev1.Role_ROLE_DATA)
+ metaSvc.SetDataBroadcaster(tire2Client)
+ metaSvc.SetLiaisonBroadcaster(tire1Client)
+
localPipeline := queue.Local()
measureLiaisonNodeSel :=
node.NewRoundRobinSelector(data.TopicMeasureWrite.String(), metaSvc)
diff --git a/pkg/index/index.go b/pkg/index/index.go
index 6e77f821..2c6372e9 100644
--- a/pkg/index/index.go
+++ b/pkg/index/index.go
@@ -828,6 +828,7 @@ type Store interface {
CollectMetrics(...string)
Reset()
TakeFileSnapshot(dst string) error
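+	// Stats returns the number of indexed documents and the index's on-disk size in bytes.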
+ Stats() (dataCount int64, dataSizeBytes int64)
}
// Series represents a series in an index.
diff --git a/pkg/index/inverted/inverted.go b/pkg/index/inverted/inverted.go
index 1759d6e2..38779a8c 100644
--- a/pkg/index/inverted/inverted.go
+++ b/pkg/index/inverted/inverted.go
@@ -512,6 +512,21 @@ func (s *store) TakeFileSnapshot(dst string) error {
return reader.Backup(dst, nil)
}
+// Stats returns the index statistics including document count and disk size.
+func (s *store) Stats() (dataCount int64, dataSizeBytes int64) {
+ reader, err := s.writer.Reader()
+ if err != nil {
+ return 0, 0
+ }
+ defer reader.Close()
+ count, countErr := reader.Count()
+ if countErr != nil {
+ return 0, 0
+ }
+ status := s.writer.Status()
+ return int64(count), int64(status.CurOnDiskBytes)
+}
+
type blugeMatchIterator struct {
delegated search.DocumentMatchIterator
err error
diff --git a/test/integration/distributed/inspect/inspect_suite_test.go
b/test/integration/distributed/inspect/inspect_suite_test.go
new file mode 100644
index 00000000..bee00eaa
--- /dev/null
+++ b/test/integration/distributed/inspect/inspect_suite_test.go
@@ -0,0 +1,613 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package integration_inspect_test provides integration tests for the inspect
functionality in distributed mode.
+package integration_inspect_test
+
+import (
+ "context"
+ "fmt"
+ "io"
+ "strconv"
+ "strings"
+ "testing"
+ "time"
+
+ . "github.com/onsi/ginkgo/v2"
+ . "github.com/onsi/gomega"
+ "github.com/onsi/gomega/gleak"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/credentials/insecure"
+ "google.golang.org/protobuf/types/known/timestamppb"
+
+ commonv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+ databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+ measurev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
+ modelv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+ streamv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
+ tracev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/trace/v1"
+ "github.com/apache/skywalking-banyandb/banyand/metadata/embeddedetcd"
+ "github.com/apache/skywalking-banyandb/pkg/grpchelper"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/pool"
+ "github.com/apache/skywalking-banyandb/pkg/test"
+ "github.com/apache/skywalking-banyandb/pkg/test/flags"
+ "github.com/apache/skywalking-banyandb/pkg/test/gmatcher"
+ "github.com/apache/skywalking-banyandb/pkg/test/setup"
+)
+
+func TestInspect(t *testing.T) {
+ RegisterFailHandler(Fail)
+ RunSpecs(t, "Distributed Inspect Suite")
+}
+
+var (
+ deferFunc func()
+ goods []gleak.Goroutine
+ connection *grpc.ClientConn
+ groupClient databasev1.GroupRegistryServiceClient
+ measureRegClient databasev1.MeasureRegistryServiceClient
+ streamRegClient databasev1.StreamRegistryServiceClient
+ traceRegClient databasev1.TraceRegistryServiceClient
+ measureWriteClient measurev1.MeasureServiceClient
+ streamWriteClient streamv1.StreamServiceClient
+ traceWriteClient tracev1.TraceServiceClient
+ etcdEndpoint string
+ liaisonGrpcAddr string
+)
+
+var _ = SynchronizedBeforeSuite(func() []byte {
+ Expect(logger.Init(logger.Logging{
+ Env: "dev",
+ Level: flags.LogLevel,
+ })).To(Succeed())
+ pool.EnableStackTracking(true)
+ goods = gleak.Goroutines()
+
+ By("Starting etcd server")
+ ports, err := test.AllocateFreePorts(2)
+ Expect(err).NotTo(HaveOccurred())
+ dir, spaceDef, err := test.NewSpace()
+ Expect(err).NotTo(HaveOccurred())
+ ep := fmt.Sprintf("http://127.0.0.1:%d", ports[0])
+ server, err := embeddedetcd.NewServer(
+ embeddedetcd.ConfigureListener([]string{ep},
[]string{fmt.Sprintf("http://127.0.0.1:%d", ports[1])}),
+ embeddedetcd.RootDir(dir),
+ embeddedetcd.AutoCompactionMode("periodic"),
+ embeddedetcd.AutoCompactionRetention("1h"),
+ embeddedetcd.QuotaBackendBytes(2*1024*1024*1024),
+ )
+ Expect(err).ShouldNot(HaveOccurred())
+ <-server.ReadyNotify()
+
+ By("Starting data node 0")
+ closeDataNode0 := setup.DataNode(ep)
+ By("Starting data node 1")
+ closeDataNode1 := setup.DataNode(ep)
+ By("Starting liaison node")
+ liaisonAddr, closerLiaisonNode := setup.LiaisonNode(ep)
+
+ deferFunc = func() {
+ closerLiaisonNode()
+ closeDataNode0()
+ closeDataNode1()
+ _ = server.Close()
+ <-server.StopNotify()
+ spaceDef()
+ }
+
+ return []byte(fmt.Sprintf("%s,%s", liaisonAddr, ep))
+}, func(address []byte) {
+ parts := strings.Split(string(address), ",")
+ if len(parts) != 2 {
+ panic(fmt.Sprintf("expected 2 parts, got %d", len(parts)))
+ }
+ liaisonGrpcAddr = parts[0]
+ etcdEndpoint = parts[1]
+
+ var err error
+ connection, err = grpchelper.Conn(liaisonGrpcAddr, 10*time.Second,
+ grpc.WithTransportCredentials(insecure.NewCredentials()))
+ Expect(err).NotTo(HaveOccurred())
+
+ groupClient = databasev1.NewGroupRegistryServiceClient(connection)
+ measureRegClient =
databasev1.NewMeasureRegistryServiceClient(connection)
+ streamRegClient = databasev1.NewStreamRegistryServiceClient(connection)
+ traceRegClient = databasev1.NewTraceRegistryServiceClient(connection)
+ measureWriteClient = measurev1.NewMeasureServiceClient(connection)
+ streamWriteClient = streamv1.NewStreamServiceClient(connection)
+ traceWriteClient = tracev1.NewTraceServiceClient(connection)
+})
+
+var _ = SynchronizedAfterSuite(func() {
+ if connection != nil {
+ Expect(connection.Close()).To(Succeed())
+ }
+}, func() {})
+
+var _ = ReportAfterSuite("Distributed Inspect Suite", func(report Report) {
+ if report.SuiteSucceeded {
+ if deferFunc != nil {
+ deferFunc()
+ }
+ Eventually(gleak.Goroutines,
flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods))
+ Eventually(pool.AllRefsCount,
flags.EventuallyTimeout).Should(gmatcher.HaveZeroRef())
+ }
+})
+
+func writeMeasureData(ctx context.Context, groupName, measureName string,
dataCount int) {
+ writeClient, writeErr := measureWriteClient.Write(ctx)
+ Expect(writeErr).ShouldNot(HaveOccurred())
+
+ metadata := &commonv1.Metadata{
+ Name: measureName,
+ Group: groupName,
+ }
+ baseTime := time.Now().Truncate(time.Millisecond)
+ for idx := 0; idx < dataCount; idx++ {
+ req := &measurev1.WriteRequest{
+ Metadata: metadata,
+ DataPoint: &measurev1.DataPointValue{
+ Timestamp:
timestamppb.New(baseTime.Add(time.Duration(idx) * time.Second)),
+ TagFamilies: []*modelv1.TagFamilyForWrite{{
+ Tags: []*modelv1.TagValue{{
+ Value:
&modelv1.TagValue_Str{Str: &modelv1.Str{Value: "id_" + strconv.Itoa(idx)}},
+ }},
+ }},
+ Fields: []*modelv1.FieldValue{{
+ Value: &modelv1.FieldValue_Int{Int:
&modelv1.Int{Value: int64(idx * 100)}},
+ }},
+ },
+ MessageId: uint64(time.Now().UnixNano()),
+ }
+ sendErr := writeClient.Send(req)
+ Expect(sendErr).ShouldNot(HaveOccurred())
+ }
+ Expect(writeClient.CloseSend()).To(Succeed())
+ Eventually(func() error {
+ _, recvErr := writeClient.Recv()
+ return recvErr
+ }, flags.EventuallyTimeout).Should(Equal(io.EOF))
+}
+
+func writeStreamData(ctx context.Context, groupName, streamName string,
dataCount int) {
+ writeClient, writeErr := streamWriteClient.Write(ctx)
+ Expect(writeErr).ShouldNot(HaveOccurred())
+
+ metadata := &commonv1.Metadata{
+ Name: streamName,
+ Group: groupName,
+ }
+ baseTime := time.Now().Truncate(time.Millisecond)
+ for idx := 0; idx < dataCount; idx++ {
+ req := &streamv1.WriteRequest{
+ Metadata: metadata,
+ Element: &streamv1.ElementValue{
+ ElementId: strconv.Itoa(idx),
+ Timestamp:
timestamppb.New(baseTime.Add(time.Duration(idx) * time.Second)),
+ TagFamilies: []*modelv1.TagFamilyForWrite{{
+ Tags: []*modelv1.TagValue{{
+ Value:
&modelv1.TagValue_Str{Str: &modelv1.Str{Value: "svc_" + strconv.Itoa(idx)}},
+ }},
+ }},
+ },
+ MessageId: uint64(time.Now().UnixNano()),
+ }
+ sendErr := writeClient.Send(req)
+ Expect(sendErr).ShouldNot(HaveOccurred())
+ }
+ Expect(writeClient.CloseSend()).To(Succeed())
+ Eventually(func() error {
+ _, recvErr := writeClient.Recv()
+ return recvErr
+ }, flags.EventuallyTimeout).Should(Equal(io.EOF))
+}
+
+func writeTraceData(ctx context.Context, groupName, traceName string,
dataCount int) {
+ writeClient, writeErr := traceWriteClient.Write(ctx)
+ Expect(writeErr).ShouldNot(HaveOccurred())
+
+ metadata := &commonv1.Metadata{
+ Name: traceName,
+ Group: groupName,
+ }
+ baseTime := time.Now().Truncate(time.Millisecond)
+ for idx := 0; idx < dataCount; idx++ {
+ traceID := fmt.Sprintf("trace_%d", idx)
+ spanID := fmt.Sprintf("span_%d", idx)
+ req := &tracev1.WriteRequest{
+ Metadata: metadata,
+ Tags: []*modelv1.TagValue{
+ {Value: &modelv1.TagValue_Str{Str:
&modelv1.Str{Value: traceID}}},
+ {Value: &modelv1.TagValue_Str{Str:
&modelv1.Str{Value: spanID}}},
+ {Value: &modelv1.TagValue_Timestamp{Timestamp:
timestamppb.New(baseTime.Add(time.Duration(idx) * time.Second))}},
+ {Value: &modelv1.TagValue_Str{Str:
&modelv1.Str{Value: "test_service"}}},
+ {Value: &modelv1.TagValue_Int{Int:
&modelv1.Int{Value: int64(idx * 10)}}},
+ },
+ Span: []byte(fmt.Sprintf("span_data_%d", idx)),
+ Version: uint64(idx + 1),
+ }
+ sendErr := writeClient.Send(req)
+ Expect(sendErr).ShouldNot(HaveOccurred())
+ }
+ Expect(writeClient.CloseSend()).To(Succeed())
+ Eventually(func() error {
+ _, recvErr := writeClient.Recv()
+ return recvErr
+ }, flags.EventuallyTimeout).Should(Equal(io.EOF))
+}
+
+var _ = Describe("Inspect in distributed mode", func() {
+ var groupName string
+ var measureName string
+ var ctx context.Context
+ const dataCount = 100
+
+ BeforeEach(func() {
+ ctx = context.TODO()
+ groupName = fmt.Sprintf("inspect-test-%d",
time.Now().UnixNano())
+ measureName = "test_measure"
+
+ By("Creating measure group")
+ _, createErr := groupClient.Create(ctx,
&databasev1.GroupRegistryServiceCreateRequest{
+ Group: &commonv1.Group{
+ Metadata: &commonv1.Metadata{
+ Name: groupName,
+ },
+ Catalog: commonv1.Catalog_CATALOG_MEASURE,
+ ResourceOpts: &commonv1.ResourceOpts{
+ ShardNum: 2,
+ SegmentInterval: &commonv1.IntervalRule{
+ Unit:
commonv1.IntervalRule_UNIT_DAY,
+ Num: 1,
+ },
+ Ttl: &commonv1.IntervalRule{
+ Unit:
commonv1.IntervalRule_UNIT_DAY,
+ Num: 7,
+ },
+ },
+ },
+ })
+ Expect(createErr).ShouldNot(HaveOccurred())
+
+ By("Creating measure schema")
+ _, measureErr := measureRegClient.Create(ctx,
&databasev1.MeasureRegistryServiceCreateRequest{
+ Measure: &databasev1.Measure{
+ Metadata: &commonv1.Metadata{
+ Name: measureName,
+ Group: groupName,
+ },
+ Entity: &databasev1.Entity{
+ TagNames: []string{"id"},
+ },
+ TagFamilies: []*databasev1.TagFamilySpec{{
+ Name: "default",
+ Tags: []*databasev1.TagSpec{{
+ Name: "id",
+ Type:
databasev1.TagType_TAG_TYPE_STRING,
+ }},
+ }},
+ Fields: []*databasev1.FieldSpec{{
+ Name: "value",
+ FieldType:
databasev1.FieldType_FIELD_TYPE_INT,
+ EncodingMethod:
databasev1.EncodingMethod_ENCODING_METHOD_GORILLA,
+ CompressionMethod:
databasev1.CompressionMethod_COMPRESSION_METHOD_ZSTD,
+ }},
+ },
+ })
+ Expect(measureErr).ShouldNot(HaveOccurred())
+ time.Sleep(2 * time.Second)
+
+ By("Writing measure data")
+ writeMeasureData(ctx, groupName, measureName, dataCount)
+ time.Sleep(5 * time.Second)
+ })
+
+ AfterEach(func() {
+ _, _ = groupClient.Delete(ctx,
&databasev1.GroupRegistryServiceDeleteRequest{Group: groupName})
+ })
+
+ It("should return schema info", func() {
+ By("Inspecting group")
+ resp, err := groupClient.Inspect(ctx,
&databasev1.GroupRegistryServiceInspectRequest{Group: groupName})
+ Expect(err).ShouldNot(HaveOccurred())
+ Expect(resp).NotTo(BeNil())
+
+ By("Verifying schema info contains the measure")
+ Expect(resp.SchemaInfo).NotTo(BeNil())
+
Expect(len(resp.SchemaInfo.Measures)).Should(BeNumerically(">=", 1))
+ Expect(resp.SchemaInfo.Measures).To(ContainElement(measureName))
+ GinkgoWriter.Printf("Schema info: measures=%d, streams=%d,
traces=%d, indexRules=%d\n",
+ len(resp.SchemaInfo.Measures),
+ len(resp.SchemaInfo.Streams),
+ len(resp.SchemaInfo.Traces),
+ len(resp.SchemaInfo.IndexRules))
+ })
+
+ It("should return data info", func() {
+ By("Inspecting group")
+ resp, err := groupClient.Inspect(ctx,
&databasev1.GroupRegistryServiceInspectRequest{Group: groupName})
+ Expect(err).ShouldNot(HaveOccurred())
+ Expect(resp).NotTo(BeNil())
+
+ By("Verifying data collected from multiple nodes")
+ Expect(len(resp.DataInfo)).Should(BeNumerically(">=", 1),
"should collect from at least one data node")
+
+ var totalDataSize int64
+ for idx, dataInfo := range resp.DataInfo {
+ Expect(dataInfo.Node).NotTo(BeNil(), "data node %d
should have node info", idx)
+
Expect(dataInfo.DataSizeBytes).Should(BeNumerically(">", 0), "data node %d (%s)
should have DataSizeBytes > 0", idx, dataInfo.Node.Metadata.Name)
+ totalDataSize += dataInfo.DataSizeBytes
+ GinkgoWriter.Printf("Data node %d: %s, segments: %d,
size: %d bytes\n",
+ idx, dataInfo.Node.Metadata.Name,
len(dataInfo.SegmentInfo), dataInfo.DataSizeBytes)
+ }
+ Expect(totalDataSize).Should(BeNumerically(">", 0), "total data
size should be > 0")
+ GinkgoWriter.Printf("Total data size across all nodes: %d
bytes\n", totalDataSize)
+ })
+
+ It("should return liaison info", func() {
+ By("Inspecting group")
+ resp, err := groupClient.Inspect(ctx,
&databasev1.GroupRegistryServiceInspectRequest{Group: groupName})
+ Expect(err).ShouldNot(HaveOccurred())
+ Expect(resp).NotTo(BeNil())
+
+ By("Verifying liaison info collected")
+ Expect(len(resp.LiaisonInfo)).Should(BeNumerically(">=", 1),
"should collect from at least one liaison node")
+
+ for idx, liaisonInfo := range resp.LiaisonInfo {
+ GinkgoWriter.Printf("Liaison node %d: PendingWrite=%d,
PendingSync=%d parts (%d bytes)\n",
+ idx,
+ liaisonInfo.PendingWriteDataCount,
+ liaisonInfo.PendingSyncPartCount,
+ liaisonInfo.PendingSyncDataSizeBytes)
+ }
+ })
+})
+
+var _ = Describe("Inspect stream in distributed mode", func() {
+ var groupName string
+ var streamName string
+ var ctx context.Context
+ const dataCount = 100
+
+ BeforeEach(func() {
+ ctx = context.TODO()
+ groupName = fmt.Sprintf("inspect-stream-test-%d",
time.Now().UnixNano())
+ streamName = "test_stream"
+
+ By("Creating stream group")
+ _, createErr := groupClient.Create(ctx,
&databasev1.GroupRegistryServiceCreateRequest{
+ Group: &commonv1.Group{
+ Metadata: &commonv1.Metadata{
+ Name: groupName,
+ },
+ Catalog: commonv1.Catalog_CATALOG_STREAM,
+ ResourceOpts: &commonv1.ResourceOpts{
+ ShardNum: 2,
+ SegmentInterval: &commonv1.IntervalRule{
+ Unit:
commonv1.IntervalRule_UNIT_DAY,
+ Num: 1,
+ },
+ Ttl: &commonv1.IntervalRule{
+ Unit:
commonv1.IntervalRule_UNIT_DAY,
+ Num: 7,
+ },
+ },
+ },
+ })
+ Expect(createErr).ShouldNot(HaveOccurred())
+
+ By("Creating stream schema")
+ _, streamErr := streamRegClient.Create(ctx,
&databasev1.StreamRegistryServiceCreateRequest{
+ Stream: &databasev1.Stream{
+ Metadata: &commonv1.Metadata{
+ Name: streamName,
+ Group: groupName,
+ },
+ Entity: &databasev1.Entity{
+ TagNames: []string{"svc"},
+ },
+ TagFamilies: []*databasev1.TagFamilySpec{{
+ Name: "default",
+ Tags: []*databasev1.TagSpec{{
+ Name: "svc",
+ Type:
databasev1.TagType_TAG_TYPE_STRING,
+ }},
+ }},
+ },
+ })
+ Expect(streamErr).ShouldNot(HaveOccurred())
+ time.Sleep(2 * time.Second)
+
+ By("Writing stream data")
+ writeStreamData(ctx, groupName, streamName, dataCount)
+ time.Sleep(5 * time.Second)
+ })
+
+ AfterEach(func() {
+ _, _ = groupClient.Delete(ctx,
&databasev1.GroupRegistryServiceDeleteRequest{Group: groupName})
+ })
+
+ It("should return schema info", func() {
+ By("Inspecting stream group")
+ resp, err := groupClient.Inspect(ctx,
&databasev1.GroupRegistryServiceInspectRequest{Group: groupName})
+ Expect(err).ShouldNot(HaveOccurred())
+ Expect(resp).NotTo(BeNil())
+
+ By("Verifying schema info contains the stream")
+ Expect(resp.SchemaInfo).NotTo(BeNil())
+ Expect(len(resp.SchemaInfo.Streams)).Should(BeNumerically(">=",
1))
+ Expect(resp.SchemaInfo.Streams).To(ContainElement(streamName))
+ GinkgoWriter.Printf("Stream schema info: streams=%d,
indexRules=%d\n",
+ len(resp.SchemaInfo.Streams),
+ len(resp.SchemaInfo.IndexRules))
+ })
+
+ It("should return data info", func() {
+ By("Inspecting stream group")
+ resp, err := groupClient.Inspect(ctx,
&databasev1.GroupRegistryServiceInspectRequest{Group: groupName})
+ Expect(err).ShouldNot(HaveOccurred())
+ Expect(resp).NotTo(BeNil())
+
+ By("Verifying stream data collected from multiple nodes")
+ Expect(len(resp.DataInfo)).Should(BeNumerically(">=", 1),
"should collect from at least one stream data node")
+
+ var totalDataSize int64
+ for idx, dataInfo := range resp.DataInfo {
+ Expect(dataInfo.Node).NotTo(BeNil(), "stream data node
%d should have node info", idx)
+
Expect(dataInfo.DataSizeBytes).Should(BeNumerically(">", 0), "stream data node
%d (%s) should have DataSizeBytes > 0", idx, dataInfo.Node.Metadata.Name)
+ totalDataSize += dataInfo.DataSizeBytes
+ GinkgoWriter.Printf("Stream data node %d: %s, segments:
%d, size: %d bytes\n",
+ idx, dataInfo.Node.Metadata.Name,
len(dataInfo.SegmentInfo), dataInfo.DataSizeBytes)
+ }
+ Expect(totalDataSize).Should(BeNumerically(">", 0), "total
stream data size should be > 0")
+ GinkgoWriter.Printf("Total stream data size: %d bytes\n",
totalDataSize)
+ })
+
+ It("should return liaison info", func() {
+ By("Inspecting stream group")
+ resp, err := groupClient.Inspect(ctx,
&databasev1.GroupRegistryServiceInspectRequest{Group: groupName})
+ Expect(err).ShouldNot(HaveOccurred())
+ Expect(resp).NotTo(BeNil())
+
+ By("Verifying stream liaison info collected")
+ Expect(len(resp.LiaisonInfo)).Should(BeNumerically(">=", 1),
"should collect from at least one liaison node")
+
+ for idx, liaisonInfo := range resp.LiaisonInfo {
+ GinkgoWriter.Printf("Stream liaison node %d:
PendingWrite=%d, PendingSync=%d parts\n",
+ idx, liaisonInfo.PendingWriteDataCount,
liaisonInfo.PendingSyncPartCount)
+ }
+ })
+})
+
+var _ = Describe("Inspect trace in distributed mode", func() {
+ var groupName string
+ var traceName string
+ var ctx context.Context
+ const dataCount = 100
+
+ BeforeEach(func() {
+ ctx = context.TODO()
+ groupName = fmt.Sprintf("inspect-trace-test-%d",
time.Now().UnixNano())
+ traceName = "test_trace"
+
+ By("Creating trace group")
+ _, createErr := groupClient.Create(ctx,
&databasev1.GroupRegistryServiceCreateRequest{
+ Group: &commonv1.Group{
+ Metadata: &commonv1.Metadata{
+ Name: groupName,
+ },
+ Catalog: commonv1.Catalog_CATALOG_TRACE,
+ ResourceOpts: &commonv1.ResourceOpts{
+ ShardNum: 2,
+ SegmentInterval: &commonv1.IntervalRule{
+ Unit:
commonv1.IntervalRule_UNIT_DAY,
+ Num: 1,
+ },
+ Ttl: &commonv1.IntervalRule{
+ Unit:
commonv1.IntervalRule_UNIT_DAY,
+ Num: 7,
+ },
+ },
+ },
+ })
+ Expect(createErr).ShouldNot(HaveOccurred())
+
+ By("Creating trace schema")
+ _, traceErr := traceRegClient.Create(ctx,
&databasev1.TraceRegistryServiceCreateRequest{
+ Trace: &databasev1.Trace{
+ Metadata: &commonv1.Metadata{
+ Name: traceName,
+ Group: groupName,
+ },
+ TraceIdTagName: "trace_id",
+ SpanIdTagName: "span_id",
+ TimestampTagName: "timestamp",
+ Tags: []*databasev1.TraceTagSpec{
+ {Name: "trace_id", Type:
databasev1.TagType_TAG_TYPE_STRING},
+ {Name: "span_id", Type:
databasev1.TagType_TAG_TYPE_STRING},
+ {Name: "timestamp", Type:
databasev1.TagType_TAG_TYPE_TIMESTAMP},
+ {Name: "service_id", Type:
databasev1.TagType_TAG_TYPE_STRING},
+ {Name: "duration", Type:
databasev1.TagType_TAG_TYPE_INT},
+ },
+ },
+ })
+ Expect(traceErr).ShouldNot(HaveOccurred())
+ time.Sleep(2 * time.Second)
+
+ By("Writing trace data")
+ writeTraceData(ctx, groupName, traceName, dataCount)
+ time.Sleep(5 * time.Second)
+ })
+
+ AfterEach(func() {
+ _, _ = groupClient.Delete(ctx,
&databasev1.GroupRegistryServiceDeleteRequest{Group: groupName})
+ })
+
+ It("should return schema info", func() {
+ By("Inspecting trace group")
+ resp, err := groupClient.Inspect(ctx,
&databasev1.GroupRegistryServiceInspectRequest{Group: groupName})
+ Expect(err).ShouldNot(HaveOccurred())
+ Expect(resp).NotTo(BeNil())
+
+ By("Verifying schema info contains the trace")
+ Expect(resp.SchemaInfo).NotTo(BeNil())
+ Expect(len(resp.SchemaInfo.Traces)).Should(BeNumerically(">=",
1))
+ Expect(resp.SchemaInfo.Traces).To(ContainElement(traceName))
+ GinkgoWriter.Printf("Trace schema info: traces=%d,
indexRules=%d\n",
+ len(resp.SchemaInfo.Traces),
+ len(resp.SchemaInfo.IndexRules))
+ })
+
+ It("should return data info", func() {
+ By("Inspecting trace group")
+ resp, err := groupClient.Inspect(ctx,
&databasev1.GroupRegistryServiceInspectRequest{Group: groupName})
+ Expect(err).ShouldNot(HaveOccurred())
+ Expect(resp).NotTo(BeNil())
+
+ By("Verifying trace data collected from multiple nodes")
+ Expect(len(resp.DataInfo)).Should(BeNumerically(">=", 1),
"should collect from at least one trace data node")
+
+ var totalDataSize int64
+ for idx, dataInfo := range resp.DataInfo {
+ Expect(dataInfo.Node).NotTo(BeNil(), "trace data node
%d should have node info", idx)
+
Expect(dataInfo.DataSizeBytes).Should(BeNumerically(">", 0), "trace data node
%d (%s) should have DataSizeBytes > 0", idx, dataInfo.Node.Metadata.Name)
+ totalDataSize += dataInfo.DataSizeBytes
+ GinkgoWriter.Printf("Trace data node %d: %s, segments:
%d, size: %d bytes\n",
+ idx, dataInfo.Node.Metadata.Name,
len(dataInfo.SegmentInfo), dataInfo.DataSizeBytes)
+ }
+ Expect(totalDataSize).Should(BeNumerically(">", 0), "total
trace data size should be > 0")
+ GinkgoWriter.Printf("Total trace data size: %d bytes\n",
totalDataSize)
+ })
+
+ It("should return liaison info", func() {
+ By("Inspecting trace group")
+ resp, err := groupClient.Inspect(ctx,
&databasev1.GroupRegistryServiceInspectRequest{Group: groupName})
+ Expect(err).ShouldNot(HaveOccurred())
+ Expect(resp).NotTo(BeNil())
+
+ By("Verifying trace liaison info collected")
+ Expect(len(resp.LiaisonInfo)).Should(BeNumerically(">=", 1),
"should collect from at least one liaison node")
+
+ for idx, liaisonInfo := range resp.LiaisonInfo {
+ GinkgoWriter.Printf("Trace liaison node %d:
PendingWrite=%d, PendingSync=%d parts\n",
+ idx, liaisonInfo.PendingWriteDataCount,
liaisonInfo.PendingSyncPartCount)
+ }
+ })
+})
diff --git a/test/integration/standalone/inspect/inspect_suite_test.go
b/test/integration/standalone/inspect/inspect_suite_test.go
new file mode 100644
index 00000000..ef624ab5
--- /dev/null
+++ b/test/integration/standalone/inspect/inspect_suite_test.go
@@ -0,0 +1,558 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package integration_inspect_test provides integration tests for the inspect
functionality in standalone mode.
+package integration_inspect_test
+
+import (
+ "context"
+ "fmt"
+ "io"
+ "strconv"
+ "testing"
+ "time"
+
+ . "github.com/onsi/ginkgo/v2"
+ . "github.com/onsi/gomega"
+ "github.com/onsi/gomega/gleak"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/credentials/insecure"
+ "google.golang.org/protobuf/types/known/timestamppb"
+
+	commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+	databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+	measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
+	modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+	streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
+	tracev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/trace/v1"
+	"github.com/apache/skywalking-banyandb/pkg/grpchelper"
+	"github.com/apache/skywalking-banyandb/pkg/logger"
+	"github.com/apache/skywalking-banyandb/pkg/pool"
+	"github.com/apache/skywalking-banyandb/pkg/test/flags"
+	"github.com/apache/skywalking-banyandb/pkg/test/gmatcher"
+	"github.com/apache/skywalking-banyandb/pkg/test/setup"
+	integration_standalone "github.com/apache/skywalking-banyandb/test/integration/standalone"
+)
+
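+// TestInspect hooks Ginkgo into go test and runs the standalone inspect suite.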
+func TestInspect(t *testing.T) {
+ RegisterFailHandler(Fail)
+ RunSpecs(t, "Standalone Inspect Suite",
Label(integration_standalone.Labels...))
+}
+
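+// Suite-wide state: the server teardown hook, the pre-suite goroutine snapshot, and gRPC clients shared by all specs.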
+var (
+ deferFunc func()
+ goods []gleak.Goroutine
+ connection *grpc.ClientConn
+ groupClient databasev1.GroupRegistryServiceClient
+ measureRegClient databasev1.MeasureRegistryServiceClient
+ streamRegClient databasev1.StreamRegistryServiceClient
+ traceRegClient databasev1.TraceRegistryServiceClient
+ measureWriteClient measurev1.MeasureServiceClient
+ streamWriteClient streamv1.StreamServiceClient
+ traceWriteClient tracev1.TraceServiceClient
+)
+
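+// Start one standalone server in the primary process, then have every parallel process dial it and build its clients.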
+var _ = SynchronizedBeforeSuite(func() []byte {
+ Expect(logger.Init(logger.Logging{
+ Env: "dev",
+ Level: flags.LogLevel,
+ })).To(Succeed())
+ pool.EnableStackTracking(true)
+ goods = gleak.Goroutines()
+ By("Starting standalone server")
+ addr, _, closeFn := setup.EmptyStandalone()
+ deferFunc = closeFn
+ return []byte(addr)
+}, func(address []byte) {
+ var err error
+ connection, err = grpchelper.Conn(string(address), 10*time.Second,
+ grpc.WithTransportCredentials(insecure.NewCredentials()))
+ Expect(err).NotTo(HaveOccurred())
+ groupClient = databasev1.NewGroupRegistryServiceClient(connection)
+ measureRegClient =
databasev1.NewMeasureRegistryServiceClient(connection)
+ streamRegClient = databasev1.NewStreamRegistryServiceClient(connection)
+ traceRegClient = databasev1.NewTraceRegistryServiceClient(connection)
+ measureWriteClient = measurev1.NewMeasureServiceClient(connection)
+ streamWriteClient = streamv1.NewStreamServiceClient(connection)
+ traceWriteClient = tracev1.NewTraceServiceClient(connection)
+})
+
+var _ = SynchronizedAfterSuite(func() {
+ if connection != nil {
+ Expect(connection.Close()).To(Succeed())
+ }
+}, func() {
+ if deferFunc != nil {
+ deferFunc()
+ }
+})
+
+var _ = ReportAfterSuite("Standalone Inspect Suite", func(report Report) {
+ if report.SuiteSucceeded {
+ Eventually(gleak.Goroutines,
flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods))
+ Eventually(pool.AllRefsCount,
flags.EventuallyTimeout).Should(gmatcher.HaveZeroRef())
+ }
+})
+
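+// writeMeasureData streams dataCount data points to the given measure, then closes the stream and waits for the server to finish with io.EOF.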
+func writeMeasureData(ctx context.Context, groupName, measureName string,
dataCount int) {
+ writeClient, writeErr := measureWriteClient.Write(ctx)
+ Expect(writeErr).ShouldNot(HaveOccurred())
+
+ metadata := &commonv1.Metadata{
+ Name: measureName,
+ Group: groupName,
+ }
+ baseTime := time.Now().Truncate(time.Millisecond)
+ for idx := 0; idx < dataCount; idx++ {
+ req := &measurev1.WriteRequest{
+ Metadata: metadata,
+ DataPoint: &measurev1.DataPointValue{
+ Timestamp:
timestamppb.New(baseTime.Add(time.Duration(idx) * time.Second)),
+ TagFamilies: []*modelv1.TagFamilyForWrite{{
+ Tags: []*modelv1.TagValue{{
+ Value:
&modelv1.TagValue_Str{Str: &modelv1.Str{Value: "id_" + strconv.Itoa(idx)}},
+ }},
+ }},
+ Fields: []*modelv1.FieldValue{{
+ Value: &modelv1.FieldValue_Int{Int:
&modelv1.Int{Value: int64(idx * 100)}},
+ }},
+ },
+ MessageId: uint64(time.Now().UnixNano()),
+ }
+ sendErr := writeClient.Send(req)
+ Expect(sendErr).ShouldNot(HaveOccurred())
+ }
+ Expect(writeClient.CloseSend()).To(Succeed())
+ Eventually(func() error {
+ _, recvErr := writeClient.Recv()
+ return recvErr
+ }, flags.EventuallyTimeout).Should(Equal(io.EOF))
+}
+
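+// writeStreamData streams dataCount elements to the given stream, following the same close-and-drain pattern as writeMeasureData.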
+func writeStreamData(ctx context.Context, groupName, streamName string,
dataCount int) {
+ writeClient, writeErr := streamWriteClient.Write(ctx)
+ Expect(writeErr).ShouldNot(HaveOccurred())
+
+ metadata := &commonv1.Metadata{
+ Name: streamName,
+ Group: groupName,
+ }
+ baseTime := time.Now().Truncate(time.Millisecond)
+ for idx := 0; idx < dataCount; idx++ {
+ req := &streamv1.WriteRequest{
+ Metadata: metadata,
+ Element: &streamv1.ElementValue{
+ ElementId: strconv.Itoa(idx),
+ Timestamp:
timestamppb.New(baseTime.Add(time.Duration(idx) * time.Second)),
+ TagFamilies: []*modelv1.TagFamilyForWrite{{
+ Tags: []*modelv1.TagValue{{
+ Value:
&modelv1.TagValue_Str{Str: &modelv1.Str{Value: "svc_" + strconv.Itoa(idx)}},
+ }},
+ }},
+ },
+ MessageId: uint64(time.Now().UnixNano()),
+ }
+ sendErr := writeClient.Send(req)
+ Expect(sendErr).ShouldNot(HaveOccurred())
+ }
+ Expect(writeClient.CloseSend()).To(Succeed())
+ Eventually(func() error {
+ _, recvErr := writeClient.Recv()
+ return recvErr
+ }, flags.EventuallyTimeout).Should(Equal(io.EOF))
+}
+
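+// writeTraceData streams dataCount spans to the given trace; tag values are positional and follow the trace_id, span_id, timestamp, service_id, duration order declared in the test schema.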
+func writeTraceData(ctx context.Context, groupName, traceName string,
dataCount int) {
+ writeClient, writeErr := traceWriteClient.Write(ctx)
+ Expect(writeErr).ShouldNot(HaveOccurred())
+
+ metadata := &commonv1.Metadata{
+ Name: traceName,
+ Group: groupName,
+ }
+ baseTime := time.Now().Truncate(time.Millisecond)
+ for idx := 0; idx < dataCount; idx++ {
+ traceID := fmt.Sprintf("trace_%d", idx)
+ spanID := fmt.Sprintf("span_%d", idx)
+ req := &tracev1.WriteRequest{
+ Metadata: metadata,
+ Tags: []*modelv1.TagValue{
+ {Value: &modelv1.TagValue_Str{Str:
&modelv1.Str{Value: traceID}}},
+ {Value: &modelv1.TagValue_Str{Str:
&modelv1.Str{Value: spanID}}},
+ {Value: &modelv1.TagValue_Timestamp{Timestamp:
timestamppb.New(baseTime.Add(time.Duration(idx) * time.Second))}},
+ {Value: &modelv1.TagValue_Str{Str:
&modelv1.Str{Value: "test_service"}}},
+ {Value: &modelv1.TagValue_Int{Int:
&modelv1.Int{Value: int64(idx * 10)}}},
+ },
+ Span: []byte(fmt.Sprintf("span_data_%d", idx)),
+ Version: uint64(idx + 1),
+ }
+ sendErr := writeClient.Send(req)
+ Expect(sendErr).ShouldNot(HaveOccurred())
+ }
+ Expect(writeClient.CloseSend()).To(Succeed())
+ Eventually(func() error {
+ _, recvErr := writeClient.Recv()
+ return recvErr
+ }, flags.EventuallyTimeout).Should(Equal(io.EOF))
+}
+
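+// Inspect on a measure group should report group metadata, the measure in the schema listing, per-node data sizes, and liaison write counters.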
+var _ = Describe("Inspect measure in standalone mode", func() {
+ var groupName string
+ var measureName string
+ var ctx context.Context
+ const dataCount = 10
+
+ BeforeEach(func() {
+ ctx = context.TODO()
+ groupName = fmt.Sprintf("inspect-measure-test-%d",
time.Now().UnixNano())
+ measureName = "test_measure"
+
+ By("Creating measure group")
+ _, createErr := groupClient.Create(ctx,
&databasev1.GroupRegistryServiceCreateRequest{
+ Group: &commonv1.Group{
+ Metadata: &commonv1.Metadata{
+ Name: groupName,
+ },
+ Catalog: commonv1.Catalog_CATALOG_MEASURE,
+ ResourceOpts: &commonv1.ResourceOpts{
+ ShardNum: 2,
+ SegmentInterval: &commonv1.IntervalRule{
+ Unit:
commonv1.IntervalRule_UNIT_DAY,
+ Num: 1,
+ },
+ Ttl: &commonv1.IntervalRule{
+ Unit:
commonv1.IntervalRule_UNIT_DAY,
+ Num: 7,
+ },
+ },
+ },
+ })
+ Expect(createErr).ShouldNot(HaveOccurred())
+
+ By("Creating measure schema")
+ _, measureErr := measureRegClient.Create(ctx,
&databasev1.MeasureRegistryServiceCreateRequest{
+ Measure: &databasev1.Measure{
+ Metadata: &commonv1.Metadata{
+ Name: measureName,
+ Group: groupName,
+ },
+ Entity: &databasev1.Entity{
+ TagNames: []string{"id"},
+ },
+ TagFamilies: []*databasev1.TagFamilySpec{{
+ Name: "default",
+ Tags: []*databasev1.TagSpec{{
+ Name: "id",
+ Type:
databasev1.TagType_TAG_TYPE_STRING,
+ }},
+ }},
+ Fields: []*databasev1.FieldSpec{{
+ Name: "value",
+ FieldType:
databasev1.FieldType_FIELD_TYPE_INT,
+ EncodingMethod:
databasev1.EncodingMethod_ENCODING_METHOD_GORILLA,
+ CompressionMethod:
databasev1.CompressionMethod_COMPRESSION_METHOD_ZSTD,
+ }},
+ },
+ })
+ Expect(measureErr).ShouldNot(HaveOccurred())
+ time.Sleep(2 * time.Second)
+
+ By("Writing measure data")
+ writeMeasureData(ctx, groupName, measureName, dataCount)
+ time.Sleep(2 * time.Second)
+ })
+
+ AfterEach(func() {
+ _, _ = groupClient.Delete(ctx,
&databasev1.GroupRegistryServiceDeleteRequest{Group: groupName})
+ })
+
+ It("should return group info", func() {
+ By("Inspecting group")
+ resp, err := groupClient.Inspect(ctx,
&databasev1.GroupRegistryServiceInspectRequest{Group: groupName})
+ Expect(err).ShouldNot(HaveOccurred())
+ Expect(resp).NotTo(BeNil())
+ By("Verifying group info")
+ Expect(resp.Group).NotTo(BeNil())
+ Expect(resp.Group.Metadata.Name).To(Equal(groupName))
+
Expect(resp.Group.Catalog).To(Equal(commonv1.Catalog_CATALOG_MEASURE))
+ })
+
+ It("should return schema info", func() {
+ By("Inspecting group")
+ resp, err := groupClient.Inspect(ctx,
&databasev1.GroupRegistryServiceInspectRequest{Group: groupName})
+ Expect(err).ShouldNot(HaveOccurred())
+ Expect(resp).NotTo(BeNil())
+ By("Verifying schema info contains the measure")
+ Expect(resp.SchemaInfo).NotTo(BeNil())
+
Expect(len(resp.SchemaInfo.Measures)).Should(BeNumerically(">=", 1))
+ Expect(resp.SchemaInfo.Measures).To(ContainElement(measureName))
+ GinkgoWriter.Printf("Schema info: measures=%d, streams=%d,
traces=%d, indexRules=%d\n",
+ len(resp.SchemaInfo.Measures),
+ len(resp.SchemaInfo.Streams),
+ len(resp.SchemaInfo.Traces),
+ len(resp.SchemaInfo.IndexRules))
+ })
+
+ It("should return data info", func() {
+ By("Inspecting group")
+ resp, err := groupClient.Inspect(ctx,
&databasev1.GroupRegistryServiceInspectRequest{Group: groupName})
+ Expect(err).ShouldNot(HaveOccurred())
+ Expect(resp).NotTo(BeNil())
+ By("Verifying data info collected")
+ Expect(len(resp.DataInfo)).Should(BeNumerically(">=", 1),
"should collect from standalone node")
+ var totalDataSize int64
+ for idx, dataInfo := range resp.DataInfo {
+ Expect(dataInfo.Node).NotTo(BeNil(), "data node %d
should have node info", idx)
+
Expect(dataInfo.DataSizeBytes).Should(BeNumerically(">", 0), "data node %d
should have data size > 0", idx)
+ totalDataSize += dataInfo.DataSizeBytes
+ GinkgoWriter.Printf("Data node %d: %s, segments: %d,
size: %d bytes\n",
+ idx, dataInfo.Node.Metadata.Name,
len(dataInfo.SegmentInfo), dataInfo.DataSizeBytes)
+ }
+ Expect(totalDataSize).Should(BeNumerically(">", 0), "total data
size should be > 0")
+ GinkgoWriter.Printf("Total data size: %d bytes\n",
totalDataSize)
+ })
+
+ It("should return liaison info", func() {
+ By("Inspecting group")
+ resp, err := groupClient.Inspect(ctx,
&databasev1.GroupRegistryServiceInspectRequest{Group: groupName})
+ Expect(err).ShouldNot(HaveOccurred())
+ Expect(resp).NotTo(BeNil())
+ By("Verifying liaison info in standalone mode")
+ Expect(len(resp.LiaisonInfo)).Should(BeNumerically(">=", 1),
"should have liaison info in standalone mode")
+ for idx, liaisonInfo := range resp.LiaisonInfo {
+ GinkgoWriter.Printf("Liaison node %d:
PendingWrite=%d\n", idx, liaisonInfo.PendingWriteDataCount)
+ }
+ })
+})
+
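+// Inspect on a stream group should report group metadata, per-node data sizes, and the stream in the schema listing.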
+var _ = Describe("Inspect stream in standalone mode", func() {
+ var groupName string
+ var streamName string
+ var ctx context.Context
+ const dataCount = 10
+
+ BeforeEach(func() {
+ ctx = context.TODO()
+ groupName = fmt.Sprintf("inspect-stream-test-%d",
time.Now().UnixNano())
+ streamName = "test_stream"
+
+ By("Creating stream group")
+ _, createErr := groupClient.Create(ctx,
&databasev1.GroupRegistryServiceCreateRequest{
+ Group: &commonv1.Group{
+ Metadata: &commonv1.Metadata{
+ Name: groupName,
+ },
+ Catalog: commonv1.Catalog_CATALOG_STREAM,
+ ResourceOpts: &commonv1.ResourceOpts{
+ ShardNum: 2,
+ SegmentInterval: &commonv1.IntervalRule{
+ Unit:
commonv1.IntervalRule_UNIT_DAY,
+ Num: 1,
+ },
+ Ttl: &commonv1.IntervalRule{
+ Unit:
commonv1.IntervalRule_UNIT_DAY,
+ Num: 7,
+ },
+ },
+ },
+ })
+ Expect(createErr).ShouldNot(HaveOccurred())
+
+ By("Creating stream schema")
+ _, streamErr := streamRegClient.Create(ctx,
&databasev1.StreamRegistryServiceCreateRequest{
+ Stream: &databasev1.Stream{
+ Metadata: &commonv1.Metadata{
+ Name: streamName,
+ Group: groupName,
+ },
+ Entity: &databasev1.Entity{
+ TagNames: []string{"svc"},
+ },
+ TagFamilies: []*databasev1.TagFamilySpec{{
+ Name: "default",
+ Tags: []*databasev1.TagSpec{{
+ Name: "svc",
+ Type:
databasev1.TagType_TAG_TYPE_STRING,
+ }},
+ }},
+ },
+ })
+ Expect(streamErr).ShouldNot(HaveOccurred())
+ time.Sleep(2 * time.Second)
+
+ By("Writing stream data")
+ writeStreamData(ctx, groupName, streamName, dataCount)
+ time.Sleep(2 * time.Second)
+ })
+
+ AfterEach(func() {
+ _, _ = groupClient.Delete(ctx,
&databasev1.GroupRegistryServiceDeleteRequest{Group: groupName})
+ })
+
+ It("should return group info", func() {
+ By("Inspecting stream group")
+ resp, err := groupClient.Inspect(ctx,
&databasev1.GroupRegistryServiceInspectRequest{Group: groupName})
+ Expect(err).ShouldNot(HaveOccurred())
+ Expect(resp).NotTo(BeNil())
+ By("Verifying group info")
+ Expect(resp.Group).NotTo(BeNil())
+ Expect(resp.Group.Metadata.Name).To(Equal(groupName))
+
Expect(resp.Group.Catalog).To(Equal(commonv1.Catalog_CATALOG_STREAM))
+ })
+
+ It("should return data info", func() {
+ By("Inspecting stream group")
+ resp, err := groupClient.Inspect(ctx,
&databasev1.GroupRegistryServiceInspectRequest{Group: groupName})
+ Expect(err).ShouldNot(HaveOccurred())
+ Expect(resp).NotTo(BeNil())
+ By("Verifying stream data info collected")
+ Expect(len(resp.DataInfo)).Should(BeNumerically(">=", 1),
"should collect from standalone node")
+ var totalDataSize int64
+ for idx, dataInfo := range resp.DataInfo {
+ Expect(dataInfo.Node).NotTo(BeNil(), "stream data node
%d should have node info", idx)
+
Expect(dataInfo.DataSizeBytes).Should(BeNumerically(">", 0), "stream data node
%d should have data size > 0", idx)
+ totalDataSize += dataInfo.DataSizeBytes
+ GinkgoWriter.Printf("Stream data node %d: %s, segments:
%d, size: %d bytes\n",
+ idx, dataInfo.Node.Metadata.Name,
len(dataInfo.SegmentInfo), dataInfo.DataSizeBytes)
+ }
+ Expect(totalDataSize).Should(BeNumerically(">", 0), "total
stream data size should be > 0")
+ GinkgoWriter.Printf("Total stream data size: %d bytes\n",
totalDataSize)
+ })
+
+ It("should return schema info", func() {
+ By("Inspecting stream group")
+ resp, err := groupClient.Inspect(ctx,
&databasev1.GroupRegistryServiceInspectRequest{Group: groupName})
+ Expect(err).ShouldNot(HaveOccurred())
+ Expect(resp).NotTo(BeNil())
+ By("Verifying schema info contains the stream")
+ Expect(resp.SchemaInfo).NotTo(BeNil())
+ Expect(len(resp.SchemaInfo.Streams)).Should(BeNumerically(">=",
1))
+ Expect(resp.SchemaInfo.Streams).To(ContainElement(streamName))
+ GinkgoWriter.Printf("Stream schema info: streams=%d,
indexRules=%d\n",
+ len(resp.SchemaInfo.Streams),
+ len(resp.SchemaInfo.IndexRules))
+ })
+})
+
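+// Inspect on a trace group should report group metadata, per-node data sizes, and the trace in the schema listing.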
+var _ = Describe("Inspect trace in standalone mode", func() {
+ var groupName string
+ var traceName string
+ var ctx context.Context
+ const dataCount = 10
+
+ BeforeEach(func() {
+ ctx = context.TODO()
+ groupName = fmt.Sprintf("inspect-trace-test-%d",
time.Now().UnixNano())
+ traceName = "test_trace"
+
+ By("Creating trace group")
+ _, createErr := groupClient.Create(ctx,
&databasev1.GroupRegistryServiceCreateRequest{
+ Group: &commonv1.Group{
+ Metadata: &commonv1.Metadata{
+ Name: groupName,
+ },
+ Catalog: commonv1.Catalog_CATALOG_TRACE,
+ ResourceOpts: &commonv1.ResourceOpts{
+ ShardNum: 2,
+ SegmentInterval: &commonv1.IntervalRule{
+ Unit:
commonv1.IntervalRule_UNIT_DAY,
+ Num: 1,
+ },
+ Ttl: &commonv1.IntervalRule{
+ Unit:
commonv1.IntervalRule_UNIT_DAY,
+ Num: 7,
+ },
+ },
+ },
+ })
+ Expect(createErr).ShouldNot(HaveOccurred())
+
+ By("Creating trace schema")
+ _, traceErr := traceRegClient.Create(ctx,
&databasev1.TraceRegistryServiceCreateRequest{
+ Trace: &databasev1.Trace{
+ Metadata: &commonv1.Metadata{
+ Name: traceName,
+ Group: groupName,
+ },
+ Tags: []*databasev1.TraceTagSpec{
+ {Name: "trace_id", Type:
databasev1.TagType_TAG_TYPE_STRING},
+ {Name: "span_id", Type:
databasev1.TagType_TAG_TYPE_STRING},
+ {Name: "timestamp", Type:
databasev1.TagType_TAG_TYPE_TIMESTAMP},
+ {Name: "service_id", Type:
databasev1.TagType_TAG_TYPE_STRING},
+ {Name: "duration", Type:
databasev1.TagType_TAG_TYPE_INT},
+ },
+ TraceIdTagName: "trace_id",
+ SpanIdTagName: "span_id",
+ TimestampTagName: "timestamp",
+ },
+ })
+ Expect(traceErr).ShouldNot(HaveOccurred())
+ time.Sleep(2 * time.Second)
+
+ By("Writing trace data")
+ writeTraceData(ctx, groupName, traceName, dataCount)
+ time.Sleep(2 * time.Second)
+ })
+
+ AfterEach(func() {
+ _, _ = groupClient.Delete(ctx,
&databasev1.GroupRegistryServiceDeleteRequest{Group: groupName})
+ })
+
+ It("should return group info", func() {
+ By("Inspecting trace group")
+ resp, err := groupClient.Inspect(ctx,
&databasev1.GroupRegistryServiceInspectRequest{Group: groupName})
+ Expect(err).ShouldNot(HaveOccurred())
+ Expect(resp).NotTo(BeNil())
+ By("Verifying group info")
+ Expect(resp.Group).NotTo(BeNil())
+ Expect(resp.Group.Metadata.Name).To(Equal(groupName))
+
Expect(resp.Group.Catalog).To(Equal(commonv1.Catalog_CATALOG_TRACE))
+ })
+
+ It("should return data info", func() {
+ By("Inspecting trace group")
+ resp, err := groupClient.Inspect(ctx,
&databasev1.GroupRegistryServiceInspectRequest{Group: groupName})
+ Expect(err).ShouldNot(HaveOccurred())
+ Expect(resp).NotTo(BeNil())
+ By("Verifying trace data info collected")
+ Expect(len(resp.DataInfo)).Should(BeNumerically(">=", 1),
"should collect from standalone node")
+ var totalDataSize int64
+ for idx, dataInfo := range resp.DataInfo {
+ Expect(dataInfo.Node).NotTo(BeNil(), "trace data node
%d should have node info", idx)
+
Expect(dataInfo.DataSizeBytes).Should(BeNumerically(">", 0), "trace data node
%d should have data size > 0", idx)
+ totalDataSize += dataInfo.DataSizeBytes
+ GinkgoWriter.Printf("Trace data node %d: %s, segments:
%d, size: %d bytes\n",
+ idx, dataInfo.Node.Metadata.Name,
len(dataInfo.SegmentInfo), dataInfo.DataSizeBytes)
+ }
+ Expect(totalDataSize).Should(BeNumerically(">", 0), "total
trace data size should be > 0")
+ GinkgoWriter.Printf("Total trace data size: %d bytes\n",
totalDataSize)
+ })
+
+ It("should return schema info", func() {
+ By("Inspecting trace group")
+ resp, err := groupClient.Inspect(ctx,
&databasev1.GroupRegistryServiceInspectRequest{Group: groupName})
+ Expect(err).ShouldNot(HaveOccurred())
+ Expect(resp).NotTo(BeNil())
+ By("Verifying schema info contains the trace")
+ Expect(resp.SchemaInfo).NotTo(BeNil())
+ Expect(len(resp.SchemaInfo.Traces)).Should(BeNumerically(">=",
1))
+ Expect(resp.SchemaInfo.Traces).To(ContainElement(traceName))
+ GinkgoWriter.Printf("Trace schema info: traces=%d,
indexRules=%d\n",
+ len(resp.SchemaInfo.Traces),
+ len(resp.SchemaInfo.IndexRules))
+ })
+})