This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch trace/liaison in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 18ee51552506eb350e3b0599ef82c985e05ac393 Author: Gao Hongtao <hanahm...@gmail.com> AuthorDate: Thu Aug 28 19:34:36 2025 +0800 Refactor trace service and enhance schema management - Simplified the OnAddOrUpdate method in the entity repository by removing redundant switch cases for schema types, improving code readability and maintainability. - Updated trace service to utilize the new trace management schema, ensuring consistency across trace-related operations. - Introduced new test data and validation for trace service, enhancing the testing framework and ensuring comprehensive coverage for trace functionalities. - Adjusted various components to align with the new trace schema, including metadata handling and group configurations. --- banyand/liaison/grpc/discovery.go | 47 ++--- banyand/liaison/grpc/trace.go | 4 +- banyand/trace/introducer.go | 8 +- banyand/trace/metadata.go | 149 ++++++++++++++- banyand/trace/svc_standalone.go | 89 ++------- banyand/trace/trace.go | 11 +- banyand/trace/trace_suite_test.go | 4 +- banyand/trace/write_standalone.go | 2 +- pkg/schema/init.go | 39 +++- pkg/test/setup/setup.go | 9 + pkg/test/trace/etcd.go | 109 +++++++++++ pkg/test/trace/testdata/group.json | 21 --- .../trace/testdata/groups/test-trace-group.json | 19 ++ pkg/test/trace/testdata/traces/sw.json | 4 - test/cases/init.go | 4 + test/cases/trace/data/data.go | 206 +++++++++++++++++++++ test/cases/trace/data/input/all.yml | 20 ++ test/cases/trace/data/testdata/sw.json | 177 ++++++++++++++++++ test/cases/trace/data/want/all.yml | 48 +++++ test/cases/trace/trace.go | 46 +++++ .../standalone/query/query_suite_test.go | 5 + 21 files changed, 873 insertions(+), 148 deletions(-) diff --git a/banyand/liaison/grpc/discovery.go b/banyand/liaison/grpc/discovery.go index 751414d3..8df965ad 100644 --- a/banyand/liaison/grpc/discovery.go +++ b/banyand/liaison/grpc/discovery.go @@ -202,26 +202,6 @@ func (e *entityRepo) OnAddOrUpdate(schemaMetadata schema.Metadata) { var l partition.Locator var id identity var modRevision int64 - switch schemaMetadata.Kind { - case schema.KindMeasure: - measure := schemaMetadata.Spec.(*databasev1.Measure) - modRevision = measure.GetMetadata().GetModRevision() - l = partition.NewEntityLocator(measure.TagFamilies, measure.Entity, modRevision) - id = getID(measure.GetMetadata()) - case schema.KindStream: - stream := schemaMetadata.Spec.(*databasev1.Stream) - modRevision = stream.GetMetadata().GetModRevision() - l = partition.NewEntityLocator(stream.TagFamilies, stream.Entity, modRevision) - id = getID(stream.GetMetadata()) - case schema.KindTrace: - trace := schemaMetadata.Spec.(*databasev1.Trace) - modRevision = trace.GetMetadata().GetModRevision() - // For trace, we don't need entity-based partitioning, just store metadata - l = partition.Locator{ModRevision: modRevision} - id = getID(trace.GetMetadata()) - default: - return - } if le := e.log.Debug(); le.Enabled() { var kind string switch schemaMetadata.Kind { @@ -240,16 +220,20 @@ func (e *entityRepo) OnAddOrUpdate(schemaMetadata schema.Metadata) { Str("kind", kind). Msg("entity added or updated") } - e.RWMutex.Lock() - defer e.RWMutex.Unlock() - e.entitiesMap[id] = partition.Locator{TagLocators: l.TagLocators, ModRevision: modRevision} switch schemaMetadata.Kind { case schema.KindMeasure: measure := schemaMetadata.Spec.(*databasev1.Measure) - e.measureMap[id] = measure - delete(e.traceMap, id) // Ensure trace is not stored for measures + modRevision = measure.GetMetadata().GetModRevision() + l = partition.NewEntityLocator(measure.TagFamilies, measure.Entity, modRevision) + id = getID(measure.GetMetadata()) + case schema.KindStream: + stream := schemaMetadata.Spec.(*databasev1.Stream) + modRevision = stream.GetMetadata().GetModRevision() + l = partition.NewEntityLocator(stream.TagFamilies, stream.Entity, modRevision) + id = getID(stream.GetMetadata()) case schema.KindTrace: trace := schemaMetadata.Spec.(*databasev1.Trace) + id = getID(trace.GetMetadata()) e.traceMap[id] = trace // Pre-compute trace ID tag index traceIDTagName := trace.GetTraceIdTagName() @@ -261,10 +245,19 @@ func (e *entityRepo) OnAddOrUpdate(schemaMetadata schema.Metadata) { } } e.traceIDIndexMap[id] = traceIDIndex - delete(e.measureMap, id) // Ensure measure is not stored for traces + return default: + return + } + + e.RWMutex.Lock() + defer e.RWMutex.Unlock() + e.entitiesMap[id] = partition.Locator{TagLocators: l.TagLocators, ModRevision: modRevision} + if schemaMetadata.Kind == schema.KindMeasure { + measure := schemaMetadata.Spec.(*databasev1.Measure) + e.measureMap[id] = measure + } else { delete(e.measureMap, id) // Ensure measure is not stored for streams - delete(e.traceMap, id) // Ensure trace is not stored for streams } } diff --git a/banyand/liaison/grpc/trace.go b/banyand/liaison/grpc/trace.go index 0d4f9843..4cb106a0 100644 --- a/banyand/liaison/grpc/trace.go +++ b/banyand/liaison/grpc/trace.go @@ -89,11 +89,11 @@ func (s *traceService) validateTimestamp(writeEntity *tracev1.WriteRequest) erro func (s *traceService) validateMetadata(writeEntity *tracev1.WriteRequest) error { if writeEntity.Metadata.ModRevision > 0 { - traceCache, existed := s.entityRepo.getLocator(getID(writeEntity.GetMetadata())) + traceCache, existed := s.entityRepo.getTrace(getID(writeEntity.GetMetadata())) if !existed { return errors.New("trace schema not found") } - if writeEntity.Metadata.ModRevision != traceCache.ModRevision { + if writeEntity.Metadata.ModRevision != traceCache.GetMetadata().GetModRevision() { return errors.New("expired trace schema") } } diff --git a/banyand/trace/introducer.go b/banyand/trace/introducer.go index a67869de..809820c9 100644 --- a/banyand/trace/introducer.go +++ b/banyand/trace/introducer.go @@ -32,7 +32,7 @@ func (i *introduction) reset() { i.applied = nil } -var introductionPool = pool.Register[*introduction]("stream-introduction") +var introductionPool = pool.Register[*introduction]("trace-introduction") func generateIntroduction() *introduction { v := introductionPool.Get() @@ -60,7 +60,7 @@ func (i *flusherIntroduction) reset() { i.applied = nil } -var flusherIntroductionPool = pool.Register[*flusherIntroduction]("stream-flusher-introduction") +var flusherIntroductionPool = pool.Register[*flusherIntroduction]("trace-flusher-introduction") func generateFlusherIntroduction() *flusherIntroduction { v := flusherIntroductionPool.Get() @@ -94,7 +94,7 @@ func (i *mergerIntroduction) reset() { i.creator = 0 } -var mergerIntroductionPool = pool.Register[*mergerIntroduction]("stream-merger-introduction") +var mergerIntroductionPool = pool.Register[*mergerIntroduction]("trace-merger-introduction") func generateMergerIntroduction() *mergerIntroduction { v := mergerIntroductionPool.Get() @@ -122,7 +122,7 @@ func (i *syncIntroduction) reset() { i.applied = nil } -var syncIntroductionPool = pool.Register[*syncIntroduction]("stream-sync-introduction") +var syncIntroductionPool = pool.Register[*syncIntroduction]("trace-sync-introduction") func generateSyncIntroduction() *syncIntroduction { v := syncIntroductionPool.Get() diff --git a/banyand/trace/metadata.go b/banyand/trace/metadata.go index 0d3c9cf0..b44836bf 100644 --- a/banyand/trace/metadata.go +++ b/banyand/trace/metadata.go @@ -20,10 +20,12 @@ package trace import ( "context" "fmt" + "path" "time" "github.com/pkg/errors" + "github.com/apache/skywalking-banyandb/api/common" commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" "github.com/apache/skywalking-banyandb/api/validate" @@ -34,7 +36,9 @@ import ( "github.com/apache/skywalking-banyandb/banyand/metadata/schema" "github.com/apache/skywalking-banyandb/banyand/observability" "github.com/apache/skywalking-banyandb/banyand/protector" + "github.com/apache/skywalking-banyandb/banyand/queue/pub" "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/meter" resourceSchema "github.com/apache/skywalking-banyandb/pkg/schema" "github.com/apache/skywalking-banyandb/pkg/timestamp" ) @@ -91,7 +95,10 @@ func newLiaisonSchemaRepo(path string, svc *liaison, traceDataNodeRegistry grpc. } func (sr *schemaRepo) start() { - sr.l.Info().Str("path", sr.path).Msg("starting trace metadata repository") + sr.Watcher() + sr.metadata. + RegisterHandler("trace", schema.KindGroup|schema.KindTrace|schema.KindIndexRuleBinding|schema.KindIndexRule, + sr) } func (sr *schemaRepo) Trace(metadata *commonv1.Metadata) (*trace, bool) { @@ -103,8 +110,16 @@ func (sr *schemaRepo) Trace(metadata *commonv1.Metadata) (*trace, bool) { return t, ok } -func (sr *schemaRepo) GetRemovalSegmentsTimeRange(_ string) *timestamp.TimeRange { - panic("not implemented") +func (sr *schemaRepo) GetRemovalSegmentsTimeRange(group string) *timestamp.TimeRange { + g, ok := sr.LoadGroup(group) + if !ok { + return nil + } + db := g.SupplyTSDB() + if db == nil { + return nil + } + return db.(storage.TSDB[*tsTable, option]).GetExpiredSegmentsTimeRange() } func (sr *schemaRepo) OnInit(kinds []schema.Kind) (bool, []int64) { @@ -260,6 +275,15 @@ type supplier struct { } func newSupplier(path string, svc *standalone, nodeLabels map[string]string) *supplier { + if svc.pm == nil { + svc.l.Panic().Msg("CRITICAL: svc.pm is nil in newSupplier") + } + opt := svc.option + opt.protector = svc.pm + + if opt.protector == nil { + svc.l.Panic().Msg("CRITICAL: opt.protector is still nil after assignment") + } return &supplier{ metadata: svc.metadata, omr: svc.omr, @@ -267,7 +291,7 @@ func newSupplier(path string, svc *standalone, nodeLabels map[string]string) *su l: svc.l, nodeLabels: nodeLabels, path: path, - option: svc.option, + option: opt, } } @@ -282,8 +306,64 @@ func (s *supplier) ResourceSchema(md *commonv1.Metadata) (resourceSchema.Resourc return s.metadata.TraceRegistry().GetTrace(ctx, md) } -func (s *supplier) OpenDB(_ *commonv1.Group) (resourceSchema.DB, error) { - panic("not implemented") +func (s *supplier) OpenDB(groupSchema *commonv1.Group) (resourceSchema.DB, error) { + name := groupSchema.Metadata.Group + p := common.Position{ + Module: "trace", + Database: name, + } + ro := groupSchema.ResourceOpts + if ro == nil { + return nil, fmt.Errorf("no resource opts in group %s", name) + } + shardNum := ro.ShardNum + ttl := ro.Ttl + segInterval := ro.SegmentInterval + segmentIdleTimeout := time.Duration(0) + if len(ro.Stages) > 0 && len(s.nodeLabels) > 0 { + var ttlNum uint32 + for _, st := range ro.Stages { + if st.Ttl.Unit != ro.Ttl.Unit { + return nil, fmt.Errorf("ttl unit %s is not consistent with stage %s", ro.Ttl.Unit, st.Ttl.Unit) + } + selector, err := pub.ParseLabelSelector(st.NodeSelector) + if err != nil { + return nil, errors.WithMessagef(err, "failed to parse node selector %s", st.NodeSelector) + } + ttlNum += st.Ttl.Num + if !selector.Matches(s.nodeLabels) { + continue + } + ttl.Num += ttlNum + shardNum = st.ShardNum + segInterval = st.SegmentInterval + if st.Close { + segmentIdleTimeout = 5 * time.Minute + } + break + } + } + group := groupSchema.Metadata.Group + opts := storage.TSDBOpts[*tsTable, option]{ + ShardNum: shardNum, + Location: path.Join(s.path, group), + TSTableCreator: newTSTable, + TableMetrics: s.newMetrics(p), + SegmentInterval: storage.MustToIntervalRule(segInterval), + TTL: storage.MustToIntervalRule(ttl), + Option: s.option, + SeriesIndexFlushTimeoutSeconds: s.option.flushTimeout.Nanoseconds() / int64(time.Second), + SeriesIndexCacheMaxBytes: int(s.option.seriesCacheMaxSize), + StorageMetricsFactory: s.omr.With(traceScope.ConstLabels(meter.ToLabelPairs(common.DBLabelNames(), p.DBLabelValues()))), + SegmentIdleTimeout: segmentIdleTimeout, + MemoryLimit: s.pm.GetLimit(), + } + return storage.OpenTSDB( + common.SetPosition(context.Background(), func(_ common.Position) common.Position { + return p + }), + opts, nil, group, + ) } // queueSupplier is the supplier for liaison service. @@ -299,6 +379,15 @@ type queueSupplier struct { } func newQueueSupplier(path string, svc *liaison, traceDataNodeRegistry grpc.NodeRegistry) *queueSupplier { + if svc.pm == nil { + svc.l.Panic().Msg("CRITICAL: svc.pm is nil in newSupplier") + } + opt := svc.option + opt.protector = svc.pm + + if opt.protector == nil { + svc.l.Panic().Msg("CRITICAL: opt.protector is still nil after assignment") + } return &queueSupplier{ metadata: svc.metadata, omr: svc.omr, @@ -306,7 +395,7 @@ func newQueueSupplier(path string, svc *liaison, traceDataNodeRegistry grpc.Node traceDataNodeRegistry: traceDataNodeRegistry, l: svc.l, path: path, - option: svc.option, + option: opt, } } @@ -321,6 +410,48 @@ func (qs *queueSupplier) ResourceSchema(md *commonv1.Metadata) (resourceSchema.R return qs.metadata.TraceRegistry().GetTrace(ctx, md) } -func (qs *queueSupplier) OpenDB(_ *commonv1.Group) (resourceSchema.DB, error) { - panic("not implemented") +func (qs *queueSupplier) OpenDB(groupSchema *commonv1.Group) (resourceSchema.DB, error) { + name := groupSchema.Metadata.Group + p := common.Position{ + Module: "trace", + Database: name, + } + ro := groupSchema.ResourceOpts + if ro == nil { + return nil, fmt.Errorf("no resource opts in group %s", name) + } + shardNum := ro.ShardNum + group := groupSchema.Metadata.Group + opts := wqueue.Opts[*tsTable, option]{ + Group: group, + ShardNum: shardNum, + SegmentInterval: storage.MustToIntervalRule(ro.SegmentInterval), + Location: path.Join(qs.path, group), + Option: qs.option, + Metrics: qs.newMetrics(p), + SubQueueCreator: newWriteQueue, + GetNodes: func(shardID common.ShardID) []string { + copies := ro.Replicas + 1 + nodeSet := make(map[string]struct{}, copies) + for i := uint32(0); i < copies; i++ { + nodeID, err := qs.traceDataNodeRegistry.Locate(group, "", uint32(shardID), i) + if err != nil { + qs.l.Error().Err(err).Str("group", group).Uint32("shard", uint32(shardID)).Uint32("copy", i).Msg("failed to locate node") + return nil + } + nodeSet[nodeID] = struct{}{} + } + nodes := make([]string, 0, len(nodeSet)) + for nodeID := range nodeSet { + nodes = append(nodes, nodeID) + } + return nodes + }, + } + return wqueue.Open( + common.SetPosition(context.Background(), func(_ common.Position) common.Position { + return p + }), + opts, group, + ) } diff --git a/banyand/trace/svc_standalone.go b/banyand/trace/svc_standalone.go index fe0e9fc9..05e4e858 100644 --- a/banyand/trace/svc_standalone.go +++ b/banyand/trace/svc_standalone.go @@ -21,12 +21,15 @@ import ( "context" "path" "path/filepath" + "strings" "github.com/pkg/errors" + "github.com/apache/skywalking-banyandb/api/common" "github.com/apache/skywalking-banyandb/api/data" commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" + "github.com/apache/skywalking-banyandb/banyand/internal/storage" "github.com/apache/skywalking-banyandb/banyand/metadata" "github.com/apache/skywalking-banyandb/banyand/observability" "github.com/apache/skywalking-banyandb/banyand/protector" @@ -63,11 +66,6 @@ type standalone struct { maxFileSnapshotNum int } -// StandaloneService returns a new standalone service. -func StandaloneService(_ context.Context) (Service, error) { - return &standalone{}, nil -} - func (s *standalone) FlagSet() *run.FlagSet { fs := run.NewFlagSet("trace") fs.StringVar(&s.root, "trace-root-path", "/tmp", "the root path for trace data") @@ -94,37 +92,25 @@ func (s *standalone) Role() databasev1.Role { return databasev1.Role_ROLE_DATA } -func (s *standalone) PreRun(_ context.Context) error { - s.l = logger.GetLogger("trace") - - // Initialize metadata - if s.metadata == nil { - return errors.New("metadata repo is required") - } - - // Initialize filesystem - if s.lfs == nil { - s.lfs = fs.NewLocalFileSystem() - } - - // Initialize protector - if s.pm == nil { - return errors.New("memory protector is required") +func (s *standalone) PreRun(ctx context.Context) error { + s.l = logger.GetLogger(s.Name()) + s.l.Info().Msg("memory protector is initialized in PreRun") + s.lfs = fs.NewLocalFileSystemWithLoggerAndLimit(s.l, s.pm.GetLimit()) + path := path.Join(s.root, s.Name()) + s.snapshotDir = filepath.Join(path, storage.SnapshotsDir) + observability.UpdatePath(path) + val := ctx.Value(common.ContextNodeKey) + if val == nil { + return errors.New("node id is empty") } - - // Initialize pipeline - if s.pipeline == nil { - return errors.New("pipeline is required") - } - - // Set up data path + node := val.(common.Node) if s.dataPath == "" { - s.dataPath = path.Join(s.root, "trace-data") + s.dataPath = filepath.Join(path, storage.DataDir) } - - // Initialize schema repository - var nodeLabels map[string]string - s.schemaRepo = newSchemaRepo(s.dataPath, s, nodeLabels) + if !strings.HasPrefix(filepath.VolumeName(s.dataPath), filepath.VolumeName(path)) { + observability.UpdatePath(s.dataPath) + } + s.schemaRepo = newSchemaRepo(s.dataPath, s, node.Labels) // Initialize snapshot directory s.snapshotDir = filepath.Join(s.dataPath, "snapshot") @@ -148,12 +134,7 @@ func (s *standalone) PreRun(_ context.Context) error { } func (s *standalone) Serve() run.StopNotify { - // As specified in the plan, no pipeline listeners should be implemented - s.l.Info().Msg("trace standalone service started") - - // Return a channel that never closes since this service runs indefinitely - stopCh := make(chan struct{}) - return stopCh + return s.schemaRepo.StopCh() } func (s *standalone) GracefulStop() { @@ -179,36 +160,6 @@ func (s *standalone) GetRemovalSegmentsTimeRange(group string) *timestamp.TimeRa return s.schemaRepo.GetRemovalSegmentsTimeRange(group) } -// SetMetadata sets the metadata repository. -func (s *standalone) SetMetadata(metadata metadata.Repo) { - s.metadata = metadata -} - -// SetObservabilityRegistry sets the observability metrics registry. -func (s *standalone) SetObservabilityRegistry(omr observability.MetricsRegistry) { - s.omr = omr -} - -// SetProtector sets the memory protector. -func (s *standalone) SetProtector(pm protector.Memory) { - s.pm = pm -} - -// SetPipeline sets the pipeline server. -func (s *standalone) SetPipeline(pipeline queue.Server) { - s.pipeline = pipeline -} - -// SetLocalPipeline sets the local pipeline queue. -func (s *standalone) SetLocalPipeline(localPipeline queue.Queue) { - s.localPipeline = localPipeline -} - -// SetFileSystem sets the file system. -func (s *standalone) SetFileSystem(lfs fs.FileSystem) { - s.lfs = lfs -} - // NewService returns a new service. func NewService(metadata metadata.Repo, pipeline queue.Server, omr observability.MetricsRegistry, pm protector.Memory) (Service, error) { return &standalone{ diff --git a/banyand/trace/trace.go b/banyand/trace/trace.go index b8812ee0..ed33c4f0 100644 --- a/banyand/trace/trace.go +++ b/banyand/trace/trace.go @@ -48,12 +48,11 @@ const ( var traceScope = observability.RootScope.SubScope("trace") type option struct { - mergePolicy *mergePolicy - protector protector.Memory - tire2Client queue.Client - seriesCacheMaxSize run.Bytes - flushTimeout time.Duration - elementIndexFlushTimeout time.Duration + mergePolicy *mergePolicy + protector protector.Memory + tire2Client queue.Client + seriesCacheMaxSize run.Bytes + flushTimeout time.Duration } // Service allows inspecting the trace data. diff --git a/banyand/trace/trace_suite_test.go b/banyand/trace/trace_suite_test.go index b64c502e..05140094 100644 --- a/banyand/trace/trace_suite_test.go +++ b/banyand/trace/trace_suite_test.go @@ -33,7 +33,7 @@ import ( "github.com/apache/skywalking-banyandb/pkg/logger" "github.com/apache/skywalking-banyandb/pkg/test" "github.com/apache/skywalking-banyandb/pkg/test/flags" - teststream "github.com/apache/skywalking-banyandb/pkg/test/stream" + testtrace "github.com/apache/skywalking-banyandb/pkg/test/trace" ) func TestTrace(t *testing.T) { @@ -57,7 +57,7 @@ func (p *preloadTraceService) Name() string { } func (p *preloadTraceService) PreRun(ctx context.Context) error { - return teststream.PreloadSchema(ctx, p.metaSvc.SchemaRegistry()) + return testtrace.PreloadSchema(ctx, p.metaSvc.SchemaRegistry()) } type services struct { diff --git a/banyand/trace/write_standalone.go b/banyand/trace/write_standalone.go index 5825983e..5974986c 100644 --- a/banyand/trace/write_standalone.go +++ b/banyand/trace/write_standalone.go @@ -186,7 +186,7 @@ func processTraces(schemaRepo *schemaRepo, traces *traces, writeEvent *tracev1.I } is := stm.indexSchema.Load().(indexSchema) - if len(is.indexRuleLocators) != len(stm.GetSchema().GetTags()) { + if len(is.indexRuleLocators) > len(stm.GetSchema().GetTags()) { return fmt.Errorf("metadata crashed, tag rule length %d, tag length %d", len(is.indexRuleLocators), len(stm.GetSchema().GetTags())) } diff --git a/pkg/schema/init.go b/pkg/schema/init.go index 8cbe0a12..5407b402 100644 --- a/pkg/schema/init.go +++ b/pkg/schema/init.go @@ -33,14 +33,15 @@ type revisionContext struct { group int64 measure int64 stream int64 + trace int64 indexRule int64 indexRuleBinding int64 topNAgg int64 } func (r revisionContext) String() string { - return fmt.Sprintf("Group: %d, Measure: %d, Stream: %d, IndexRule: %d, IndexRuleBinding: %d, TopNAgg: %d", - r.group, r.measure, r.stream, r.indexRule, r.indexRuleBinding, r.topNAgg) + return fmt.Sprintf("Group: %d, Measure: %d, Stream: %d, Trace: %d, IndexRule: %d, IndexRuleBinding: %d, TopNAgg: %d", + r.group, r.measure, r.stream, r.trace, r.indexRule, r.indexRuleBinding, r.topNAgg) } type revisionContextKey struct{} @@ -48,7 +49,7 @@ type revisionContextKey struct{} var revCtxKey = revisionContextKey{} func (sr *schemaRepo) Init(kind schema.Kind) ([]string, []int64) { - if kind != schema.KindMeasure && kind != schema.KindStream { + if kind != schema.KindMeasure && kind != schema.KindStream && kind != schema.KindTrace { return nil, nil } catalog := sr.getCatalog(kind) @@ -74,6 +75,10 @@ func (sr *schemaRepo) Init(kind schema.Kind) ([]string, []int64) { sr.l.Info().Stringer("revision", revCtx).Msg("init measures") return groupNames, []int64{revCtx.group, revCtx.measure, revCtx.indexRuleBinding, revCtx.indexRule, revCtx.topNAgg} } + if kind == schema.KindTrace { + sr.l.Info().Stringer("revision", revCtx).Msg("init trace") + return groupNames, []int64{revCtx.group, revCtx.trace, revCtx.indexRuleBinding, revCtx.indexRule} + } sr.l.Info().Stringer("revision", revCtx).Msg("init stream") return groupNames, []int64{revCtx.group, revCtx.stream, revCtx.indexRuleBinding, revCtx.indexRule} } @@ -82,6 +87,9 @@ func (sr *schemaRepo) getCatalog(kind schema.Kind) commonv1.Catalog { if kind == schema.KindMeasure { return commonv1.Catalog_CATALOG_MEASURE } + if kind == schema.KindTrace { + return commonv1.Catalog_CATALOG_TRACE + } return commonv1.Catalog_CATALOG_STREAM } @@ -96,6 +104,10 @@ func (sr *schemaRepo) processGroup(ctx context.Context, g *commonv1.Group, catal sr.processMeasure(ctx, g.Metadata.Name) return } + if catalog == commonv1.Catalog_CATALOG_TRACE { + sr.processTrace(ctx, g.Metadata.Name) + return + } sr.processStream(ctx, g.Metadata.Name) } @@ -178,6 +190,27 @@ func (sr *schemaRepo) processStream(ctx context.Context, gName string) { sr.l.Info().Str("group", gName).Dur("duration", time.Since(start)).Int("size", len(ss)).Msg("store streams") } +func (sr *schemaRepo) processTrace(ctx context.Context, gName string) { + ctx, cancel := context.WithTimeout(ctx, initTimeout) + defer cancel() + start := time.Now() + tt, err := sr.metadata.TraceRegistry().ListTrace(ctx, schema.ListOpt{Group: gName}) + if err != nil { + logger.Panicf("fails to get the traces: %v", err) + return + } + revCtx := ctx.Value(revCtxKey).(*revisionContext) + for _, t := range tt { + if err := sr.storeResource(t); err != nil { + logger.Panicf("fails to store the trace: %v", err) + } + if t.Metadata.ModRevision > revCtx.trace { + revCtx.trace = t.Metadata.ModRevision + } + } + sr.l.Info().Str("group", gName).Dur("duration", time.Since(start)).Int("size", len(tt)).Msg("store traces") +} + func (sr *schemaRepo) initGroup(groupSchema *commonv1.Group) (*group, error) { g, ok := sr.getGroup(groupSchema.Metadata.Name) if ok { diff --git a/pkg/test/setup/setup.go b/pkg/test/setup/setup.go index 55ac3940..b54d74a3 100644 --- a/pkg/test/setup/setup.go +++ b/pkg/test/setup/setup.go @@ -42,6 +42,7 @@ import ( "github.com/apache/skywalking-banyandb/pkg/test/helpers" test_measure "github.com/apache/skywalking-banyandb/pkg/test/measure" test_stream "github.com/apache/skywalking-banyandb/pkg/test/stream" + test_trace "github.com/apache/skywalking-banyandb/pkg/test/trace" ) const host = "localhost" @@ -51,6 +52,7 @@ func Standalone(flags ...string) (string, string, func()) { return StandaloneWithSchemaLoaders([]SchemaLoader{ &preloadService{name: "stream"}, &preloadService{name: "measure"}, + &preloadService{name: "trace"}, }, "", "", "", "", flags...) } @@ -59,6 +61,7 @@ func StandaloneWithAuth(username, password string, flags ...string) (string, str return StandaloneWithSchemaLoaders([]SchemaLoader{ &preloadService{name: "stream"}, &preloadService{name: "measure"}, + &preloadService{name: "trace"}, }, "", "", username, password, flags...) } @@ -67,6 +70,7 @@ func StandaloneWithTLS(certFile, keyFile string, flags ...string) (string, strin return StandaloneWithSchemaLoaders([]SchemaLoader{ &preloadService{name: "stream"}, &preloadService{name: "measure"}, + &preloadService{name: "trace"}, }, certFile, keyFile, "", "", flags...) } @@ -99,6 +103,7 @@ func ClosableStandalone(path string, ports []int, flags ...string) (string, stri return standaloneServer(path, ports, []SchemaLoader{ &preloadService{name: "stream"}, &preloadService{name: "measure"}, + &preloadService{name: "trace"}, }, "", "", flags...) } @@ -134,6 +139,7 @@ func standaloneServerWithAuth(path string, ports []int, schemaLoaders []SchemaLo "--measure-root-path=" + path, "--metadata-root-path=" + path, "--property-root-path=" + path, + "--trace-root-path=" + path, fmt.Sprintf("--etcd-listen-client-url=%s", endpoint), fmt.Sprintf("--etcd-listen-peer-url=http://%s:%d", host, ports[3]), } tlsEnabled := false @@ -202,6 +208,9 @@ func (p *preloadService) PreRun(ctx context.Context) error { if p.name == "stream" { return test_stream.PreloadSchema(ctx, p.registry) } + if p.name == "trace" { + return test_trace.PreloadSchema(ctx, p.registry) + } return test_measure.PreloadSchema(ctx, p.registry) } diff --git a/pkg/test/trace/etcd.go b/pkg/test/trace/etcd.go new file mode 100644 index 00000000..275c1765 --- /dev/null +++ b/pkg/test/trace/etcd.go @@ -0,0 +1,109 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Package trace implements helpers to load schemas for testing. +package trace + +import ( + "context" + "embed" + "path" + + "github.com/pkg/errors" + "google.golang.org/protobuf/encoding/protojson" + "google.golang.org/protobuf/proto" + + commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" + databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" + "github.com/apache/skywalking-banyandb/banyand/metadata/schema" +) + +const ( + groupDir = "testdata/groups" + traceDir = "testdata/traces" + indexRuleDir = "testdata/index_rules" + indexRuleBindingDir = "testdata/index_rule_bindings" +) + +//go:embed testdata/* +var store embed.FS + +// PreloadSchema loads schemas from files in the booting process. +func PreloadSchema(ctx context.Context, e schema.Registry) error { + return loadAllSchemas(ctx, e) +} + +// loadAllSchemas loads all trace-related schemas from the testdata directory. +func loadAllSchemas(ctx context.Context, e schema.Registry) error { + return preloadSchemaWithFuncs(ctx, e, + func(ctx context.Context, e schema.Registry) error { + return loadSchema(groupDir, &commonv1.Group{}, func(group *commonv1.Group) error { + return e.CreateGroup(ctx, group) + }) + }, + func(ctx context.Context, e schema.Registry) error { + return loadSchema(traceDir, &databasev1.Trace{}, func(trace *databasev1.Trace) error { + _, innerErr := e.CreateTrace(ctx, trace) + return innerErr + }) + }, + func(ctx context.Context, e schema.Registry) error { + return loadSchema(indexRuleDir, &databasev1.IndexRule{}, func(indexRule *databasev1.IndexRule) error { + return e.CreateIndexRule(ctx, indexRule) + }) + }, + func(ctx context.Context, e schema.Registry) error { + return loadSchema(indexRuleBindingDir, &databasev1.IndexRuleBinding{}, func(indexRuleBinding *databasev1.IndexRuleBinding) error { + return e.CreateIndexRuleBinding(ctx, indexRuleBinding) + }) + }, + ) +} + +// preloadSchemaWithFuncs extracts the common logic for loading schemas. +func preloadSchemaWithFuncs(ctx context.Context, e schema.Registry, loaders ...func(context.Context, schema.Registry) error) error { + for _, loader := range loaders { + if err := loader(ctx, e); err != nil { + return errors.WithStack(err) + } + } + return nil +} + +func loadSchema[T proto.Message](dir string, resource T, loadFn func(resource T) error) error { + entries, err := store.ReadDir(dir) + if err != nil { + return err + } + for _, entry := range entries { + data, err := store.ReadFile(path.Join(dir, entry.Name())) + if err != nil { + return err + } + resource.ProtoReflect().Descriptor().RequiredNumbers() + if err := protojson.Unmarshal(data, resource); err != nil { + return err + } + if err := loadFn(resource); err != nil { + if errors.Is(err, schema.ErrGRPCAlreadyExists) { + return nil + } + return err + } + } + return nil +} diff --git a/pkg/test/trace/testdata/group.json b/pkg/test/trace/testdata/group.json deleted file mode 100644 index b0f5793d..00000000 --- a/pkg/test/trace/testdata/group.json +++ /dev/null @@ -1,21 +0,0 @@ -[ - { - "metadata": { - "name": "test-trace-group" - }, - "catalog": "CATALOG_TRACE", - "resource_opts": { - "shard_num": 2, - "replicas": 1, - "segment_interval": { - "unit": "UNIT_DAY", - "num": 1 - }, - "ttl": { - "unit": "UNIT_DAY", - "num": 3 - } - }, - "updated_at": "2021-04-15T01:30:15.01Z" - } -] \ No newline at end of file diff --git a/pkg/test/trace/testdata/groups/test-trace-group.json b/pkg/test/trace/testdata/groups/test-trace-group.json new file mode 100644 index 00000000..3d7bbaf2 --- /dev/null +++ b/pkg/test/trace/testdata/groups/test-trace-group.json @@ -0,0 +1,19 @@ +{ + "metadata": { + "name": "test-trace-group" + }, + "catalog": "CATALOG_TRACE", + "resource_opts": { + "shard_num": 2, + "replicas": 1, + "segment_interval": { + "unit": "UNIT_DAY", + "num": 1 + }, + "ttl": { + "unit": "UNIT_DAY", + "num": 3 + } + }, + "updated_at": "2021-04-15T01:30:15.01Z" +} \ No newline at end of file diff --git a/pkg/test/trace/testdata/traces/sw.json b/pkg/test/trace/testdata/traces/sw.json index 346a7389..37e21ed3 100644 --- a/pkg/test/trace/testdata/traces/sw.json +++ b/pkg/test/trace/testdata/traces/sw.json @@ -31,10 +31,6 @@ { "name": "timestamp", "type": "TAG_TYPE_TIMESTAMP" - }, - { - "name": "data_binary", - "type": "TAG_TYPE_DATA_BINARY" } ], "trace_id_tag_name": "trace_id", diff --git a/test/cases/init.go b/test/cases/init.go index 5e7d0543..ad2a71b6 100644 --- a/test/cases/init.go +++ b/test/cases/init.go @@ -28,6 +28,7 @@ import ( "github.com/apache/skywalking-banyandb/pkg/grpchelper" casesmeasuredata "github.com/apache/skywalking-banyandb/test/cases/measure/data" casesstreamdata "github.com/apache/skywalking-banyandb/test/cases/stream/data" + casestrace "github.com/apache/skywalking-banyandb/test/cases/trace/data" ) // Initialize test data. @@ -60,4 +61,7 @@ func Initialize(addr string, now time.Time) { casesmeasuredata.Write(conn, "duplicated", "exception", "duplicated.json", now, 0) casesmeasuredata.Write(conn, "service_cpm_minute", "sw_updated", "service_cpm_minute_updated_data.json", now.Add(10*time.Minute), interval) time.Sleep(5 * time.Second) + // trace + interval = 500 * time.Millisecond + casestrace.Write(conn, "sw", now, interval) } diff --git a/test/cases/trace/data/data.go b/test/cases/trace/data/data.go new file mode 100644 index 00000000..fc70b23d --- /dev/null +++ b/test/cases/trace/data/data.go @@ -0,0 +1,206 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Package data is used to test the trace service. +package data + +import ( + "context" + "embed" + "encoding/base64" + "encoding/json" + "fmt" + "io" + "slices" + "strings" + "time" + + "github.com/google/go-cmp/cmp" + g "github.com/onsi/ginkgo/v2" + gm "github.com/onsi/gomega" + grpclib "google.golang.org/grpc" + "google.golang.org/protobuf/encoding/protojson" + "google.golang.org/protobuf/testing/protocmp" + "google.golang.org/protobuf/types/known/timestamppb" + "sigs.k8s.io/yaml" + + commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" + databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" + modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" + tracev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/trace/v1" + "github.com/apache/skywalking-banyandb/pkg/test/flags" + "github.com/apache/skywalking-banyandb/pkg/test/helpers" +) + +//go:embed input/*.yml +var inputFS embed.FS + +//go:embed want/*.yml +var wantFS embed.FS + +//go:embed testdata/*.json +var dataFS embed.FS + +// VerifyFn verify whether the query response matches the wanted result. +var VerifyFn = func(innerGm gm.Gomega, sharedContext helpers.SharedContext, args helpers.Args) { + i, err := inputFS.ReadFile("input/" + args.Input + ".yml") + innerGm.Expect(err).NotTo(gm.HaveOccurred()) + query := &tracev1.QueryRequest{} + helpers.UnmarshalYAML(i, query) + query.TimeRange = helpers.TimeRange(args, sharedContext) + query.Stages = args.Stages + c := tracev1.NewTraceServiceClient(sharedContext.Connection) + ctx := context.Background() + resp, err := c.Query(ctx, query) + if args.WantErr { + if err == nil { + g.Fail("expect error") + } + return + } + innerGm.Expect(err).NotTo(gm.HaveOccurred(), query.String()) + if args.WantEmpty { + innerGm.Expect(resp.Spans).To(gm.BeEmpty()) + return + } + if args.Want == "" { + args.Want = args.Input + } + ww, err := wantFS.ReadFile("want/" + args.Want + ".yml") + innerGm.Expect(err).NotTo(gm.HaveOccurred()) + want := &tracev1.QueryResponse{} + helpers.UnmarshalYAML(ww, want) + if args.DisOrder { + slices.SortFunc(want.Spans, func(a, b *tracev1.Span) int { + // Sort by first tag value for consistency + if len(a.Tags) > 0 && len(b.Tags) > 0 { + return strings.Compare(a.Tags[0].Value.GetStr().GetValue(), b.Tags[0].Value.GetStr().GetValue()) + } + return 0 + }) + slices.SortFunc(resp.Spans, func(a, b *tracev1.Span) int { + if len(a.Tags) > 0 && len(b.Tags) > 0 { + return strings.Compare(a.Tags[0].Value.GetStr().GetValue(), b.Tags[0].Value.GetStr().GetValue()) + } + return 0 + }) + } + var extra []cmp.Option + extra = append(extra, protocmp.IgnoreUnknown(), + protocmp.Transform()) + success := innerGm.Expect(cmp.Equal(resp, want, + extra...)). + To(gm.BeTrue(), func() string { + var j []byte + j, err = protojson.Marshal(resp) + if err != nil { + return err.Error() + } + var y []byte + y, err = yaml.JSONToYAML(j) + if err != nil { + return err.Error() + } + return string(y) + }) + if !success { + return + } + query.Trace = true + resp, err = c.Query(ctx, query) + innerGm.Expect(err).NotTo(gm.HaveOccurred()) + innerGm.Expect(resp.TraceQueryResult).NotTo(gm.BeNil()) + innerGm.Expect(resp.TraceQueryResult.GetSpans()).NotTo(gm.BeEmpty()) +} + +func loadData(stream tracev1.TraceService_WriteClient, metadata *commonv1.Metadata, dataFile string, baseTime time.Time, interval time.Duration) { + var templates []interface{} + content, err := dataFS.ReadFile("testdata/" + dataFile) + gm.Expect(err).ShouldNot(gm.HaveOccurred()) + gm.Expect(json.Unmarshal(content, &templates)).ShouldNot(gm.HaveOccurred()) + + for i, template := range templates { + // Extract span data from template + templateMap, ok := template.(map[string]interface{}) + gm.Expect(ok).To(gm.BeTrue()) + + // Get span data + spanData, ok := templateMap["span"].(string) + gm.Expect(ok).To(gm.BeTrue()) + spanBytes, err := base64.StdEncoding.DecodeString(spanData) + gm.Expect(err).ShouldNot(gm.HaveOccurred()) + + // Get tags data + tagsData, ok := templateMap["tags"].([]interface{}) + gm.Expect(ok).To(gm.BeTrue()) + + // Convert tags to TagValue format + var tagValues []*modelv1.TagValue + for _, tag := range tagsData { + tagBytes, err := json.Marshal(tag) + gm.Expect(err).ShouldNot(gm.HaveOccurred()) + tagValue := &modelv1.TagValue{} + gm.Expect(protojson.Unmarshal(tagBytes, tagValue)).ShouldNot(gm.HaveOccurred()) + tagValues = append(tagValues, tagValue) + } + + // Add timestamp tag as the last tag + timestamp := baseTime.Add(interval * time.Duration(i)) + timestampTag := &modelv1.TagValue{ + Value: &modelv1.TagValue_Timestamp{ + Timestamp: timestamppb.New(timestamp), + }, + } + tagValues = append(tagValues, timestampTag) + + errInner := stream.Send(&tracev1.WriteRequest{ + Metadata: metadata, + Tags: tagValues, + Span: spanBytes, + Version: uint64(i + 1), + }) + gm.Expect(errInner).ShouldNot(gm.HaveOccurred()) + } +} + +// Write writes trace data to the database. +func Write(conn *grpclib.ClientConn, name string, baseTime time.Time, interval time.Duration) { + WriteToGroup(conn, name, "test-trace-group", name, baseTime, interval) +} + +// WriteToGroup writes trace data to a specific group. +func WriteToGroup(conn *grpclib.ClientConn, name, group, fileName string, baseTime time.Time, interval time.Duration) { + metadata := &commonv1.Metadata{ + Name: name, + Group: group, + } + schema := databasev1.NewTraceRegistryServiceClient(conn) + resp, err := schema.Get(context.Background(), &databasev1.TraceRegistryServiceGetRequest{Metadata: metadata}) + gm.Expect(err).NotTo(gm.HaveOccurred()) + metadata = resp.GetTrace().GetMetadata() + + c := tracev1.NewTraceServiceClient(conn) + ctx := context.Background() + writeClient, err := c.Write(ctx) + gm.Expect(err).NotTo(gm.HaveOccurred()) + loadData(writeClient, metadata, fmt.Sprintf("%s.json", fileName), baseTime, interval) + gm.Expect(writeClient.CloseSend()).To(gm.Succeed()) + gm.Eventually(func() error { + _, err := writeClient.Recv() + return err + }, flags.EventuallyTimeout).Should(gm.Equal(io.EOF)) +} diff --git a/test/cases/trace/data/input/all.yml b/test/cases/trace/data/input/all.yml new file mode 100644 index 00000000..8a295c06 --- /dev/null +++ b/test/cases/trace/data/input/all.yml @@ -0,0 +1,20 @@ +# Licensed to Apache Software Foundation (ASF) under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Apache Software Foundation (ASF) licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +name: "sw" +groups: ["test-trace-group"] +tag_projection: ["trace_id"] diff --git a/test/cases/trace/data/testdata/sw.json b/test/cases/trace/data/testdata/sw.json new file mode 100644 index 00000000..7213854a --- /dev/null +++ b/test/cases/trace/data/testdata/sw.json @@ -0,0 +1,177 @@ +[ + { + "tags": [ + { + "str": { + "value": "1" + } + }, + { + "int": { + "value": 1 + } + }, + { + "str": { + "value": "webapp_service" + } + }, + { + "str": { + "value": "webapp_instance_1" + } + }, + { + "str": { + "value": "/home_endpoint" + } + }, + { + "int": { + "value": 1000 + } + } + ], + "span": "YWJjMTIzIT8kKiYoKSctPUB+" + }, + { + "tags": [ + { + "str": { + "value": "2" + } + }, + { + "int": { + "value": 1 + } + }, + { + "str": { + "value": "webapp_service" + } + }, + { + "str": { + "value": "webapp_instance_2" + } + }, + { + "str": { + "value": "/product_endpoint" + } + }, + { + "int": { + "value": 500 + } + } + ], + "span": "YWJjMTIzIT8kKiYoKSctPUB+" + }, + { + "tags": [ + { + "str": { + "value": "3" + } + }, + { + "int": { + "value": 0 + } + }, + { + "str": { + "value": "webapp_service" + } + }, + { + "str": { + "value": "webapp_instance_1" + } + }, + { + "str": { + "value": "/home_endpoint" + } + }, + { + "int": { + "value": 30 + } + } + ], + "span": "YWJjMTIzIT8kKiYoKSctPUB+" + }, + { + "tags": [ + { + "str": { + "value": "4" + } + }, + { + "int": { + "value": 0 + } + }, + { + "str": { + "value": "webapp_service" + } + }, + { + "str": { + "value": "webapp_instance_3" + } + }, + { + "str": { + "value": "/price_endpoint" + } + }, + { + "int": { + "value": 60 + } + } + ], + "span": "YWJjMTIzIT8kKiYoKSctPUB+" + }, + { + "tags": [ + { + "str": { + "value": "5" + } + }, + { + "int": { + "value": 0 + } + }, + { + "str": { + "value": "webapp_service" + } + }, + { + "str": { + "value": "webapp_instance_1" + } + }, + { + "str": { + "value": "/item_endpoint" + } + }, + { + "int": { + "value": 300 + } + } + ], + "span": "YWJjMTIzIT8kKiYoKSctPUB+" + } +] \ No newline at end of file diff --git a/test/cases/trace/data/want/all.yml b/test/cases/trace/data/want/all.yml new file mode 100644 index 00000000..e29fbaf0 --- /dev/null +++ b/test/cases/trace/data/want/all.yml @@ -0,0 +1,48 @@ +# Licensed to Apache Software Foundation (ASF) under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Apache Software Foundation (ASF) licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +spans: + - tags: + - key: trace_id + value: + str: + value: "1" + span: "YWJjMTIzIT8kKiYoKSctPUB+" + - tags: + - key: trace_id + value: + str: + value: "2" + span: "YWJjMTIzIT8kKiYoKSctPUB+" + - tags: + - key: trace_id + value: + str: + value: "3" + span: "YWJjMTIzIT8kKiYoKSctPUB+" + - tags: + - key: trace_id + value: + str: + value: "4" + span: "YWJjMTIzIT8kKiYoKSctPUB+" + - tags: + - key: trace_id + value: + str: + value: "5" + span: "YWJjMTIzIT8kKiYoKSctPUB+" diff --git a/test/cases/trace/trace.go b/test/cases/trace/trace.go new file mode 100644 index 00000000..dd7eb4db --- /dev/null +++ b/test/cases/trace/trace.go @@ -0,0 +1,46 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Package trace_test contains integration test cases of the trace. +package trace_test + +import ( + "time" + + g "github.com/onsi/ginkgo/v2" + gm "github.com/onsi/gomega" + + "github.com/apache/skywalking-banyandb/pkg/test/flags" + "github.com/apache/skywalking-banyandb/pkg/test/helpers" + trace_test_data "github.com/apache/skywalking-banyandb/test/cases/trace/data" +) + +var ( + // SharedContext is the parallel execution context. + SharedContext helpers.SharedContext + verify = func(innerGm gm.Gomega, args helpers.Args) { + trace_test_data.VerifyFn(innerGm, SharedContext, args) + } +) + +var _ = g.FDescribeTable("Scanning Traces", func(args helpers.Args) { + gm.Eventually(func(innerGm gm.Gomega) { + verify(innerGm, args) + }, flags.EventuallyTimeout).Should(gm.Succeed()) +}, + g.Entry("all elements", helpers.Args{Input: "all", Duration: 1 * time.Hour}), +) diff --git a/test/integration/standalone/query/query_suite_test.go b/test/integration/standalone/query/query_suite_test.go index 7c576550..453ad6b3 100644 --- a/test/integration/standalone/query/query_suite_test.go +++ b/test/integration/standalone/query/query_suite_test.go @@ -39,6 +39,7 @@ import ( casesmeasure "github.com/apache/skywalking-banyandb/test/cases/measure" casesstream "github.com/apache/skywalking-banyandb/test/cases/stream" casestopn "github.com/apache/skywalking-banyandb/test/cases/topn" + casestrace "github.com/apache/skywalking-banyandb/test/cases/trace" integration_standalone "github.com/apache/skywalking-banyandb/test/integration/standalone" ) @@ -82,6 +83,10 @@ var _ = SynchronizedBeforeSuite(func() []byte { Connection: connection, BaseTime: now, } + casestrace.SharedContext = helpers.SharedContext{ + Connection: connection, + BaseTime: now, + } Expect(err).NotTo(HaveOccurred()) })