This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch trace/sidx in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 8923ce8f5e5fe8cb44c5cb44036fc76b74eee329 Author: Gao Hongtao <hanahm...@gmail.com> AuthorDate: Sat Aug 30 18:29:42 2025 +0800 Implement sidx query for trace IDs in the trace module. Added logic to check for sidx usage when no trace IDs are provided, and introduced a new method to query multiple sidx instances for ordered trace IDs. This enhances the querying capabilities and improves performance by leveraging secondary indexes. --- banyand/trace/query.go | 32 ++++++++++++++++++++++++++++++++ banyand/trace/trace.go | 50 ++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 82 insertions(+) diff --git a/banyand/trace/query.go b/banyand/trace/query.go index bfc334de..822d518c 100644 --- a/banyand/trace/query.go +++ b/banyand/trace/query.go @@ -27,6 +27,7 @@ import ( "google.golang.org/protobuf/types/known/timestamppb" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" + "github.com/apache/skywalking-banyandb/banyand/internal/sidx" "github.com/apache/skywalking-banyandb/banyand/internal/storage" "github.com/apache/skywalking-banyandb/pkg/convert" "github.com/apache/skywalking-banyandb/pkg/logger" @@ -109,6 +110,37 @@ func (t *trace) Query(ctx context.Context, tqo model.TraceQueryOptions) (model.T tt, _ := segment.Tables() tables = append(tables, tt...) 
} + + // Check if we need to use sidx for ordering when no trace IDs are provided + if len(tqo.TraceIDs) == 0 && tqo.Order != nil { + // Extract sidx name from index rule + sidxName := "default" // fallback to default sidx + if tqo.Order.Index != nil { + sidxName = tqo.Order.Index.GetMetadata().GetName() + } + + // Collect sidx instances from all tables + var sidxInstances []sidx.SIDX + for _, table := range tables { + if sidxInstance, exists := table.getSidx(sidxName); exists { + sidxInstances = append(sidxInstances, sidxInstance) + } + } + + if len(sidxInstances) > 0 { + // Query sidx for trace IDs + traceIDs, sidxErr := t.querySidxForTraceIDs(ctx, sidxInstances, tqo) + if sidxErr != nil { + t.l.Warn().Err(sidxErr).Str("sidx", sidxName).Msg("sidx query failed, falling back to normal query") + } else if len(traceIDs) > 0 { + qo.traceIDs = traceIDs + sort.Strings(qo.traceIDs) + } + } + } + if len(qo.traceIDs) == 0 { + return nilResult, nil + } for i := range tables { s := tables[i].currentSnapshot() if s == nil { diff --git a/banyand/trace/trace.go b/banyand/trace/trace.go index 9538ce10..7e7cc594 100644 --- a/banyand/trace/trace.go +++ b/banyand/trace/trace.go @@ -23,11 +23,14 @@ package trace import ( "context" + "fmt" "sync/atomic" "time" + "github.com/apache/skywalking-banyandb/api/common" commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" + "github.com/apache/skywalking-banyandb/banyand/internal/sidx" "github.com/apache/skywalking-banyandb/banyand/observability" "github.com/apache/skywalking-banyandb/banyand/protector" "github.com/apache/skywalking-banyandb/banyand/queue" @@ -133,6 +136,53 @@ func (t *trace) parseSpec() { t.indexSchema.Store(is) } +// querySidxForTraceIDs queries sidx instances to get ordered trace IDs. 
+//
+// The returned IDs keep the order in which sidx.QueryMultipleSIDX reports its
+// data (expected to follow tqo.Order — TODO confirm), with empty entries
+// skipped and duplicates dropped so each trace ID appears at most once.
+// Callers that rely on this ordering must not re-sort the slice.
+// It returns (nil, nil) when there are no sidx instances or no data.
+func (t *trace) querySidxForTraceIDs(ctx context.Context, sidxInstances []sidx.SIDX, tqo model.TraceQueryOptions) ([]string, error) {
+	if len(sidxInstances) == 0 {
+		return nil, nil
+	}
+
+	// NOTE(review): MaxElementSize bounds sidx elements, not distinct traces,
+	// so fewer than MaxTraceSize IDs may remain after de-duplication.
+	req := sidx.QueryRequest{
+		Filter:         tqo.SkippingFilter,
+		Order:          tqo.Order,
+		MaxElementSize: tqo.MaxTraceSize,
+	}
+	if tqo.TagProjection != nil {
+		req.TagProjection = []model.TagProjection{*tqo.TagProjection}
+	}
+	if tqo.TimeRange != nil {
+		// NOTE(review): this treats sidx keys as UnixNano timestamps. If the
+		// ordering index rule is not time-based, this range filters
+		// incorrectly — confirm against how sidx keys are written.
+		minKey := tqo.TimeRange.Start.UnixNano()
+		maxKey := tqo.TimeRange.End.UnixNano()
+		req.MinKey = &minKey
+		req.MaxKey = &maxKey
+	}
+	// TODO: derive series IDs from the query context. This placeholder
+	// restricts every sidx query to series ID 1 only.
+	req.SeriesIDs = []common.SeriesID{1}
+
+	response, err := sidx.QueryMultipleSIDX(ctx, sidxInstances, req)
+	if err != nil {
+		return nil, fmt.Errorf("querying %d sidx instances: %w", len(sidxInstances), err)
+	}
+	if response == nil || len(response.Data) == 0 {
+		return nil, nil
+	}
+
+	// Each data entry carries one trace ID. Several elements may belong to
+	// the same trace, so keep only the first occurrence to preserve order.
+	traceIDs := make([]string, 0, len(response.Data))
+	seen := make(map[string]struct{}, len(response.Data))
+	for _, data := range response.Data {
+		if len(data) == 0 {
+			continue
+		}
+		id := string(data)
+		if _, dup := seen[id]; dup {
+			continue
+		}
+		seen[id] = struct{}{}
+		traceIDs = append(traceIDs, id)
+	}
+	return traceIDs, nil
+}
+
 func openTrace(schema *databasev1.Trace, l *logger.Logger, pm protector.Memory, schemaRepo *schemaRepo) *trace {
 	t := &trace{
 		schema: schema,