This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch trace/sidx in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 9ed28d0eeed53cb9305e69a2a3c7f921b6167876 Author: Gao Hongtao <hanahm...@gmail.com> AuthorDate: Sat Aug 30 22:46:56 2025 +0800 Add series document tracking to traces module --- banyand/trace/traces.go | 7 +++++++ banyand/trace/write_standalone.go | 19 +++++++++++++++++++ 2 files changed, 26 insertions(+) diff --git a/banyand/trace/traces.go b/banyand/trace/traces.go index b812df08..bf92d649 100644 --- a/banyand/trace/traces.go +++ b/banyand/trace/traces.go @@ -26,6 +26,7 @@ import ( "github.com/apache/skywalking-banyandb/banyand/internal/sidx" "github.com/apache/skywalking-banyandb/banyand/internal/storage" "github.com/apache/skywalking-banyandb/banyand/internal/wqueue" + "github.com/apache/skywalking-banyandb/pkg/index" pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" "github.com/apache/skywalking-banyandb/pkg/pool" "github.com/apache/skywalking-banyandb/pkg/timestamp" @@ -195,12 +196,18 @@ func releaseTraces(t *traces) { var tracesPool = pool.Register[*traces]("trace-traces") +type seriesDoc struct { + docIDsAdded map[uint64]struct{} + docs index.Documents +} + type tracesInTable struct { segment storage.Segment[*tsTable, option] tsTable *tsTable traces *traces sidxReqsMap map[string][]sidx.WriteRequest timeRange timestamp.TimeRange + seriesDocs seriesDoc shardID common.ShardID } diff --git a/banyand/trace/write_standalone.go b/banyand/trace/write_standalone.go index a4f3c275..a498bc9c 100644 --- a/banyand/trace/write_standalone.go +++ b/banyand/trace/write_standalone.go @@ -34,6 +34,7 @@ import ( "github.com/apache/skywalking-banyandb/banyand/observability" "github.com/apache/skywalking-banyandb/pkg/bus" "github.com/apache/skywalking-banyandb/pkg/convert" + "github.com/apache/skywalking-banyandb/pkg/index" "github.com/apache/skywalking-banyandb/pkg/logger" pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" "github.com/apache/skywalking-banyandb/pkg/timestamp" @@ -158,6 +159,10 @@ func (w *writeCallback) prepareTracesInTable(eg *tracesInGroup, writeEvent *trac traces: generateTraces(), segment: segment, sidxReqsMap: make(map[string][]sidx.WriteRequest), + seriesDocs: seriesDoc{ + docs: make(index.Documents, 0), + docIDsAdded: make(map[uint64]struct{}), + }, } et.traces.reset() eg.tables = append(eg.tables, et) @@ -271,6 +276,15 @@ func processTraces(schemaRepo *schemaRepo, tracesInTable *tracesInTable, writeEv tracesInTable.sidxReqsMap[sidxName] = make([]sidx.WriteRequest, 0) } tracesInTable.sidxReqsMap[sidxName] = append(tracesInTable.sidxReqsMap[sidxName], writeReq) + + docID := uint64(series.ID) + if _, existed := tracesInTable.seriesDocs.docIDsAdded[docID]; !existed { + tracesInTable.seriesDocs.docs = append(tracesInTable.seriesDocs.docs, index.Document{ + DocID: docID, + EntityValues: series.Buffer, + }) + tracesInTable.seriesDocs.docIDsAdded[docID] = struct{}{} + } } return nil @@ -326,6 +340,11 @@ func (w *writeCallback) Rev(ctx context.Context, message bus.Message) (resp bus. } } } + if len(es.seriesDocs.docs) > 0 { + if err := es.segment.IndexDB().Update(es.seriesDocs.docs); err != nil { + w.l.Error().Err(err).Msg("cannot write series index") + } + } releaseTraces(es.traces) } if len(g.segments) > 0 {