This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch trace/liaison
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 18ee51552506eb350e3b0599ef82c985e05ac393
Author: Gao Hongtao <hanahm...@gmail.com>
AuthorDate: Thu Aug 28 19:34:36 2025 +0800

    Refactor trace service and enhance schema management
    
    - Simplified the OnAddOrUpdate method in the entity repository by removing 
redundant switch cases for schema types, improving code readability and 
maintainability.
    - Updated trace service to utilize the new trace management schema, 
ensuring consistency across trace-related operations.
    - Introduced new test data and validation for trace service, enhancing the 
testing framework and ensuring comprehensive coverage for trace functionalities.
    - Adjusted various components to align with the new trace schema, including 
metadata handling and group configurations.
---
 banyand/liaison/grpc/discovery.go                  |  47 ++---
 banyand/liaison/grpc/trace.go                      |   4 +-
 banyand/trace/introducer.go                        |   8 +-
 banyand/trace/metadata.go                          | 149 ++++++++++++++-
 banyand/trace/svc_standalone.go                    |  89 ++-------
 banyand/trace/trace.go                             |  11 +-
 banyand/trace/trace_suite_test.go                  |   4 +-
 banyand/trace/write_standalone.go                  |   2 +-
 pkg/schema/init.go                                 |  39 +++-
 pkg/test/setup/setup.go                            |   9 +
 pkg/test/trace/etcd.go                             | 109 +++++++++++
 pkg/test/trace/testdata/group.json                 |  21 ---
 .../trace/testdata/groups/test-trace-group.json    |  19 ++
 pkg/test/trace/testdata/traces/sw.json             |   4 -
 test/cases/init.go                                 |   4 +
 test/cases/trace/data/data.go                      | 206 +++++++++++++++++++++
 test/cases/trace/data/input/all.yml                |  20 ++
 test/cases/trace/data/testdata/sw.json             | 177 ++++++++++++++++++
 test/cases/trace/data/want/all.yml                 |  48 +++++
 test/cases/trace/trace.go                          |  46 +++++
 .../standalone/query/query_suite_test.go           |   5 +
 21 files changed, 873 insertions(+), 148 deletions(-)

diff --git a/banyand/liaison/grpc/discovery.go 
b/banyand/liaison/grpc/discovery.go
index 751414d3..8df965ad 100644
--- a/banyand/liaison/grpc/discovery.go
+++ b/banyand/liaison/grpc/discovery.go
@@ -202,26 +202,6 @@ func (e *entityRepo) OnAddOrUpdate(schemaMetadata 
schema.Metadata) {
        var l partition.Locator
        var id identity
        var modRevision int64
-       switch schemaMetadata.Kind {
-       case schema.KindMeasure:
-               measure := schemaMetadata.Spec.(*databasev1.Measure)
-               modRevision = measure.GetMetadata().GetModRevision()
-               l = partition.NewEntityLocator(measure.TagFamilies, 
measure.Entity, modRevision)
-               id = getID(measure.GetMetadata())
-       case schema.KindStream:
-               stream := schemaMetadata.Spec.(*databasev1.Stream)
-               modRevision = stream.GetMetadata().GetModRevision()
-               l = partition.NewEntityLocator(stream.TagFamilies, 
stream.Entity, modRevision)
-               id = getID(stream.GetMetadata())
-       case schema.KindTrace:
-               trace := schemaMetadata.Spec.(*databasev1.Trace)
-               modRevision = trace.GetMetadata().GetModRevision()
-               // For trace, we don't need entity-based partitioning, just 
store metadata
-               l = partition.Locator{ModRevision: modRevision}
-               id = getID(trace.GetMetadata())
-       default:
-               return
-       }
        if le := e.log.Debug(); le.Enabled() {
                var kind string
                switch schemaMetadata.Kind {
@@ -240,16 +220,20 @@ func (e *entityRepo) OnAddOrUpdate(schemaMetadata 
schema.Metadata) {
                        Str("kind", kind).
                        Msg("entity added or updated")
        }
-       e.RWMutex.Lock()
-       defer e.RWMutex.Unlock()
-       e.entitiesMap[id] = partition.Locator{TagLocators: l.TagLocators, 
ModRevision: modRevision}
        switch schemaMetadata.Kind {
        case schema.KindMeasure:
                measure := schemaMetadata.Spec.(*databasev1.Measure)
-               e.measureMap[id] = measure
-               delete(e.traceMap, id) // Ensure trace is not stored for 
measures
+               modRevision = measure.GetMetadata().GetModRevision()
+               l = partition.NewEntityLocator(measure.TagFamilies, 
measure.Entity, modRevision)
+               id = getID(measure.GetMetadata())
+       case schema.KindStream:
+               stream := schemaMetadata.Spec.(*databasev1.Stream)
+               modRevision = stream.GetMetadata().GetModRevision()
+               l = partition.NewEntityLocator(stream.TagFamilies, 
stream.Entity, modRevision)
+               id = getID(stream.GetMetadata())
        case schema.KindTrace:
                trace := schemaMetadata.Spec.(*databasev1.Trace)
+               id = getID(trace.GetMetadata())
                e.traceMap[id] = trace
                // Pre-compute trace ID tag index
                traceIDTagName := trace.GetTraceIdTagName()
@@ -261,10 +245,19 @@ func (e *entityRepo) OnAddOrUpdate(schemaMetadata 
schema.Metadata) {
                        }
                }
                e.traceIDIndexMap[id] = traceIDIndex
-               delete(e.measureMap, id) // Ensure measure is not stored for 
traces
+               return
        default:
+               return
+       }
+
+       e.RWMutex.Lock()
+       defer e.RWMutex.Unlock()
+       e.entitiesMap[id] = partition.Locator{TagLocators: l.TagLocators, 
ModRevision: modRevision}
+       if schemaMetadata.Kind == schema.KindMeasure {
+               measure := schemaMetadata.Spec.(*databasev1.Measure)
+               e.measureMap[id] = measure
+       } else {
                delete(e.measureMap, id) // Ensure measure is not stored for 
streams
-               delete(e.traceMap, id)   // Ensure trace is not stored for 
streams
        }
 }
 
diff --git a/banyand/liaison/grpc/trace.go b/banyand/liaison/grpc/trace.go
index 0d4f9843..4cb106a0 100644
--- a/banyand/liaison/grpc/trace.go
+++ b/banyand/liaison/grpc/trace.go
@@ -89,11 +89,11 @@ func (s *traceService) validateTimestamp(writeEntity 
*tracev1.WriteRequest) erro
 
 func (s *traceService) validateMetadata(writeEntity *tracev1.WriteRequest) 
error {
        if writeEntity.Metadata.ModRevision > 0 {
-               traceCache, existed := 
s.entityRepo.getLocator(getID(writeEntity.GetMetadata()))
+               traceCache, existed := 
s.entityRepo.getTrace(getID(writeEntity.GetMetadata()))
                if !existed {
                        return errors.New("trace schema not found")
                }
-               if writeEntity.Metadata.ModRevision != traceCache.ModRevision {
+               if writeEntity.Metadata.ModRevision != 
traceCache.GetMetadata().GetModRevision() {
                        return errors.New("expired trace schema")
                }
        }
diff --git a/banyand/trace/introducer.go b/banyand/trace/introducer.go
index a67869de..809820c9 100644
--- a/banyand/trace/introducer.go
+++ b/banyand/trace/introducer.go
@@ -32,7 +32,7 @@ func (i *introduction) reset() {
        i.applied = nil
 }
 
-var introductionPool = pool.Register[*introduction]("stream-introduction")
+var introductionPool = pool.Register[*introduction]("trace-introduction")
 
 func generateIntroduction() *introduction {
        v := introductionPool.Get()
@@ -60,7 +60,7 @@ func (i *flusherIntroduction) reset() {
        i.applied = nil
 }
 
-var flusherIntroductionPool = 
pool.Register[*flusherIntroduction]("stream-flusher-introduction")
+var flusherIntroductionPool = 
pool.Register[*flusherIntroduction]("trace-flusher-introduction")
 
 func generateFlusherIntroduction() *flusherIntroduction {
        v := flusherIntroductionPool.Get()
@@ -94,7 +94,7 @@ func (i *mergerIntroduction) reset() {
        i.creator = 0
 }
 
-var mergerIntroductionPool = 
pool.Register[*mergerIntroduction]("stream-merger-introduction")
+var mergerIntroductionPool = 
pool.Register[*mergerIntroduction]("trace-merger-introduction")
 
 func generateMergerIntroduction() *mergerIntroduction {
        v := mergerIntroductionPool.Get()
@@ -122,7 +122,7 @@ func (i *syncIntroduction) reset() {
        i.applied = nil
 }
 
-var syncIntroductionPool = 
pool.Register[*syncIntroduction]("stream-sync-introduction")
+var syncIntroductionPool = 
pool.Register[*syncIntroduction]("trace-sync-introduction")
 
 func generateSyncIntroduction() *syncIntroduction {
        v := syncIntroductionPool.Get()
diff --git a/banyand/trace/metadata.go b/banyand/trace/metadata.go
index 0d3c9cf0..b44836bf 100644
--- a/banyand/trace/metadata.go
+++ b/banyand/trace/metadata.go
@@ -20,10 +20,12 @@ package trace
 import (
        "context"
        "fmt"
+       "path"
        "time"
 
        "github.com/pkg/errors"
 
+       "github.com/apache/skywalking-banyandb/api/common"
        commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
        "github.com/apache/skywalking-banyandb/api/validate"
@@ -34,7 +36,9 @@ import (
        "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
        "github.com/apache/skywalking-banyandb/banyand/observability"
        "github.com/apache/skywalking-banyandb/banyand/protector"
+       "github.com/apache/skywalking-banyandb/banyand/queue/pub"
        "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/meter"
        resourceSchema "github.com/apache/skywalking-banyandb/pkg/schema"
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
@@ -91,7 +95,10 @@ func newLiaisonSchemaRepo(path string, svc *liaison, 
traceDataNodeRegistry grpc.
 }
 
 func (sr *schemaRepo) start() {
-       sr.l.Info().Str("path", sr.path).Msg("starting trace metadata 
repository")
+       sr.Watcher()
+       sr.metadata.
+               RegisterHandler("trace", 
schema.KindGroup|schema.KindTrace|schema.KindIndexRuleBinding|schema.KindIndexRule,
+                       sr)
 }
 
 func (sr *schemaRepo) Trace(metadata *commonv1.Metadata) (*trace, bool) {
@@ -103,8 +110,16 @@ func (sr *schemaRepo) Trace(metadata *commonv1.Metadata) 
(*trace, bool) {
        return t, ok
 }
 
-func (sr *schemaRepo) GetRemovalSegmentsTimeRange(_ string) 
*timestamp.TimeRange {
-       panic("not implemented")
+func (sr *schemaRepo) GetRemovalSegmentsTimeRange(group string) 
*timestamp.TimeRange {
+       g, ok := sr.LoadGroup(group)
+       if !ok {
+               return nil
+       }
+       db := g.SupplyTSDB()
+       if db == nil {
+               return nil
+       }
+       return db.(storage.TSDB[*tsTable, option]).GetExpiredSegmentsTimeRange()
 }
 
 func (sr *schemaRepo) OnInit(kinds []schema.Kind) (bool, []int64) {
@@ -260,6 +275,15 @@ type supplier struct {
 }
 
 func newSupplier(path string, svc *standalone, nodeLabels map[string]string) 
*supplier {
+       if svc.pm == nil {
+               svc.l.Panic().Msg("CRITICAL: svc.pm is nil in newSupplier")
+       }
+       opt := svc.option
+       opt.protector = svc.pm
+
+       if opt.protector == nil {
+               svc.l.Panic().Msg("CRITICAL: opt.protector is still nil after 
assignment")
+       }
        return &supplier{
                metadata:   svc.metadata,
                omr:        svc.omr,
@@ -267,7 +291,7 @@ func newSupplier(path string, svc *standalone, nodeLabels 
map[string]string) *su
                l:          svc.l,
                nodeLabels: nodeLabels,
                path:       path,
-               option:     svc.option,
+               option:     opt,
        }
 }
 
@@ -282,8 +306,64 @@ func (s *supplier) ResourceSchema(md *commonv1.Metadata) 
(resourceSchema.Resourc
        return s.metadata.TraceRegistry().GetTrace(ctx, md)
 }
 
-func (s *supplier) OpenDB(_ *commonv1.Group) (resourceSchema.DB, error) {
-       panic("not implemented")
+func (s *supplier) OpenDB(groupSchema *commonv1.Group) (resourceSchema.DB, 
error) {
+       name := groupSchema.Metadata.Group
+       p := common.Position{
+               Module:   "trace",
+               Database: name,
+       }
+       ro := groupSchema.ResourceOpts
+       if ro == nil {
+               return nil, fmt.Errorf("no resource opts in group %s", name)
+       }
+       shardNum := ro.ShardNum
+       ttl := ro.Ttl
+       segInterval := ro.SegmentInterval
+       segmentIdleTimeout := time.Duration(0)
+       if len(ro.Stages) > 0 && len(s.nodeLabels) > 0 {
+               var ttlNum uint32
+               for _, st := range ro.Stages {
+                       if st.Ttl.Unit != ro.Ttl.Unit {
+                               return nil, fmt.Errorf("ttl unit %s is not 
consistent with stage %s", ro.Ttl.Unit, st.Ttl.Unit)
+                       }
+                       selector, err := pub.ParseLabelSelector(st.NodeSelector)
+                       if err != nil {
+                               return nil, errors.WithMessagef(err, "failed to 
parse node selector %s", st.NodeSelector)
+                       }
+                       ttlNum += st.Ttl.Num
+                       if !selector.Matches(s.nodeLabels) {
+                               continue
+                       }
+                       ttl.Num += ttlNum
+                       shardNum = st.ShardNum
+                       segInterval = st.SegmentInterval
+                       if st.Close {
+                               segmentIdleTimeout = 5 * time.Minute
+                       }
+                       break
+               }
+       }
+       group := groupSchema.Metadata.Group
+       opts := storage.TSDBOpts[*tsTable, option]{
+               ShardNum:                       shardNum,
+               Location:                       path.Join(s.path, group),
+               TSTableCreator:                 newTSTable,
+               TableMetrics:                   s.newMetrics(p),
+               SegmentInterval:                
storage.MustToIntervalRule(segInterval),
+               TTL:                            storage.MustToIntervalRule(ttl),
+               Option:                         s.option,
+               SeriesIndexFlushTimeoutSeconds: 
s.option.flushTimeout.Nanoseconds() / int64(time.Second),
+               SeriesIndexCacheMaxBytes:       
int(s.option.seriesCacheMaxSize),
+               StorageMetricsFactory:          
s.omr.With(traceScope.ConstLabels(meter.ToLabelPairs(common.DBLabelNames(), 
p.DBLabelValues()))),
+               SegmentIdleTimeout:             segmentIdleTimeout,
+               MemoryLimit:                    s.pm.GetLimit(),
+       }
+       return storage.OpenTSDB(
+               common.SetPosition(context.Background(), func(_ 
common.Position) common.Position {
+                       return p
+               }),
+               opts, nil, group,
+       )
 }
 
 // queueSupplier is the supplier for liaison service.
@@ -299,6 +379,15 @@ type queueSupplier struct {
 }
 
 func newQueueSupplier(path string, svc *liaison, traceDataNodeRegistry 
grpc.NodeRegistry) *queueSupplier {
+       if svc.pm == nil {
+               svc.l.Panic().Msg("CRITICAL: svc.pm is nil in newSupplier")
+       }
+       opt := svc.option
+       opt.protector = svc.pm
+
+       if opt.protector == nil {
+               svc.l.Panic().Msg("CRITICAL: opt.protector is still nil after 
assignment")
+       }
        return &queueSupplier{
                metadata:              svc.metadata,
                omr:                   svc.omr,
@@ -306,7 +395,7 @@ func newQueueSupplier(path string, svc *liaison, 
traceDataNodeRegistry grpc.Node
                traceDataNodeRegistry: traceDataNodeRegistry,
                l:                     svc.l,
                path:                  path,
-               option:                svc.option,
+               option:                opt,
        }
 }
 
@@ -321,6 +410,48 @@ func (qs *queueSupplier) ResourceSchema(md 
*commonv1.Metadata) (resourceSchema.R
        return qs.metadata.TraceRegistry().GetTrace(ctx, md)
 }
 
-func (qs *queueSupplier) OpenDB(_ *commonv1.Group) (resourceSchema.DB, error) {
-       panic("not implemented")
+func (qs *queueSupplier) OpenDB(groupSchema *commonv1.Group) 
(resourceSchema.DB, error) {
+       name := groupSchema.Metadata.Group
+       p := common.Position{
+               Module:   "trace",
+               Database: name,
+       }
+       ro := groupSchema.ResourceOpts
+       if ro == nil {
+               return nil, fmt.Errorf("no resource opts in group %s", name)
+       }
+       shardNum := ro.ShardNum
+       group := groupSchema.Metadata.Group
+       opts := wqueue.Opts[*tsTable, option]{
+               Group:           group,
+               ShardNum:        shardNum,
+               SegmentInterval: storage.MustToIntervalRule(ro.SegmentInterval),
+               Location:        path.Join(qs.path, group),
+               Option:          qs.option,
+               Metrics:         qs.newMetrics(p),
+               SubQueueCreator: newWriteQueue,
+               GetNodes: func(shardID common.ShardID) []string {
+                       copies := ro.Replicas + 1
+                       nodeSet := make(map[string]struct{}, copies)
+                       for i := uint32(0); i < copies; i++ {
+                               nodeID, err := 
qs.traceDataNodeRegistry.Locate(group, "", uint32(shardID), i)
+                               if err != nil {
+                                       qs.l.Error().Err(err).Str("group", 
group).Uint32("shard", uint32(shardID)).Uint32("copy", i).Msg("failed to locate 
node")
+                                       return nil
+                               }
+                               nodeSet[nodeID] = struct{}{}
+                       }
+                       nodes := make([]string, 0, len(nodeSet))
+                       for nodeID := range nodeSet {
+                               nodes = append(nodes, nodeID)
+                       }
+                       return nodes
+               },
+       }
+       return wqueue.Open(
+               common.SetPosition(context.Background(), func(_ 
common.Position) common.Position {
+                       return p
+               }),
+               opts, group,
+       )
 }
diff --git a/banyand/trace/svc_standalone.go b/banyand/trace/svc_standalone.go
index fe0e9fc9..05e4e858 100644
--- a/banyand/trace/svc_standalone.go
+++ b/banyand/trace/svc_standalone.go
@@ -21,12 +21,15 @@ import (
        "context"
        "path"
        "path/filepath"
+       "strings"
 
        "github.com/pkg/errors"
 
+       "github.com/apache/skywalking-banyandb/api/common"
        "github.com/apache/skywalking-banyandb/api/data"
        commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       "github.com/apache/skywalking-banyandb/banyand/internal/storage"
        "github.com/apache/skywalking-banyandb/banyand/metadata"
        "github.com/apache/skywalking-banyandb/banyand/observability"
        "github.com/apache/skywalking-banyandb/banyand/protector"
@@ -63,11 +66,6 @@ type standalone struct {
        maxFileSnapshotNum  int
 }
 
-// StandaloneService returns a new standalone service.
-func StandaloneService(_ context.Context) (Service, error) {
-       return &standalone{}, nil
-}
-
 func (s *standalone) FlagSet() *run.FlagSet {
        fs := run.NewFlagSet("trace")
        fs.StringVar(&s.root, "trace-root-path", "/tmp", "the root path for 
trace data")
@@ -94,37 +92,25 @@ func (s *standalone) Role() databasev1.Role {
        return databasev1.Role_ROLE_DATA
 }
 
-func (s *standalone) PreRun(_ context.Context) error {
-       s.l = logger.GetLogger("trace")
-
-       // Initialize metadata
-       if s.metadata == nil {
-               return errors.New("metadata repo is required")
-       }
-
-       // Initialize filesystem
-       if s.lfs == nil {
-               s.lfs = fs.NewLocalFileSystem()
-       }
-
-       // Initialize protector
-       if s.pm == nil {
-               return errors.New("memory protector is required")
+func (s *standalone) PreRun(ctx context.Context) error {
+       s.l = logger.GetLogger(s.Name())
+       s.l.Info().Msg("memory protector is initialized in PreRun")
+       s.lfs = fs.NewLocalFileSystemWithLoggerAndLimit(s.l, s.pm.GetLimit())
+       path := path.Join(s.root, s.Name())
+       s.snapshotDir = filepath.Join(path, storage.SnapshotsDir)
+       observability.UpdatePath(path)
+       val := ctx.Value(common.ContextNodeKey)
+       if val == nil {
+               return errors.New("node id is empty")
        }
-
-       // Initialize pipeline
-       if s.pipeline == nil {
-               return errors.New("pipeline is required")
-       }
-
-       // Set up data path
+       node := val.(common.Node)
        if s.dataPath == "" {
-               s.dataPath = path.Join(s.root, "trace-data")
+               s.dataPath = filepath.Join(path, storage.DataDir)
        }
-
-       // Initialize schema repository
-       var nodeLabels map[string]string
-       s.schemaRepo = newSchemaRepo(s.dataPath, s, nodeLabels)
+       if !strings.HasPrefix(filepath.VolumeName(s.dataPath), 
filepath.VolumeName(path)) {
+               observability.UpdatePath(s.dataPath)
+       }
+       s.schemaRepo = newSchemaRepo(s.dataPath, s, node.Labels)
 
        // Initialize snapshot directory
        s.snapshotDir = filepath.Join(s.dataPath, "snapshot")
@@ -148,12 +134,7 @@ func (s *standalone) PreRun(_ context.Context) error {
 }
 
 func (s *standalone) Serve() run.StopNotify {
-       // As specified in the plan, no pipeline listeners should be implemented
-       s.l.Info().Msg("trace standalone service started")
-
-       // Return a channel that never closes since this service runs 
indefinitely
-       stopCh := make(chan struct{})
-       return stopCh
+       return s.schemaRepo.StopCh()
 }
 
 func (s *standalone) GracefulStop() {
@@ -179,36 +160,6 @@ func (s *standalone) GetRemovalSegmentsTimeRange(group 
string) *timestamp.TimeRa
        return s.schemaRepo.GetRemovalSegmentsTimeRange(group)
 }
 
-// SetMetadata sets the metadata repository.
-func (s *standalone) SetMetadata(metadata metadata.Repo) {
-       s.metadata = metadata
-}
-
-// SetObservabilityRegistry sets the observability metrics registry.
-func (s *standalone) SetObservabilityRegistry(omr 
observability.MetricsRegistry) {
-       s.omr = omr
-}
-
-// SetProtector sets the memory protector.
-func (s *standalone) SetProtector(pm protector.Memory) {
-       s.pm = pm
-}
-
-// SetPipeline sets the pipeline server.
-func (s *standalone) SetPipeline(pipeline queue.Server) {
-       s.pipeline = pipeline
-}
-
-// SetLocalPipeline sets the local pipeline queue.
-func (s *standalone) SetLocalPipeline(localPipeline queue.Queue) {
-       s.localPipeline = localPipeline
-}
-
-// SetFileSystem sets the file system.
-func (s *standalone) SetFileSystem(lfs fs.FileSystem) {
-       s.lfs = lfs
-}
-
 // NewService returns a new service.
 func NewService(metadata metadata.Repo, pipeline queue.Server, omr 
observability.MetricsRegistry, pm protector.Memory) (Service, error) {
        return &standalone{
diff --git a/banyand/trace/trace.go b/banyand/trace/trace.go
index b8812ee0..ed33c4f0 100644
--- a/banyand/trace/trace.go
+++ b/banyand/trace/trace.go
@@ -48,12 +48,11 @@ const (
 var traceScope = observability.RootScope.SubScope("trace")
 
 type option struct {
-       mergePolicy              *mergePolicy
-       protector                protector.Memory
-       tire2Client              queue.Client
-       seriesCacheMaxSize       run.Bytes
-       flushTimeout             time.Duration
-       elementIndexFlushTimeout time.Duration
+       mergePolicy        *mergePolicy
+       protector          protector.Memory
+       tire2Client        queue.Client
+       seriesCacheMaxSize run.Bytes
+       flushTimeout       time.Duration
 }
 
 // Service allows inspecting the trace data.
diff --git a/banyand/trace/trace_suite_test.go 
b/banyand/trace/trace_suite_test.go
index b64c502e..05140094 100644
--- a/banyand/trace/trace_suite_test.go
+++ b/banyand/trace/trace_suite_test.go
@@ -33,7 +33,7 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/test"
        "github.com/apache/skywalking-banyandb/pkg/test/flags"
-       teststream "github.com/apache/skywalking-banyandb/pkg/test/stream"
+       testtrace "github.com/apache/skywalking-banyandb/pkg/test/trace"
 )
 
 func TestTrace(t *testing.T) {
@@ -57,7 +57,7 @@ func (p *preloadTraceService) Name() string {
 }
 
 func (p *preloadTraceService) PreRun(ctx context.Context) error {
-       return teststream.PreloadSchema(ctx, p.metaSvc.SchemaRegistry())
+       return testtrace.PreloadSchema(ctx, p.metaSvc.SchemaRegistry())
 }
 
 type services struct {
diff --git a/banyand/trace/write_standalone.go 
b/banyand/trace/write_standalone.go
index 5825983e..5974986c 100644
--- a/banyand/trace/write_standalone.go
+++ b/banyand/trace/write_standalone.go
@@ -186,7 +186,7 @@ func processTraces(schemaRepo *schemaRepo, traces *traces, 
writeEvent *tracev1.I
        }
 
        is := stm.indexSchema.Load().(indexSchema)
-       if len(is.indexRuleLocators) != len(stm.GetSchema().GetTags()) {
+       if len(is.indexRuleLocators) > len(stm.GetSchema().GetTags()) {
                return fmt.Errorf("metadata crashed, tag rule length %d, tag 
length %d",
                        len(is.indexRuleLocators), 
len(stm.GetSchema().GetTags()))
        }
diff --git a/pkg/schema/init.go b/pkg/schema/init.go
index 8cbe0a12..5407b402 100644
--- a/pkg/schema/init.go
+++ b/pkg/schema/init.go
@@ -33,14 +33,15 @@ type revisionContext struct {
        group            int64
        measure          int64
        stream           int64
+       trace            int64
        indexRule        int64
        indexRuleBinding int64
        topNAgg          int64
 }
 
 func (r revisionContext) String() string {
-       return fmt.Sprintf("Group: %d, Measure: %d, Stream: %d, IndexRule: %d, 
IndexRuleBinding: %d, TopNAgg: %d",
-               r.group, r.measure, r.stream, r.indexRule, r.indexRuleBinding, 
r.topNAgg)
+       return fmt.Sprintf("Group: %d, Measure: %d, Stream: %d, Trace: %d, 
IndexRule: %d, IndexRuleBinding: %d, TopNAgg: %d",
+               r.group, r.measure, r.stream, r.trace, r.indexRule, 
r.indexRuleBinding, r.topNAgg)
 }
 
 type revisionContextKey struct{}
@@ -48,7 +49,7 @@ type revisionContextKey struct{}
 var revCtxKey = revisionContextKey{}
 
 func (sr *schemaRepo) Init(kind schema.Kind) ([]string, []int64) {
-       if kind != schema.KindMeasure && kind != schema.KindStream {
+       if kind != schema.KindMeasure && kind != schema.KindStream && kind != 
schema.KindTrace {
                return nil, nil
        }
        catalog := sr.getCatalog(kind)
@@ -74,6 +75,10 @@ func (sr *schemaRepo) Init(kind schema.Kind) ([]string, 
[]int64) {
                sr.l.Info().Stringer("revision", revCtx).Msg("init measures")
                return groupNames, []int64{revCtx.group, revCtx.measure, 
revCtx.indexRuleBinding, revCtx.indexRule, revCtx.topNAgg}
        }
+       if kind == schema.KindTrace {
+               sr.l.Info().Stringer("revision", revCtx).Msg("init trace")
+               return groupNames, []int64{revCtx.group, revCtx.trace, 
revCtx.indexRuleBinding, revCtx.indexRule}
+       }
        sr.l.Info().Stringer("revision", revCtx).Msg("init stream")
        return groupNames, []int64{revCtx.group, revCtx.stream, 
revCtx.indexRuleBinding, revCtx.indexRule}
 }
@@ -82,6 +87,9 @@ func (sr *schemaRepo) getCatalog(kind schema.Kind) 
commonv1.Catalog {
        if kind == schema.KindMeasure {
                return commonv1.Catalog_CATALOG_MEASURE
        }
+       if kind == schema.KindTrace {
+               return commonv1.Catalog_CATALOG_TRACE
+       }
        return commonv1.Catalog_CATALOG_STREAM
 }
 
@@ -96,6 +104,10 @@ func (sr *schemaRepo) processGroup(ctx context.Context, g 
*commonv1.Group, catal
                sr.processMeasure(ctx, g.Metadata.Name)
                return
        }
+       if catalog == commonv1.Catalog_CATALOG_TRACE {
+               sr.processTrace(ctx, g.Metadata.Name)
+               return
+       }
        sr.processStream(ctx, g.Metadata.Name)
 }
 
@@ -178,6 +190,27 @@ func (sr *schemaRepo) processStream(ctx context.Context, 
gName string) {
        sr.l.Info().Str("group", gName).Dur("duration", 
time.Since(start)).Int("size", len(ss)).Msg("store streams")
 }
 
+func (sr *schemaRepo) processTrace(ctx context.Context, gName string) {
+       ctx, cancel := context.WithTimeout(ctx, initTimeout)
+       defer cancel()
+       start := time.Now()
+       tt, err := sr.metadata.TraceRegistry().ListTrace(ctx, 
schema.ListOpt{Group: gName})
+       if err != nil {
+               logger.Panicf("fails to get the traces: %v", err)
+               return
+       }
+       revCtx := ctx.Value(revCtxKey).(*revisionContext)
+       for _, t := range tt {
+               if err := sr.storeResource(t); err != nil {
+                       logger.Panicf("fails to store the trace: %v", err)
+               }
+               if t.Metadata.ModRevision > revCtx.trace {
+                       revCtx.trace = t.Metadata.ModRevision
+               }
+       }
+       sr.l.Info().Str("group", gName).Dur("duration", 
time.Since(start)).Int("size", len(tt)).Msg("store traces")
+}
+
 func (sr *schemaRepo) initGroup(groupSchema *commonv1.Group) (*group, error) {
        g, ok := sr.getGroup(groupSchema.Metadata.Name)
        if ok {
diff --git a/pkg/test/setup/setup.go b/pkg/test/setup/setup.go
index 55ac3940..b54d74a3 100644
--- a/pkg/test/setup/setup.go
+++ b/pkg/test/setup/setup.go
@@ -42,6 +42,7 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/test/helpers"
        test_measure "github.com/apache/skywalking-banyandb/pkg/test/measure"
        test_stream "github.com/apache/skywalking-banyandb/pkg/test/stream"
+       test_trace "github.com/apache/skywalking-banyandb/pkg/test/trace"
 )
 
 const host = "localhost"
@@ -51,6 +52,7 @@ func Standalone(flags ...string) (string, string, func()) {
        return StandaloneWithSchemaLoaders([]SchemaLoader{
                &preloadService{name: "stream"},
                &preloadService{name: "measure"},
+               &preloadService{name: "trace"},
        }, "", "", "", "", flags...)
 }
 
@@ -59,6 +61,7 @@ func StandaloneWithAuth(username, password string, flags 
...string) (string, str
        return StandaloneWithSchemaLoaders([]SchemaLoader{
                &preloadService{name: "stream"},
                &preloadService{name: "measure"},
+               &preloadService{name: "trace"},
        }, "", "", username, password, flags...)
 }
 
@@ -67,6 +70,7 @@ func StandaloneWithTLS(certFile, keyFile string, flags 
...string) (string, strin
        return StandaloneWithSchemaLoaders([]SchemaLoader{
                &preloadService{name: "stream"},
                &preloadService{name: "measure"},
+               &preloadService{name: "trace"},
        }, certFile, keyFile, "", "", flags...)
 }
 
@@ -99,6 +103,7 @@ func ClosableStandalone(path string, ports []int, flags 
...string) (string, stri
        return standaloneServer(path, ports, []SchemaLoader{
                &preloadService{name: "stream"},
                &preloadService{name: "measure"},
+               &preloadService{name: "trace"},
        }, "", "", flags...)
 }
 
@@ -134,6 +139,7 @@ func standaloneServerWithAuth(path string, ports []int, 
schemaLoaders []SchemaLo
                "--measure-root-path=" + path,
                "--metadata-root-path=" + path,
                "--property-root-path=" + path,
+               "--trace-root-path=" + path,
                fmt.Sprintf("--etcd-listen-client-url=%s", endpoint), 
fmt.Sprintf("--etcd-listen-peer-url=http://%s:%d";, host, ports[3]),
        }
        tlsEnabled := false
@@ -202,6 +208,9 @@ func (p *preloadService) PreRun(ctx context.Context) error {
        if p.name == "stream" {
                return test_stream.PreloadSchema(ctx, p.registry)
        }
+       if p.name == "trace" {
+               return test_trace.PreloadSchema(ctx, p.registry)
+       }
        return test_measure.PreloadSchema(ctx, p.registry)
 }
 
diff --git a/pkg/test/trace/etcd.go b/pkg/test/trace/etcd.go
new file mode 100644
index 00000000..275c1765
--- /dev/null
+++ b/pkg/test/trace/etcd.go
@@ -0,0 +1,109 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package trace implements helpers to load schemas for testing.
+package trace
+
+import (
+       "context"
+       "embed"
+       "path"
+
+       "github.com/pkg/errors"
+       "google.golang.org/protobuf/encoding/protojson"
+       "google.golang.org/protobuf/proto"
+
+       commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+)
+
+const (
+       groupDir            = "testdata/groups"
+       traceDir            = "testdata/traces"
+       indexRuleDir        = "testdata/index_rules"
+       indexRuleBindingDir = "testdata/index_rule_bindings"
+)
+
+//go:embed testdata/*
+var store embed.FS
+
+// PreloadSchema loads schemas from files in the booting process.
+func PreloadSchema(ctx context.Context, e schema.Registry) error {
+       return loadAllSchemas(ctx, e)
+}
+
+// loadAllSchemas loads all trace-related schemas from the testdata directory.
+func loadAllSchemas(ctx context.Context, e schema.Registry) error {
+       return preloadSchemaWithFuncs(ctx, e,
+               func(ctx context.Context, e schema.Registry) error {
+                       return loadSchema(groupDir, &commonv1.Group{}, 
func(group *commonv1.Group) error {
+                               return e.CreateGroup(ctx, group)
+                       })
+               },
+               func(ctx context.Context, e schema.Registry) error {
+                       return loadSchema(traceDir, &databasev1.Trace{}, 
func(trace *databasev1.Trace) error {
+                               _, innerErr := e.CreateTrace(ctx, trace)
+                               return innerErr
+                       })
+               },
+               func(ctx context.Context, e schema.Registry) error {
+                       return loadSchema(indexRuleDir, 
&databasev1.IndexRule{}, func(indexRule *databasev1.IndexRule) error {
+                               return e.CreateIndexRule(ctx, indexRule)
+                       })
+               },
+               func(ctx context.Context, e schema.Registry) error {
+                       return loadSchema(indexRuleBindingDir, 
&databasev1.IndexRuleBinding{}, func(indexRuleBinding 
*databasev1.IndexRuleBinding) error {
+                               return e.CreateIndexRuleBinding(ctx, 
indexRuleBinding)
+                       })
+               },
+       )
+}
+
+// preloadSchemaWithFuncs extracts the common logic for loading schemas.
+func preloadSchemaWithFuncs(ctx context.Context, e schema.Registry, loaders 
...func(context.Context, schema.Registry) error) error {
+       for _, loader := range loaders {
+               if err := loader(ctx, e); err != nil {
+                       return errors.WithStack(err)
+               }
+       }
+       return nil
+}
+
+func loadSchema[T proto.Message](dir string, resource T, loadFn func(resource 
T) error) error {
+       entries, err := store.ReadDir(dir)
+       if err != nil {
+               return err
+       }
+       for _, entry := range entries {
+               data, err := store.ReadFile(path.Join(dir, entry.Name()))
+               if err != nil {
+                       return err
+               }
+               resource.ProtoReflect().Descriptor().RequiredNumbers()
+               if err := protojson.Unmarshal(data, resource); err != nil {
+                       return err
+               }
+               if err := loadFn(resource); err != nil {
+                       if errors.Is(err, schema.ErrGRPCAlreadyExists) {
+                               return nil
+                       }
+                       return err
+               }
+       }
+       return nil
+}
diff --git a/pkg/test/trace/testdata/group.json 
b/pkg/test/trace/testdata/group.json
deleted file mode 100644
index b0f5793d..00000000
--- a/pkg/test/trace/testdata/group.json
+++ /dev/null
@@ -1,21 +0,0 @@
-[
-    {
-        "metadata": {
-            "name": "test-trace-group"
-        },
-        "catalog": "CATALOG_TRACE",
-        "resource_opts": {
-            "shard_num": 2,
-            "replicas": 1,
-            "segment_interval": {
-                "unit": "UNIT_DAY",
-                "num": 1
-            },
-            "ttl": {
-                "unit": "UNIT_DAY",
-                "num": 3
-            }
-        },
-        "updated_at": "2021-04-15T01:30:15.01Z"
-    }
-]
\ No newline at end of file
diff --git a/pkg/test/trace/testdata/groups/test-trace-group.json 
b/pkg/test/trace/testdata/groups/test-trace-group.json
new file mode 100644
index 00000000..3d7bbaf2
--- /dev/null
+++ b/pkg/test/trace/testdata/groups/test-trace-group.json
@@ -0,0 +1,19 @@
+{
+    "metadata": {
+        "name": "test-trace-group"
+    },
+    "catalog": "CATALOG_TRACE",
+    "resource_opts": {
+        "shard_num": 2,
+        "replicas": 1,
+        "segment_interval": {
+            "unit": "UNIT_DAY",
+            "num": 1
+        },
+        "ttl": {
+            "unit": "UNIT_DAY",
+            "num": 3
+        }
+    },
+    "updated_at": "2021-04-15T01:30:15.01Z"
+}
\ No newline at end of file
diff --git a/pkg/test/trace/testdata/traces/sw.json 
b/pkg/test/trace/testdata/traces/sw.json
index 346a7389..37e21ed3 100644
--- a/pkg/test/trace/testdata/traces/sw.json
+++ b/pkg/test/trace/testdata/traces/sw.json
@@ -31,10 +31,6 @@
         {
             "name": "timestamp",
             "type": "TAG_TYPE_TIMESTAMP"
-        },
-        {
-            "name": "data_binary",
-            "type": "TAG_TYPE_DATA_BINARY"
         }
     ],
     "trace_id_tag_name": "trace_id",
diff --git a/test/cases/init.go b/test/cases/init.go
index 5e7d0543..ad2a71b6 100644
--- a/test/cases/init.go
+++ b/test/cases/init.go
@@ -28,6 +28,7 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/grpchelper"
        casesmeasuredata 
"github.com/apache/skywalking-banyandb/test/cases/measure/data"
        casesstreamdata 
"github.com/apache/skywalking-banyandb/test/cases/stream/data"
+       casestrace "github.com/apache/skywalking-banyandb/test/cases/trace/data"
 )
 
 // Initialize test data.
@@ -60,4 +61,7 @@ func Initialize(addr string, now time.Time) {
        casesmeasuredata.Write(conn, "duplicated", "exception", 
"duplicated.json", now, 0)
        casesmeasuredata.Write(conn, "service_cpm_minute", "sw_updated", 
"service_cpm_minute_updated_data.json", now.Add(10*time.Minute), interval)
        time.Sleep(5 * time.Second)
+       // trace
+       interval = 500 * time.Millisecond
+       casestrace.Write(conn, "sw", now, interval)
 }
diff --git a/test/cases/trace/data/data.go b/test/cases/trace/data/data.go
new file mode 100644
index 00000000..fc70b23d
--- /dev/null
+++ b/test/cases/trace/data/data.go
@@ -0,0 +1,206 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package data is used to test the trace service.
+package data
+
+import (
+       "context"
+       "embed"
+       "encoding/base64"
+       "encoding/json"
+       "fmt"
+       "io"
+       "slices"
+       "strings"
+       "time"
+
+       "github.com/google/go-cmp/cmp"
+       g "github.com/onsi/ginkgo/v2"
+       gm "github.com/onsi/gomega"
+       grpclib "google.golang.org/grpc"
+       "google.golang.org/protobuf/encoding/protojson"
+       "google.golang.org/protobuf/testing/protocmp"
+       "google.golang.org/protobuf/types/known/timestamppb"
+       "sigs.k8s.io/yaml"
+
+       commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+       tracev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/trace/v1"
+       "github.com/apache/skywalking-banyandb/pkg/test/flags"
+       "github.com/apache/skywalking-banyandb/pkg/test/helpers"
+)
+
+//go:embed input/*.yml
+var inputFS embed.FS
+
+//go:embed want/*.yml
+var wantFS embed.FS
+
+//go:embed testdata/*.json
+var dataFS embed.FS
+
+// VerifyFn verify whether the query response matches the wanted result.
+var VerifyFn = func(innerGm gm.Gomega, sharedContext helpers.SharedContext, 
args helpers.Args) {
+       i, err := inputFS.ReadFile("input/" + args.Input + ".yml")
+       innerGm.Expect(err).NotTo(gm.HaveOccurred())
+       query := &tracev1.QueryRequest{}
+       helpers.UnmarshalYAML(i, query)
+       query.TimeRange = helpers.TimeRange(args, sharedContext)
+       query.Stages = args.Stages
+       c := tracev1.NewTraceServiceClient(sharedContext.Connection)
+       ctx := context.Background()
+       resp, err := c.Query(ctx, query)
+       if args.WantErr {
+               if err == nil {
+                       g.Fail("expect error")
+               }
+               return
+       }
+       innerGm.Expect(err).NotTo(gm.HaveOccurred(), query.String())
+       if args.WantEmpty {
+               innerGm.Expect(resp.Spans).To(gm.BeEmpty())
+               return
+       }
+       if args.Want == "" {
+               args.Want = args.Input
+       }
+       ww, err := wantFS.ReadFile("want/" + args.Want + ".yml")
+       innerGm.Expect(err).NotTo(gm.HaveOccurred())
+       want := &tracev1.QueryResponse{}
+       helpers.UnmarshalYAML(ww, want)
+       if args.DisOrder {
+               slices.SortFunc(want.Spans, func(a, b *tracev1.Span) int {
+                       // Sort by first tag value for consistency
+                       if len(a.Tags) > 0 && len(b.Tags) > 0 {
+                               return 
strings.Compare(a.Tags[0].Value.GetStr().GetValue(), 
b.Tags[0].Value.GetStr().GetValue())
+                       }
+                       return 0
+               })
+               slices.SortFunc(resp.Spans, func(a, b *tracev1.Span) int {
+                       if len(a.Tags) > 0 && len(b.Tags) > 0 {
+                               return 
strings.Compare(a.Tags[0].Value.GetStr().GetValue(), 
b.Tags[0].Value.GetStr().GetValue())
+                       }
+                       return 0
+               })
+       }
+       var extra []cmp.Option
+       extra = append(extra, protocmp.IgnoreUnknown(),
+               protocmp.Transform())
+       success := innerGm.Expect(cmp.Equal(resp, want,
+               extra...)).
+               To(gm.BeTrue(), func() string {
+                       var j []byte
+                       j, err = protojson.Marshal(resp)
+                       if err != nil {
+                               return err.Error()
+                       }
+                       var y []byte
+                       y, err = yaml.JSONToYAML(j)
+                       if err != nil {
+                               return err.Error()
+                       }
+                       return string(y)
+               })
+       if !success {
+               return
+       }
+       query.Trace = true
+       resp, err = c.Query(ctx, query)
+       innerGm.Expect(err).NotTo(gm.HaveOccurred())
+       innerGm.Expect(resp.TraceQueryResult).NotTo(gm.BeNil())
+       innerGm.Expect(resp.TraceQueryResult.GetSpans()).NotTo(gm.BeEmpty())
+}
+
+func loadData(stream tracev1.TraceService_WriteClient, metadata 
*commonv1.Metadata, dataFile string, baseTime time.Time, interval 
time.Duration) {
+       var templates []interface{}
+       content, err := dataFS.ReadFile("testdata/" + dataFile)
+       gm.Expect(err).ShouldNot(gm.HaveOccurred())
+       gm.Expect(json.Unmarshal(content, 
&templates)).ShouldNot(gm.HaveOccurred())
+
+       for i, template := range templates {
+               // Extract span data from template
+               templateMap, ok := template.(map[string]interface{})
+               gm.Expect(ok).To(gm.BeTrue())
+
+               // Get span data
+               spanData, ok := templateMap["span"].(string)
+               gm.Expect(ok).To(gm.BeTrue())
+               spanBytes, err := base64.StdEncoding.DecodeString(spanData)
+               gm.Expect(err).ShouldNot(gm.HaveOccurred())
+
+               // Get tags data
+               tagsData, ok := templateMap["tags"].([]interface{})
+               gm.Expect(ok).To(gm.BeTrue())
+
+               // Convert tags to TagValue format
+               var tagValues []*modelv1.TagValue
+               for _, tag := range tagsData {
+                       tagBytes, err := json.Marshal(tag)
+                       gm.Expect(err).ShouldNot(gm.HaveOccurred())
+                       tagValue := &modelv1.TagValue{}
+                       gm.Expect(protojson.Unmarshal(tagBytes, 
tagValue)).ShouldNot(gm.HaveOccurred())
+                       tagValues = append(tagValues, tagValue)
+               }
+
+               // Add timestamp tag as the last tag
+               timestamp := baseTime.Add(interval * time.Duration(i))
+               timestampTag := &modelv1.TagValue{
+                       Value: &modelv1.TagValue_Timestamp{
+                               Timestamp: timestamppb.New(timestamp),
+                       },
+               }
+               tagValues = append(tagValues, timestampTag)
+
+               errInner := stream.Send(&tracev1.WriteRequest{
+                       Metadata: metadata,
+                       Tags:     tagValues,
+                       Span:     spanBytes,
+                       Version:  uint64(i + 1),
+               })
+               gm.Expect(errInner).ShouldNot(gm.HaveOccurred())
+       }
+}
+
+// Write writes trace data to the database.
+func Write(conn *grpclib.ClientConn, name string, baseTime time.Time, interval 
time.Duration) {
+       WriteToGroup(conn, name, "test-trace-group", name, baseTime, interval)
+}
+
+// WriteToGroup writes trace data to a specific group.
+func WriteToGroup(conn *grpclib.ClientConn, name, group, fileName string, 
baseTime time.Time, interval time.Duration) {
+       metadata := &commonv1.Metadata{
+               Name:  name,
+               Group: group,
+       }
+       schema := databasev1.NewTraceRegistryServiceClient(conn)
+       resp, err := schema.Get(context.Background(), 
&databasev1.TraceRegistryServiceGetRequest{Metadata: metadata})
+       gm.Expect(err).NotTo(gm.HaveOccurred())
+       metadata = resp.GetTrace().GetMetadata()
+
+       c := tracev1.NewTraceServiceClient(conn)
+       ctx := context.Background()
+       writeClient, err := c.Write(ctx)
+       gm.Expect(err).NotTo(gm.HaveOccurred())
+       loadData(writeClient, metadata, fmt.Sprintf("%s.json", fileName), 
baseTime, interval)
+       gm.Expect(writeClient.CloseSend()).To(gm.Succeed())
+       gm.Eventually(func() error {
+               _, err := writeClient.Recv()
+               return err
+       }, flags.EventuallyTimeout).Should(gm.Equal(io.EOF))
+}
diff --git a/test/cases/trace/data/input/all.yml 
b/test/cases/trace/data/input/all.yml
new file mode 100644
index 00000000..8a295c06
--- /dev/null
+++ b/test/cases/trace/data/input/all.yml
@@ -0,0 +1,20 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: "sw"
+groups: ["test-trace-group"]
+tag_projection: ["trace_id"]
diff --git a/test/cases/trace/data/testdata/sw.json 
b/test/cases/trace/data/testdata/sw.json
new file mode 100644
index 00000000..7213854a
--- /dev/null
+++ b/test/cases/trace/data/testdata/sw.json
@@ -0,0 +1,177 @@
+[
+    {
+        "tags": [
+            {
+                "str": {
+                    "value": "1"
+                }
+            },
+            {
+                "int": {
+                    "value": 1
+                }
+            },
+            {
+                "str": {
+                    "value": "webapp_service"
+                }
+            },
+            {
+                "str": {
+                    "value": "webapp_instance_1"
+                }
+            },
+            {
+                "str": {
+                    "value": "/home_endpoint"
+                }
+            },
+            {
+                "int": {
+                    "value": 1000
+                }
+            }
+        ],
+        "span": "YWJjMTIzIT8kKiYoKSctPUB+"
+    },
+    {
+        "tags": [
+            {
+                "str": {
+                    "value": "2"
+                }
+            },
+            {
+                "int": {
+                    "value": 1
+                }
+            },
+            {
+                "str": {
+                    "value": "webapp_service"
+                }
+            },
+            {
+                "str": {
+                    "value": "webapp_instance_2"
+                }
+            },
+            {
+                "str": {
+                    "value": "/product_endpoint"
+                }
+            },
+            {
+                "int": {
+                    "value": 500
+                }
+            }
+        ],
+        "span": "YWJjMTIzIT8kKiYoKSctPUB+"
+    },
+    {
+        "tags": [
+            {
+                "str": {
+                    "value": "3"
+                }
+            },
+            {
+                "int": {
+                    "value": 0
+                }
+            },
+            {
+                "str": {
+                    "value": "webapp_service"
+                }
+            },
+            {
+                "str": {
+                    "value": "webapp_instance_1"
+                }
+            },
+            {
+                "str": {
+                    "value": "/home_endpoint"
+                }
+            },
+            {
+                "int": {
+                    "value": 30
+                }
+            }
+        ],
+        "span": "YWJjMTIzIT8kKiYoKSctPUB+"
+    },
+    {
+        "tags": [
+            {
+                "str": {
+                    "value": "4"
+                }
+            },
+            {
+                "int": {
+                    "value": 0
+                }
+            },
+            {
+                "str": {
+                    "value": "webapp_service"
+                }
+            },
+            {
+                "str": {
+                    "value": "webapp_instance_3"
+                }
+            },
+            {
+                "str": {
+                    "value": "/price_endpoint"
+                }
+            },
+            {
+                "int": {
+                    "value": 60
+                }
+            }
+        ],
+        "span": "YWJjMTIzIT8kKiYoKSctPUB+"
+    },
+    {
+        "tags": [
+            {
+                "str": {
+                    "value": "5"
+                }
+            },
+            {
+                "int": {
+                    "value": 0
+                }
+            },
+            {
+                "str": {
+                    "value": "webapp_service"
+                }
+            },
+            {
+                "str": {
+                    "value": "webapp_instance_1"
+                }
+            },
+            {
+                "str": {
+                    "value": "/item_endpoint"
+                }
+            },
+            {
+                "int": {
+                    "value": 300
+                }
+            }
+        ],
+        "span": "YWJjMTIzIT8kKiYoKSctPUB+"
+    }
+]
\ No newline at end of file
diff --git a/test/cases/trace/data/want/all.yml 
b/test/cases/trace/data/want/all.yml
new file mode 100644
index 00000000..e29fbaf0
--- /dev/null
+++ b/test/cases/trace/data/want/all.yml
@@ -0,0 +1,48 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+spans:
+  - tags:
+    - key: trace_id
+      value:
+        str:
+          value: "1"
+    span: "YWJjMTIzIT8kKiYoKSctPUB+"
+  - tags:
+    - key: trace_id
+      value:
+        str:
+          value: "2"
+    span: "YWJjMTIzIT8kKiYoKSctPUB+"
+  - tags:
+    - key: trace_id
+      value:
+        str:
+          value: "3"
+    span: "YWJjMTIzIT8kKiYoKSctPUB+"
+  - tags:
+    - key: trace_id
+      value:
+        str:
+          value: "4"
+    span: "YWJjMTIzIT8kKiYoKSctPUB+"
+  - tags:
+    - key: trace_id
+      value:
+        str:
+          value: "5"
+    span: "YWJjMTIzIT8kKiYoKSctPUB+"
diff --git a/test/cases/trace/trace.go b/test/cases/trace/trace.go
new file mode 100644
index 00000000..dd7eb4db
--- /dev/null
+++ b/test/cases/trace/trace.go
@@ -0,0 +1,46 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package trace_test contains integration test cases of the trace.
+package trace_test
+
+import (
+       "time"
+
+       g "github.com/onsi/ginkgo/v2"
+       gm "github.com/onsi/gomega"
+
+       "github.com/apache/skywalking-banyandb/pkg/test/flags"
+       "github.com/apache/skywalking-banyandb/pkg/test/helpers"
+       trace_test_data 
"github.com/apache/skywalking-banyandb/test/cases/trace/data"
+)
+
+var (
+       // SharedContext is the parallel execution context.
+       SharedContext helpers.SharedContext
+       verify        = func(innerGm gm.Gomega, args helpers.Args) {
+               trace_test_data.VerifyFn(innerGm, SharedContext, args)
+       }
+)
+
+var _ = g.FDescribeTable("Scanning Traces", func(args helpers.Args) {
+       gm.Eventually(func(innerGm gm.Gomega) {
+               verify(innerGm, args)
+       }, flags.EventuallyTimeout).Should(gm.Succeed())
+},
+       g.Entry("all elements", helpers.Args{Input: "all", Duration: 1 * 
time.Hour}),
+)
diff --git a/test/integration/standalone/query/query_suite_test.go 
b/test/integration/standalone/query/query_suite_test.go
index 7c576550..453ad6b3 100644
--- a/test/integration/standalone/query/query_suite_test.go
+++ b/test/integration/standalone/query/query_suite_test.go
@@ -39,6 +39,7 @@ import (
        casesmeasure "github.com/apache/skywalking-banyandb/test/cases/measure"
        casesstream "github.com/apache/skywalking-banyandb/test/cases/stream"
        casestopn "github.com/apache/skywalking-banyandb/test/cases/topn"
+       casestrace "github.com/apache/skywalking-banyandb/test/cases/trace"
        integration_standalone 
"github.com/apache/skywalking-banyandb/test/integration/standalone"
 )
 
@@ -82,6 +83,10 @@ var _ = SynchronizedBeforeSuite(func() []byte {
                Connection: connection,
                BaseTime:   now,
        }
+       casestrace.SharedContext = helpers.SharedContext{
+               Connection: connection,
+               BaseTime:   now,
+       }
        Expect(err).NotTo(HaveOccurred())
 })
 

Reply via email to