This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch trace/sidx
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 9ed28d0eeed53cb9305e69a2a3c7f921b6167876
Author: Gao Hongtao <hanahm...@gmail.com>
AuthorDate: Sat Aug 30 22:46:56 2025 +0800

    Add series document tracking to traces module
---
 banyand/trace/traces.go           |  7 +++++++
 banyand/trace/write_standalone.go | 19 +++++++++++++++++++
 2 files changed, 26 insertions(+)

diff --git a/banyand/trace/traces.go b/banyand/trace/traces.go
index b812df08..bf92d649 100644
--- a/banyand/trace/traces.go
+++ b/banyand/trace/traces.go
@@ -26,6 +26,7 @@ import (
        "github.com/apache/skywalking-banyandb/banyand/internal/sidx"
        "github.com/apache/skywalking-banyandb/banyand/internal/storage"
        "github.com/apache/skywalking-banyandb/banyand/internal/wqueue"
+       "github.com/apache/skywalking-banyandb/pkg/index"
        pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
        "github.com/apache/skywalking-banyandb/pkg/pool"
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
@@ -195,12 +196,18 @@ func releaseTraces(t *traces) {
 
 var tracesPool = pool.Register[*traces]("trace-traces")
 
+type seriesDoc struct {
+       docIDsAdded map[uint64]struct{}
+       docs        index.Documents
+}
+
 type tracesInTable struct {
        segment     storage.Segment[*tsTable, option]
        tsTable     *tsTable
        traces      *traces
        sidxReqsMap map[string][]sidx.WriteRequest
        timeRange   timestamp.TimeRange
+       seriesDocs  seriesDoc
        shardID     common.ShardID
 }
 
diff --git a/banyand/trace/write_standalone.go 
b/banyand/trace/write_standalone.go
index a4f3c275..a498bc9c 100644
--- a/banyand/trace/write_standalone.go
+++ b/banyand/trace/write_standalone.go
@@ -34,6 +34,7 @@ import (
        "github.com/apache/skywalking-banyandb/banyand/observability"
        "github.com/apache/skywalking-banyandb/pkg/bus"
        "github.com/apache/skywalking-banyandb/pkg/convert"
+       "github.com/apache/skywalking-banyandb/pkg/index"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
@@ -158,6 +159,10 @@ func (w *writeCallback) prepareTracesInTable(eg 
*tracesInGroup, writeEvent *trac
                        traces:      generateTraces(),
                        segment:     segment,
                        sidxReqsMap: make(map[string][]sidx.WriteRequest),
+                       seriesDocs: seriesDoc{
+                               docs:        make(index.Documents, 0),
+                               docIDsAdded: make(map[uint64]struct{}),
+                       },
                }
                et.traces.reset()
                eg.tables = append(eg.tables, et)
@@ -271,6 +276,15 @@ func processTraces(schemaRepo *schemaRepo, tracesInTable 
*tracesInTable, writeEv
                        tracesInTable.sidxReqsMap[sidxName] = 
make([]sidx.WriteRequest, 0)
                }
                tracesInTable.sidxReqsMap[sidxName] = 
append(tracesInTable.sidxReqsMap[sidxName], writeReq)
+
+               docID := uint64(series.ID)
+               if _, existed := tracesInTable.seriesDocs.docIDsAdded[docID]; 
!existed {
+                       tracesInTable.seriesDocs.docs = 
append(tracesInTable.seriesDocs.docs, index.Document{
+                               DocID:        docID,
+                               EntityValues: series.Buffer,
+                       })
+                       tracesInTable.seriesDocs.docIDsAdded[docID] = struct{}{}
+               }
        }
 
        return nil
@@ -326,6 +340,11 @@ func (w *writeCallback) Rev(ctx context.Context, message 
bus.Message) (resp bus.
                                        }
                                }
                        }
+                       if len(es.seriesDocs.docs) > 0 {
+                               if err := 
es.segment.IndexDB().Update(es.seriesDocs.docs); err != nil {
+                                       w.l.Error().Err(err).Msg("cannot write 
series index")
+                               }
+                       }
                        releaseTraces(es.traces)
                }
                if len(g.segments) > 0 {

Reply via email to