This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new b57a24fc Trace, Stream Series Metadata in Liaison Sending Queue (#896)
b57a24fc is described below
commit b57a24fcd0885d1f8f2c7c893574818b2670aa43
Author: OmCheeLin <[email protected]>
AuthorDate: Tue Dec 16 17:29:18 2025 +0800
Trace, Stream Series Metadata in Liaison Sending Queue (#896)
---
CHANGES.md | 1 +
banyand/stream/part.go | 43 ++++++++++++--
banyand/stream/part_test.go | 121 ++++++++++++++++++++++++++++++++++++++++
banyand/stream/tstable.go | 11 +++-
banyand/stream/write_liaison.go | 57 ++++++++++---------
banyand/trace/part.go | 49 +++++++++++++---
banyand/trace/part_test.go | 121 ++++++++++++++++++++++++++++++++++++++++
banyand/trace/tstable.go | 11 +++-
banyand/trace/write_liaison.go | 45 ++++++++-------
9 files changed, 396 insertions(+), 63 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 8223dac7..897e0d75 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -13,6 +13,7 @@ Release Notes.
- Update bydbQL to add sorted query support for the Property.
- Remove the windows arch for binary and docker image.
- Support writing data with specifications.
+- Persist series metadata in liaison queue for measure, stream and trace
models.
### Bug Fixes
diff --git a/banyand/stream/part.go b/banyand/stream/part.go
index 39cb2145..b727f5d7 100644
--- a/banyand/stream/part.go
+++ b/banyand/stream/part.go
@@ -18,6 +18,7 @@
package stream
import (
+ "errors"
"fmt"
"path"
"path/filepath"
@@ -35,17 +36,19 @@ import (
const (
// Streaming file names (without extensions).
- streamPrimaryName = "primary"
- streamMetaName = "meta"
- streamTimestampsName = "timestamps"
- streamTagFamiliesPrefix = "tf:"
- streamTagMetadataPrefix = "tfm:"
- streamTagFilterPrefix = "tff:"
+ streamPrimaryName = "primary"
+ streamMetaName = "meta"
+ streamTimestampsName = "timestamps"
+ streamTagFamiliesPrefix = "tf:"
+ streamTagMetadataPrefix = "tfm:"
+ streamTagFilterPrefix = "tff:"
+ streamSeriesMetadataName = "smeta"
metadataFilename = "metadata.json"
primaryFilename = streamPrimaryName + ".bin"
metaFilename = streamMetaName + ".bin"
timestampsFilename = streamTimestampsName + ".bin"
+ seriesMetadataFilename = streamSeriesMetadataName + ".bin"
elementIndexFilename = "idx"
tagFamiliesMetadataFilenameExt = ".tfm"
tagFamiliesFilenameExt = ".tf"
@@ -59,6 +62,7 @@ type part struct {
tagFamilyMetadata map[string]fs.Reader
tagFamilies map[string]fs.Reader
tagFamilyFilter map[string]fs.Reader
+ seriesMetadata fs.Reader // Optional: series metadata reader
path string
primaryBlockMetadata []primaryBlockMetadata
partMetadata partMetadata
@@ -67,6 +71,9 @@ type part struct {
func (p *part) close() {
fs.MustClose(p.primary)
fs.MustClose(p.timestamps)
+ if p.seriesMetadata != nil {
+ fs.MustClose(p.seriesMetadata)
+ }
for _, tf := range p.tagFamilies {
fs.MustClose(tf)
}
@@ -91,6 +98,9 @@ func openMemPart(mp *memPart) *part {
// Open data files
p.primary = &mp.primary
p.timestamps = &mp.timestamps
+ if len(mp.seriesMetadata.Buf) > 0 {
+ p.seriesMetadata = &mp.seriesMetadata
+ }
if mp.tagFamilies != nil {
p.tagFamilies = make(map[string]fs.Reader)
p.tagFamilyMetadata = make(map[string]fs.Reader)
@@ -111,6 +121,7 @@ type memPart struct {
meta bytes.Buffer
primary bytes.Buffer
timestamps bytes.Buffer
+ seriesMetadata bytes.Buffer
partMetadata partMetadata
segmentID int64
}
@@ -141,6 +152,7 @@ func (mp *memPart) reset() {
mp.meta.Reset()
mp.primary.Reset()
mp.timestamps.Reset()
+ mp.seriesMetadata.Reset()
mp.segmentID = 0
if mp.tagFamilies != nil {
for k, tf := range mp.tagFamilies {
@@ -211,6 +223,11 @@ func (mp *memPart) mustFlush(fileSystem fs.FileSystem,
path string) {
fs.MustFlush(fileSystem, tfh.Buf, filepath.Join(path,
name+tagFamiliesFilterFilenameExt), storage.FilePerm)
}
+ // Flush series metadata if available
+ if len(mp.seriesMetadata.Buf) > 0 {
+ fs.MustFlush(fileSystem, mp.seriesMetadata.Buf,
filepath.Join(path, seriesMetadataFilename), storage.FilePerm)
+ }
+
mp.partMetadata.mustWriteMetadata(fileSystem, path)
fileSystem.SyncPath(path)
@@ -304,6 +321,20 @@ func mustOpenFilePart(id uint64, root string, fileSystem
fs.FileSystem) *part {
p.primary = mustOpenReader(path.Join(partPath, primaryFilename),
fileSystem)
p.timestamps = mustOpenReader(path.Join(partPath, timestampsFilename),
fileSystem)
+
+ // Try to open series metadata file (optional, for backward
compatibility)
+ seriesMetadataPath := path.Join(partPath, seriesMetadataFilename)
+ reader, err := fileSystem.OpenFile(seriesMetadataPath)
+ if err != nil {
+ var fsErr *fs.FileSystemError
+ // File does not exist is acceptable for backward compatibility
+ if !errors.As(err, &fsErr) || fsErr.Code != fs.IsNotExistError {
+ logger.Panicf("cannot open series metadata file %q:
%s", seriesMetadataPath, err)
+ }
+ } else {
+ p.seriesMetadata = reader
+ }
+
ee := fileSystem.ReadDir(partPath)
for _, e := range ee {
if e.IsDir() {
diff --git a/banyand/stream/part_test.go b/banyand/stream/part_test.go
index f4443998..638bde1e 100644
--- a/banyand/stream/part_test.go
+++ b/banyand/stream/part_test.go
@@ -18,6 +18,7 @@
package stream
import (
+ "path/filepath"
"testing"
"github.com/stretchr/testify/assert"
@@ -26,6 +27,7 @@ import (
"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/fs"
+ "github.com/apache/skywalking-banyandb/pkg/index"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
"github.com/apache/skywalking-banyandb/pkg/test"
)
@@ -180,3 +182,122 @@ var es = &elements{
{}, // empty tagFamilies for seriesID 3
},
}
+
+func TestSeriesMetadataPersistence(t *testing.T) {
+ tmpPath, defFn := test.Space(require.New(t))
+ defer defFn()
+
+ fileSystem := fs.NewLocalFileSystem()
+ epoch := uint64(12345)
+ path := partPath(tmpPath, epoch)
+
+ // Create a memPart with elements
+ mp := generateMemPart()
+ mp.mustInitFromElements(es)
+
+ // Create sample series metadata using NewBytesField
+ field1 := index.NewBytesField(index.FieldKey{
+ IndexRuleID: 1,
+ Analyzer: "keyword",
+ TagName: "tag1",
+ }, []byte("term1"))
+ field2 := index.NewBytesField(index.FieldKey{
+ IndexRuleID: 2,
+ Analyzer: "keyword",
+ TagName: "tag2",
+ }, []byte("term2"))
+ metadataDocs := index.Documents{
+ {
+ DocID: 1,
+ EntityValues: []byte("entity1"),
+ Fields: []index.Field{field1},
+ },
+ {
+ DocID: 2,
+ EntityValues: []byte("entity2"),
+ Fields: []index.Field{field2},
+ },
+ }
+
+ // Marshal series metadata
+ seriesMetadataBytes, err := metadataDocs.Marshal()
+ require.NoError(t, err)
+ require.NotEmpty(t, seriesMetadataBytes)
+
+ // Set series metadata in memPart
+ _, err = mp.seriesMetadata.Write(seriesMetadataBytes)
+ require.NoError(t, err)
+
+ // Flush to disk
+ mp.mustFlush(fileSystem, path)
+
+ // Verify series metadata file exists by trying to read it
+ seriesMetadataPath := filepath.Join(path, seriesMetadataFilename)
+ readBytes, err := fileSystem.Read(seriesMetadataPath)
+ require.NoError(t, err, "series metadata file should exist")
+ assert.Equal(t, seriesMetadataBytes, readBytes, "series metadata
content should match")
+
+ // Open the part and verify series metadata is accessible
+ p := mustOpenFilePart(epoch, tmpPath, fileSystem)
+ defer p.close()
+
+ // Verify series metadata reader is available
+ assert.NotNil(t, p.seriesMetadata, "series metadata reader should be
available")
+
+ // Read and unmarshal series metadata using SequentialRead
+ seqReader := p.seriesMetadata.SequentialRead()
+ defer seqReader.Close()
+ readMetadataBytes := make([]byte, 0)
+ buf := make([]byte, 1024)
+ for {
+ var n int
+ n, err = seqReader.Read(buf)
+ if n == 0 {
+ if err != nil {
+ break
+ }
+ continue
+ }
+ if err != nil {
+ break
+ }
+ readMetadataBytes = append(readMetadataBytes, buf[:n]...)
+ }
+
+ var readDocs index.Documents
+ err = readDocs.Unmarshal(readMetadataBytes)
+ require.NoError(t, err)
+ assert.Equal(t, len(metadataDocs), len(readDocs), "number of documents
should match")
+ assert.Equal(t, metadataDocs[0].DocID, readDocs[0].DocID, "first
document DocID should match")
+ assert.Equal(t, metadataDocs[1].DocID, readDocs[1].DocID, "second
document DocID should match")
+}
+
+func TestSeriesMetadataBackwardCompatibility(t *testing.T) {
+ tmpPath, defFn := test.Space(require.New(t))
+ defer defFn()
+
+ fileSystem := fs.NewLocalFileSystem()
+ epoch := uint64(67890)
+ path := partPath(tmpPath, epoch)
+
+ // Create a memPart with elements but without series metadata
+ mp := generateMemPart()
+ mp.mustInitFromElements(es)
+ // Don't set series metadata to simulate old parts
+
+ // Flush to disk
+ mp.mustFlush(fileSystem, path)
+
+ // Verify series metadata file does not exist by trying to read it
+ seriesMetadataPath := filepath.Join(path, seriesMetadataFilename)
+ _, err := fileSystem.Read(seriesMetadataPath)
+ assert.Error(t, err, "series metadata file should not exist for old
parts")
+
+ // Open the part - should work without series metadata (backward
compatibility)
+ p := mustOpenFilePart(epoch, tmpPath, fileSystem)
+ defer p.close()
+
+ // Verify part can be opened successfully
+ assert.NotNil(t, p, "part should be opened successfully")
+ assert.Nil(t, p.seriesMetadata, "series metadata reader should be nil
for old parts")
+}
diff --git a/banyand/stream/tstable.go b/banyand/stream/tstable.go
index 4ce74d7a..26f345f0 100644
--- a/banyand/stream/tstable.go
+++ b/banyand/stream/tstable.go
@@ -342,10 +342,10 @@ func (tst *tsTable) mustAddMemPart(mp *memPart) {
}
func (tst *tsTable) mustAddElements(es *elements) {
- tst.mustAddElementsWithSegmentID(es, 0)
+ tst.mustAddElementsWithSegmentID(es, 0, nil)
}
-func (tst *tsTable) mustAddElementsWithSegmentID(es *elements, segmentID
int64) {
+func (tst *tsTable) mustAddElementsWithSegmentID(es *elements, segmentID
int64, seriesMetadata []byte) {
if len(es.seriesIDs) == 0 {
return
}
@@ -353,6 +353,13 @@ func (tst *tsTable) mustAddElementsWithSegmentID(es
*elements, segmentID int64)
mp := generateMemPart()
mp.mustInitFromElements(es)
mp.segmentID = segmentID
+ if len(seriesMetadata) > 0 {
+ // Write series metadata to buffer to avoid sharing the
underlying slice
+ _, err := mp.seriesMetadata.Write(seriesMetadata)
+ if err != nil {
+ logger.Panicf("cannot write series metadata to buffer:
%s", err)
+ }
+ }
tst.mustAddMemPart(mp)
}
diff --git a/banyand/stream/write_liaison.go b/banyand/stream/write_liaison.go
index b86f48d1..15717971 100644
--- a/banyand/stream/write_liaison.go
+++ b/banyand/stream/write_liaison.go
@@ -193,8 +193,20 @@ func (w *writeQueueCallback) Rev(ctx context.Context,
message bus.Message) (resp
g := groups[i]
for j := range g.tables {
es := g.tables[j]
- es.tsTable.mustAddElementsWithSegmentID(es.elements,
es.timeRange.Start.UnixNano())
- releaseElements(es.elements)
+ // Marshal series metadata for persistence in part
folder
+ var seriesMetadataBytes []byte
+ if len(es.seriesDocs.docs) > 0 {
+ var marshalErr error
+ seriesMetadataBytes, marshalErr =
es.seriesDocs.docs.Marshal()
+ if marshalErr != nil {
+
w.l.Error().Err(marshalErr).Uint32("shardID", uint32(es.shardID)).Msg("failed
to marshal series metadata for persistence")
+ // Continue without series metadata,
but log the error
+ }
+ }
+ if es.tsTable != nil && es.elements != nil {
+
es.tsTable.mustAddElementsWithSegmentID(es.elements,
es.timeRange.Start.UnixNano(), seriesMetadataBytes)
+ releaseElements(es.elements)
+ }
// Get nodes for this shard
nodes := g.queue.GetNodes(es.shardID)
if len(nodes) == 0 {
@@ -202,30 +214,25 @@ func (w *writeQueueCallback) Rev(ctx context.Context,
message bus.Message) (resp
continue
}
// Process series documents independently
- if len(es.seriesDocs.docs) > 0 {
- seriesDocData, marshalErr :=
es.seriesDocs.docs.Marshal()
- if marshalErr != nil {
-
w.l.Error().Err(marshalErr).Uint32("shardID", uint32(es.shardID)).Msg("failed
to marshal series documents")
- } else {
- // Encode group name, start timestamp
from timeRange, and prepend to docData
- combinedData := make([]byte, 0,
len(seriesDocData)+len(g.name)+8)
- combinedData =
encoding.EncodeBytes(combinedData, convert.StringToBytes(g.name))
- combinedData =
encoding.Int64ToBytes(combinedData, es.timeRange.Start.UnixNano())
- combinedData = append(combinedData,
seriesDocData...)
+ if len(seriesMetadataBytes) > 0 {
+ // Encode group name, start timestamp from
timeRange, and prepend to docData
+ combinedData := make([]byte, 0,
len(seriesMetadataBytes)+len(g.name)+8)
+ combinedData =
encoding.EncodeBytes(combinedData, convert.StringToBytes(g.name))
+ combinedData =
encoding.Int64ToBytes(combinedData, es.timeRange.Start.UnixNano())
+ combinedData = append(combinedData,
seriesMetadataBytes...)
- // Send to all nodes for this shard
- for _, node := range nodes {
- message :=
bus.NewMessageWithNode(bus.MessageID(time.Now().UnixNano()), node, combinedData)
- future, publishErr :=
w.tire2Client.Publish(ctx, data.TopicStreamSeriesIndexWrite, message)
- if publishErr != nil {
-
w.l.Error().Err(publishErr).Str("node", node).Uint32("shardID",
uint32(es.shardID)).Msg("failed to publish series index to node")
- continue
- }
- _, err := future.Get()
- if err != nil {
-
w.l.Error().Err(err).Str("node", node).Uint32("shardID",
uint32(es.shardID)).Msg("failed to get response from publish")
- continue
- }
+ // Send to all nodes for this shard
+ for _, node := range nodes {
+ message :=
bus.NewMessageWithNode(bus.MessageID(time.Now().UnixNano()), node, combinedData)
+ future, publishErr :=
w.tire2Client.Publish(ctx, data.TopicStreamSeriesIndexWrite, message)
+ if publishErr != nil {
+
w.l.Error().Err(publishErr).Str("node", node).Uint32("shardID",
uint32(es.shardID)).Msg("failed to publish series index to node")
+ continue
+ }
+ _, err := future.Get()
+ if err != nil {
+
w.l.Error().Err(err).Str("node", node).Uint32("shardID",
uint32(es.shardID)).Msg("failed to get response from publish")
+ continue
}
}
}
diff --git a/banyand/trace/part.go b/banyand/trace/part.go
index 0e2c50be..3ed53196 100644
--- a/banyand/trace/part.go
+++ b/banyand/trace/part.go
@@ -18,6 +18,7 @@
package trace
import (
+ "errors"
"fmt"
"io"
"path"
@@ -41,12 +42,14 @@ const (
traceSpansName = "spans"
traceTagsPrefix = "t:"
traceTagMetadataPrefix = "tm:"
+ traceSeriesMetadataName = "smeta"
metadataFilename = "metadata.json"
traceIDFilterFilename = "traceID.filter"
tagTypeFilename = "tag.type"
primaryFilename = tracePrimaryName + ".bin"
metaFilename = traceMetaName + ".bin"
spansFilename = traceSpansName + ".bin"
+ seriesMetadataFilename = traceSeriesMetadataName + ".bin"
tagsMetadataFilenameExt = ".tm"
tagsFilenameExt = ".t"
)
@@ -59,6 +62,7 @@ type part struct {
tags map[string]fs.Reader
tagType tagType
traceIDFilter traceIDFilter
+ seriesMetadata fs.Reader // Optional: series metadata reader
path string
primaryBlockMetadata []primaryBlockMetadata
partMetadata partMetadata
@@ -67,6 +71,9 @@ type part struct {
func (p *part) close() {
fs.MustClose(p.primary)
fs.MustClose(p.spans)
+ if p.seriesMetadata != nil {
+ fs.MustClose(p.seriesMetadata)
+ }
for _, t := range p.tags {
fs.MustClose(t)
}
@@ -91,6 +98,9 @@ func openMemPart(mp *memPart) *part {
// Open data files
p.primary = &mp.primary
p.spans = &mp.spans
+ if len(mp.seriesMetadata.Buf) > 0 {
+ p.seriesMetadata = &mp.seriesMetadata
+ }
if mp.tags != nil {
p.tags = make(map[string]fs.Reader)
p.tagMetadata = make(map[string]fs.Reader)
@@ -103,15 +113,16 @@ func openMemPart(mp *memPart) *part {
}
type memPart struct {
- tagMetadata map[string]*bytes.Buffer
- tags map[string]*bytes.Buffer
- tagType tagType
- traceIDFilter traceIDFilter
- spans bytes.Buffer
- meta bytes.Buffer
- primary bytes.Buffer
- partMetadata partMetadata
- segmentID int64
+ tagMetadata map[string]*bytes.Buffer
+ tags map[string]*bytes.Buffer
+ tagType tagType
+ traceIDFilter traceIDFilter
+ spans bytes.Buffer
+ meta bytes.Buffer
+ primary bytes.Buffer
+ seriesMetadata bytes.Buffer
+ partMetadata partMetadata
+ segmentID int64
}
func (mp *memPart) mustCreateMemTagWriters(name string) (fs.Writer, fs.Writer)
{
@@ -142,6 +153,7 @@ func (mp *memPart) reset() {
mp.meta.Reset()
mp.primary.Reset()
mp.spans.Reset()
+ mp.seriesMetadata.Reset()
mp.segmentID = 0
if mp.tags != nil {
for k, t := range mp.tags {
@@ -284,6 +296,11 @@ func (mp *memPart) mustFlush(fileSystem fs.FileSystem,
path string) {
fs.MustFlush(fileSystem, tm.Buf, filepath.Join(path,
name+tagsMetadataFilenameExt), storage.FilePerm)
}
+ // Flush series metadata if available
+ if len(mp.seriesMetadata.Buf) > 0 {
+ fs.MustFlush(fileSystem, mp.seriesMetadata.Buf,
filepath.Join(path, seriesMetadataFilename), storage.FilePerm)
+ }
+
mp.partMetadata.mustWriteMetadata(fileSystem, path)
mp.tagType.mustWriteTagType(fileSystem, path)
mp.traceIDFilter.mustWriteTraceIDFilter(fileSystem, path)
@@ -370,6 +387,20 @@ func mustOpenFilePart(id uint64, root string, fileSystem
fs.FileSystem) *part {
p.primary = mustOpenReader(path.Join(partPath, primaryFilename),
fileSystem)
p.spans = mustOpenReader(path.Join(partPath, spansFilename), fileSystem)
+
+ // Try to open series metadata file (optional, for backward
compatibility)
+ seriesMetadataPath := path.Join(partPath, seriesMetadataFilename)
+ reader, err := fileSystem.OpenFile(seriesMetadataPath)
+ if err != nil {
+ var fsErr *fs.FileSystemError
+ // File does not exist is acceptable for backward compatibility
+ if !errors.As(err, &fsErr) || fsErr.Code != fs.IsNotExistError {
+ logger.Panicf("cannot open series metadata file %q:
%s", seriesMetadataPath, err)
+ }
+ } else {
+ p.seriesMetadata = reader
+ }
+
ee := fileSystem.ReadDir(partPath)
for _, e := range ee {
if e.IsDir() {
diff --git a/banyand/trace/part_test.go b/banyand/trace/part_test.go
index eb2fa840..f5c7a4ea 100644
--- a/banyand/trace/part_test.go
+++ b/banyand/trace/part_test.go
@@ -18,6 +18,7 @@
package trace
import (
+ "path/filepath"
"testing"
"github.com/stretchr/testify/assert"
@@ -25,6 +26,7 @@ import (
"github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/fs"
+ "github.com/apache/skywalking-banyandb/pkg/index"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
"github.com/apache/skywalking-banyandb/pkg/test"
)
@@ -263,3 +265,122 @@ func TestMustInitFromPart(t *testing.T) {
assert.Equal(t, originalBuffer.Buf, newBuffer.Buf)
}
}
+
+func TestSeriesMetadataPersistence(t *testing.T) {
+ tmpPath, defFn := test.Space(require.New(t))
+ defer defFn()
+
+ fileSystem := fs.NewLocalFileSystem()
+ epoch := uint64(12345)
+ path := partPath(tmpPath, epoch)
+
+ // Create a memPart with traces
+ mp := generateMemPart()
+ mp.mustInitFromTraces(ts)
+
+ // Create sample series metadata using NewBytesField
+ field1 := index.NewBytesField(index.FieldKey{
+ IndexRuleID: 1,
+ Analyzer: "keyword",
+ TagName: "tag1",
+ }, []byte("term1"))
+ field2 := index.NewBytesField(index.FieldKey{
+ IndexRuleID: 2,
+ Analyzer: "keyword",
+ TagName: "tag2",
+ }, []byte("term2"))
+ metadataDocs := index.Documents{
+ {
+ DocID: 1,
+ EntityValues: []byte("entity1"),
+ Fields: []index.Field{field1},
+ },
+ {
+ DocID: 2,
+ EntityValues: []byte("entity2"),
+ Fields: []index.Field{field2},
+ },
+ }
+
+ // Marshal series metadata
+ seriesMetadataBytes, err := metadataDocs.Marshal()
+ require.NoError(t, err)
+ require.NotEmpty(t, seriesMetadataBytes)
+
+ // Set series metadata in memPart
+ _, err = mp.seriesMetadata.Write(seriesMetadataBytes)
+ require.NoError(t, err)
+
+ // Flush to disk
+ mp.mustFlush(fileSystem, path)
+
+ // Verify series metadata file exists by trying to read it
+ seriesMetadataPath := filepath.Join(path, seriesMetadataFilename)
+ readBytes, err := fileSystem.Read(seriesMetadataPath)
+ require.NoError(t, err, "series metadata file should exist")
+ assert.Equal(t, seriesMetadataBytes, readBytes, "series metadata
content should match")
+
+ // Open the part and verify series metadata is accessible
+ p := mustOpenFilePart(epoch, tmpPath, fileSystem)
+ defer p.close()
+
+ // Verify series metadata reader is available
+ assert.NotNil(t, p.seriesMetadata, "series metadata reader should be
available")
+
+ // Read and unmarshal series metadata using SequentialRead
+ seqReader := p.seriesMetadata.SequentialRead()
+ defer seqReader.Close()
+ readMetadataBytes := make([]byte, 0)
+ buf := make([]byte, 1024)
+ for {
+ var n int
+ n, err = seqReader.Read(buf)
+ if n == 0 {
+ if err != nil {
+ break
+ }
+ continue
+ }
+ if err != nil {
+ break
+ }
+ readMetadataBytes = append(readMetadataBytes, buf[:n]...)
+ }
+
+ var readDocs index.Documents
+ err = readDocs.Unmarshal(readMetadataBytes)
+ require.NoError(t, err)
+ assert.Equal(t, len(metadataDocs), len(readDocs), "number of documents
should match")
+ assert.Equal(t, metadataDocs[0].DocID, readDocs[0].DocID, "first
document DocID should match")
+ assert.Equal(t, metadataDocs[1].DocID, readDocs[1].DocID, "second
document DocID should match")
+}
+
+func TestSeriesMetadataBackwardCompatibility(t *testing.T) {
+ tmpPath, defFn := test.Space(require.New(t))
+ defer defFn()
+
+ fileSystem := fs.NewLocalFileSystem()
+ epoch := uint64(67890)
+ path := partPath(tmpPath, epoch)
+
+ // Create a memPart with traces but without series metadata
+ mp := generateMemPart()
+ mp.mustInitFromTraces(ts)
+ // Don't set series metadata to simulate old parts
+
+ // Flush to disk
+ mp.mustFlush(fileSystem, path)
+
+ // Verify series metadata file does not exist by trying to read it
+ seriesMetadataPath := filepath.Join(path, seriesMetadataFilename)
+ _, err := fileSystem.Read(seriesMetadataPath)
+ assert.Error(t, err, "series metadata file should not exist for old
parts")
+
+ // Open the part - should work without series metadata (backward
compatibility)
+ p := mustOpenFilePart(epoch, tmpPath, fileSystem)
+ defer p.close()
+
+ // Verify part can be opened successfully
+ assert.NotNil(t, p, "part should be opened successfully")
+ assert.Nil(t, p.seriesMetadata, "series metadata reader should be nil
for old parts")
+}
diff --git a/banyand/trace/tstable.go b/banyand/trace/tstable.go
index 287b17a8..6d168568 100644
--- a/banyand/trace/tstable.go
+++ b/banyand/trace/tstable.go
@@ -486,10 +486,10 @@ func (tst *tsTable) mustAddMemPart(mp *memPart,
sidxReqsMap map[string]*sidx.Mem
}
func (tst *tsTable) mustAddTraces(ts *traces, sidxReqsMap
map[string]*sidx.MemPart) {
- tst.mustAddTracesWithSegmentID(ts, 0, sidxReqsMap)
+ tst.mustAddTracesWithSegmentID(ts, 0, sidxReqsMap, nil)
}
-func (tst *tsTable) mustAddTracesWithSegmentID(ts *traces, segmentID int64,
sidxReqsMap map[string]*sidx.MemPart) {
+func (tst *tsTable) mustAddTracesWithSegmentID(ts *traces, segmentID int64,
sidxReqsMap map[string]*sidx.MemPart, seriesMetadata []byte) {
if len(ts.traceIDs) == 0 {
return
}
@@ -497,6 +497,13 @@ func (tst *tsTable) mustAddTracesWithSegmentID(ts *traces,
segmentID int64, sidx
mp := generateMemPart()
mp.mustInitFromTraces(ts)
mp.segmentID = segmentID
+ if len(seriesMetadata) > 0 {
+ // Write series metadata to buffer to avoid sharing the
underlying slice
+ _, err := mp.seriesMetadata.Write(seriesMetadata)
+ if err != nil {
+ logger.Panicf("cannot write series metadata to buffer:
%s", err)
+ }
+ }
tst.mustAddMemPart(mp, sidxReqsMap)
}
diff --git a/banyand/trace/write_liaison.go b/banyand/trace/write_liaison.go
index a234cfab..9660e72f 100644
--- a/banyand/trace/write_liaison.go
+++ b/banyand/trace/write_liaison.go
@@ -204,6 +204,16 @@ func (w *writeQueueCallback) Rev(ctx context.Context,
message bus.Message) (resp
g := groups[i]
for j := range g.tables {
es := g.tables[j]
+ // Marshal series metadata for persistence in part
folder
+ var seriesMetadataBytes []byte
+ if len(es.seriesDocs.docs) > 0 {
+ var marshalErr error
+ seriesMetadataBytes, marshalErr =
es.seriesDocs.docs.Marshal()
+ if marshalErr != nil {
+
w.l.Error().Err(marshalErr).Uint32("shardID", uint32(es.shardID)).Msg("failed
to marshal series metadata for persistence")
+ // Continue without series metadata,
but log the error
+ }
+ }
var sidxMemPartMap map[string]*sidx.MemPart
for sidxName, sidxReqs := range es.sidxReqsMap {
if len(sidxReqs) > 0 {
@@ -223,8 +233,10 @@ func (w *writeQueueCallback) Rev(ctx context.Context,
message bus.Message) (resp
sidxMemPartMap[sidxName] = siMemPart
}
}
- es.tsTable.mustAddTracesWithSegmentID(es.traces,
es.timeRange.Start.UnixNano(), sidxMemPartMap)
- releaseTraces(es.traces)
+ if es.tsTable != nil && es.traces != nil {
+
es.tsTable.mustAddTracesWithSegmentID(es.traces, es.timeRange.Start.UnixNano(),
sidxMemPartMap, seriesMetadataBytes)
+ releaseTraces(es.traces)
+ }
nodes := g.queue.GetNodes(es.shardID)
if len(nodes) == 0 {
@@ -233,24 +245,19 @@ func (w *writeQueueCallback) Rev(ctx context.Context,
message bus.Message) (resp
}
// Handle series index writing
- if len(es.seriesDocs.docs) > 0 {
- seriesDocData, marshalErr :=
es.seriesDocs.docs.Marshal()
- if marshalErr != nil {
-
w.l.Error().Err(marshalErr).Uint32("shardID", uint32(es.shardID)).Msg("failed
to marshal series documents")
- } else {
- // Encode group name, start timestamp
from timeRange, and prepend to docData
- combinedData := make([]byte, 0,
len(seriesDocData)+len(g.name)+8)
- combinedData =
encoding.EncodeBytes(combinedData, convert.StringToBytes(g.name))
- combinedData =
encoding.Int64ToBytes(combinedData, es.timeRange.Start.UnixNano())
- combinedData = append(combinedData,
seriesDocData...)
+ if len(seriesMetadataBytes) > 0 {
+ // Encode group name, start timestamp from
timeRange, and prepend to docData
+ combinedData := make([]byte, 0,
len(seriesMetadataBytes)+len(g.name)+8)
+ combinedData =
encoding.EncodeBytes(combinedData, convert.StringToBytes(g.name))
+ combinedData =
encoding.Int64ToBytes(combinedData, es.timeRange.Start.UnixNano())
+ combinedData = append(combinedData,
seriesMetadataBytes...)
- // Send to all nodes for this shard
- for _, node := range nodes {
- message :=
bus.NewMessageWithNode(bus.MessageID(time.Now().UnixNano()), node, combinedData)
- _, publishErr :=
w.tire2Client.Publish(ctx, data.TopicTraceSidxSeriesWrite, message)
- if publishErr != nil {
-
w.l.Error().Err(publishErr).Str("node", node).Uint32("shardID",
uint32(es.shardID)).Msg("failed to publish series index to node")
- }
+ // Send to all nodes for this shard
+ for _, node := range nodes {
+ message :=
bus.NewMessageWithNode(bus.MessageID(time.Now().UnixNano()), node, combinedData)
+ _, publishErr :=
w.tire2Client.Publish(ctx, data.TopicTraceSidxSeriesWrite, message)
+ if publishErr != nil {
+
w.l.Error().Err(publishErr).Str("node", node).Uint32("shardID",
uint32(es.shardID)).Msg("failed to publish series index to node")
}
}
}