This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new 2bf77e13 Measure Series Metadata in Liaison Sending Queue (#890)
2bf77e13 is described below
commit 2bf77e13ee58cda75bf28b999187a3cb980b93bf
Author: OmCheeLin <[email protected]>
AuthorDate: Mon Dec 15 08:57:05 2025 +0800
Measure Series Metadata in Liaison Sending Queue (#890)
---------
Co-authored-by: Gao Hongtao <[email protected]>
Co-authored-by: 吴晟 Wu Sheng <[email protected]>
Co-authored-by: Copilot <[email protected]>
---
banyand/measure/part.go | 40 +++++++++++--
banyand/measure/part_test.go | 121 +++++++++++++++++++++++++++++++++++++++
banyand/measure/tstable.go | 11 +++-
banyand/measure/write_liaison.go | 31 ++++++----
4 files changed, 184 insertions(+), 19 deletions(-)
diff --git a/banyand/measure/part.go b/banyand/measure/part.go
index e47bfbc6..7cb29232 100644
--- a/banyand/measure/part.go
+++ b/banyand/measure/part.go
@@ -18,6 +18,7 @@
package measure
import (
+ "errors"
"fmt"
"path"
"path/filepath"
@@ -35,18 +36,20 @@ import (
// Streaming file names for measure data parts (without extensions).
const (
	measurePrimaryName        = "primary"
	measureMetaName           = "meta"
	measureTimestampsName     = "timestamps"
	measureFieldValuesName    = "fv"
	measureTagFamiliesPrefix  = "tf:"
	measureTagMetadataPrefix  = "tfm:"
	measureSeriesMetadataName = "smeta" // base name of the optional series metadata file
)

// File names and extensions used inside a flushed measure part directory.
// The ".bin" names are derived from the streaming base names above.
const (
	metadataFilename               = "metadata.json"
	primaryFilename                = measurePrimaryName + ".bin"
	metaFilename                   = measureMetaName + ".bin"
	timestampsFilename             = measureTimestampsName + ".bin"
	fieldValuesFilename            = measureFieldValuesName + ".bin"
	seriesMetadataFilename         = measureSeriesMetadataName + ".bin"
	tagFamiliesMetadataFilenameExt = ".tfm"
	tagFamiliesFilenameExt         = ".tf"
)
@@ -58,6 +61,7 @@ type part struct {
fileSystem fs.FileSystem
tagFamilyMetadata map[string]fs.Reader
tagFamilies map[string]fs.Reader
+ seriesMetadata fs.Reader // Optional: series metadata reader
cache storage.Cache
path string
primaryBlockMetadata []primaryBlockMetadata
@@ -68,6 +72,9 @@ func (p *part) close() {
fs.MustClose(p.primary)
fs.MustClose(p.timestamps)
fs.MustClose(p.fieldValues)
+ if p.seriesMetadata != nil {
+ fs.MustClose(p.seriesMetadata)
+ }
for _, tf := range p.tagFamilies {
fs.MustClose(tf)
}
@@ -108,6 +115,7 @@ type memPart struct {
primary bytes.Buffer
timestamps bytes.Buffer
fieldValues bytes.Buffer
+ seriesMetadata bytes.Buffer
partMetadata partMetadata
segmentID int64
}
@@ -135,6 +143,7 @@ func (mp *memPart) reset() {
mp.primary.Reset()
mp.timestamps.Reset()
mp.fieldValues.Reset()
+ mp.seriesMetadata.Reset()
if mp.tagFamilies != nil {
for k, tf := range mp.tagFamilies {
tf.Reset()
@@ -208,6 +217,11 @@ func (mp *memPart) mustFlush(fileSystem fs.FileSystem,
path string) {
fs.MustFlush(fileSystem, tfh.Buf, filepath.Join(path,
name+tagFamiliesMetadataFilenameExt), storage.FilePerm)
}
+ // Flush series metadata if available
+ if len(mp.seriesMetadata.Buf) > 0 {
+ fs.MustFlush(fileSystem, mp.seriesMetadata.Buf,
filepath.Join(path, seriesMetadataFilename), storage.FilePerm)
+ }
+
mp.partMetadata.mustWriteMetadata(fileSystem, path)
fileSystem.SyncPath(path)
@@ -306,6 +320,20 @@ func mustOpenFilePart(id uint64, root string, fileSystem
fs.FileSystem) *part {
p.primary = mustOpenReader(path.Join(partPath, primaryFilename),
fileSystem)
p.timestamps = mustOpenReader(path.Join(partPath, timestampsFilename),
fileSystem)
p.fieldValues = mustOpenReader(path.Join(partPath,
fieldValuesFilename), fileSystem)
+
+ // Try to open series metadata file (optional, for backward
compatibility)
+ seriesMetadataPath := path.Join(partPath, seriesMetadataFilename)
+ reader, err := fileSystem.OpenFile(seriesMetadataPath)
+ if err != nil {
+ var fsErr *fs.FileSystemError
+ // File does not exist is acceptable for backward compatibility
+ if !errors.As(err, &fsErr) || fsErr.Code != fs.IsNotExistError {
+ logger.Panicf("cannot open series metadata file %q:
%s", seriesMetadataPath, err)
+ }
+ } else {
+ p.seriesMetadata = reader
+ }
+
ee := fileSystem.ReadDir(partPath)
for _, e := range ee {
if e.IsDir() {
diff --git a/banyand/measure/part_test.go b/banyand/measure/part_test.go
index ffd72bae..026d6956 100644
--- a/banyand/measure/part_test.go
+++ b/banyand/measure/part_test.go
@@ -18,6 +18,7 @@
package measure
import (
	"io"
	"path/filepath"
	"testing"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"

	"github.com/apache/skywalking-banyandb/api/common"
	"github.com/apache/skywalking-banyandb/pkg/convert"
	"github.com/apache/skywalking-banyandb/pkg/fs"
	"github.com/apache/skywalking-banyandb/pkg/index"
	pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
	"github.com/apache/skywalking-banyandb/pkg/test"
)
@@ -218,3 +220,122 @@ var dps = &dataPoints{
},
},
}
+
+func TestSeriesMetadataPersistence(t *testing.T) {
+ tmpPath, defFn := test.Space(require.New(t))
+ defer defFn()
+
+ fileSystem := fs.NewLocalFileSystem()
+ epoch := uint64(12345)
+ path := partPath(tmpPath, epoch)
+
+ // Create a memPart with data points
+ mp := generateMemPart()
+ mp.mustInitFromDataPoints(dps)
+
+ // Create sample series metadata using NewBytesField
+ field1 := index.NewBytesField(index.FieldKey{
+ IndexRuleID: 1,
+ Analyzer: "keyword",
+ TagName: "tag1",
+ }, []byte("term1"))
+ field2 := index.NewBytesField(index.FieldKey{
+ IndexRuleID: 2,
+ Analyzer: "keyword",
+ TagName: "tag2",
+ }, []byte("term2"))
+ metadataDocs := index.Documents{
+ {
+ DocID: 1,
+ EntityValues: []byte("entity1"),
+ Fields: []index.Field{field1},
+ },
+ {
+ DocID: 2,
+ EntityValues: []byte("entity2"),
+ Fields: []index.Field{field2},
+ },
+ }
+
+ // Marshal series metadata
+ seriesMetadataBytes, err := metadataDocs.Marshal()
+ require.NoError(t, err)
+ require.NotEmpty(t, seriesMetadataBytes)
+
+ // Set series metadata in memPart
+ _, err = mp.seriesMetadata.Write(seriesMetadataBytes)
+ require.NoError(t, err)
+
+ // Flush to disk
+ mp.mustFlush(fileSystem, path)
+
+ // Verify series metadata file exists by trying to read it
+ seriesMetadataPath := filepath.Join(path, seriesMetadataFilename)
+ readBytes, err := fileSystem.Read(seriesMetadataPath)
+ require.NoError(t, err, "series metadata file should exist")
+ assert.Equal(t, seriesMetadataBytes, readBytes, "series metadata
content should match")
+
+ // Open the part and verify series metadata is accessible
+ p := mustOpenFilePart(epoch, tmpPath, fileSystem)
+ defer p.close()
+
+ // Verify series metadata reader is available
+ assert.NotNil(t, p.seriesMetadata, "series metadata reader should be
available")
+
+ // Read and unmarshal series metadata using SequentialRead
+ seqReader := p.seriesMetadata.SequentialRead()
+ defer seqReader.Close()
+ readMetadataBytes := make([]byte, 0)
+ buf := make([]byte, 1024)
+ for {
+ var n int
+ n, err = seqReader.Read(buf)
+ if n == 0 {
+ if err != nil {
+ break
+ }
+ continue
+ }
+ if err != nil {
+ break
+ }
+ readMetadataBytes = append(readMetadataBytes, buf[:n]...)
+ }
+
+ var readDocs index.Documents
+ err = readDocs.Unmarshal(readMetadataBytes)
+ require.NoError(t, err)
+ assert.Equal(t, len(metadataDocs), len(readDocs), "number of documents
should match")
+ assert.Equal(t, metadataDocs[0].DocID, readDocs[0].DocID, "first
document DocID should match")
+ assert.Equal(t, metadataDocs[1].DocID, readDocs[1].DocID, "second
document DocID should match")
+}
+
+func TestSeriesMetadataBackwardCompatibility(t *testing.T) {
+ tmpPath, defFn := test.Space(require.New(t))
+ defer defFn()
+
+ fileSystem := fs.NewLocalFileSystem()
+ epoch := uint64(67890)
+ path := partPath(tmpPath, epoch)
+
+ // Create a memPart with data points but without series metadata
+ mp := generateMemPart()
+ mp.mustInitFromDataPoints(dps)
+ // Don't set series metadata to simulate old parts
+
+ // Flush to disk
+ mp.mustFlush(fileSystem, path)
+
+ // Verify series metadata file does not exist by trying to read it
+ seriesMetadataPath := filepath.Join(path, seriesMetadataFilename)
+ _, err := fileSystem.Read(seriesMetadataPath)
+ assert.Error(t, err, "series metadata file should not exist for old
parts")
+
+ // Open the part - should work without series metadata (backward
compatibility)
+ p := mustOpenFilePart(epoch, tmpPath, fileSystem)
+ defer p.close()
+
+ // Verify part can be opened successfully
+ assert.NotNil(t, p, "part should be opened successfully")
+ assert.Nil(t, p.seriesMetadata, "series metadata reader should be nil
for old parts")
+}
diff --git a/banyand/measure/tstable.go b/banyand/measure/tstable.go
index 2e59e1e6..683327e7 100644
--- a/banyand/measure/tstable.go
+++ b/banyand/measure/tstable.go
@@ -285,10 +285,10 @@ func (tst *tsTable) Close() error {
}
// mustAddDataPoints adds dps to the table with segment ID 0 and no series
// metadata. It is shorthand for mustAddDataPointsWithSegmentID.
func (tst *tsTable) mustAddDataPoints(dps *dataPoints) {
	const defaultSegmentID int64 = 0
	var noSeriesMetadata []byte
	tst.mustAddDataPointsWithSegmentID(dps, defaultSegmentID, noSeriesMetadata)
}
-func (tst *tsTable) mustAddDataPointsWithSegmentID(dps *dataPoints, segmentID
int64) {
+func (tst *tsTable) mustAddDataPointsWithSegmentID(dps *dataPoints, segmentID
int64, seriesMetadata []byte) {
if len(dps.seriesIDs) == 0 {
return
}
@@ -296,6 +296,13 @@ func (tst *tsTable) mustAddDataPointsWithSegmentID(dps
*dataPoints, segmentID in
mp := generateMemPart()
mp.mustInitFromDataPoints(dps)
mp.segmentID = segmentID
+ if len(seriesMetadata) > 0 {
+ // Write series metadata to buffer to avoid sharing the
underlying slice
+ _, err := mp.seriesMetadata.Write(seriesMetadata)
+ if err != nil {
+ logger.Panicf("cannot write series metadata to buffer:
%s", err)
+ }
+ }
tst.mustAddMemPart(mp)
}
diff --git a/banyand/measure/write_liaison.go b/banyand/measure/write_liaison.go
index 867eb840..e870b8c9 100644
--- a/banyand/measure/write_liaison.go
+++ b/banyand/measure/write_liaison.go
@@ -35,7 +35,6 @@ import (
"github.com/apache/skywalking-banyandb/pkg/bus"
"github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/encoding"
- "github.com/apache/skywalking-banyandb/pkg/index"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
)
@@ -117,8 +116,18 @@ func (w *writeQueueCallback) Rev(ctx context.Context,
message bus.Message) (resp
g := groups[i]
for j := range g.tables {
es := g.tables[j]
+ // Marshal series metadata for persistence in part
folder
+ var seriesMetadataBytes []byte
+ if len(es.metadataDocs) > 0 {
+ var marshalErr error
+ seriesMetadataBytes, marshalErr =
es.metadataDocs.Marshal()
+ if marshalErr != nil {
+
w.l.Error().Err(marshalErr).Uint32("shardID", uint32(es.shardID)).Msg("failed
to marshal series metadata for persistence")
+ // Continue without series metadata,
but log the error
+ }
+ }
if es.tsTable != nil && es.dataPoints != nil {
-
es.tsTable.mustAddDataPointsWithSegmentID(es.dataPoints,
es.timeRange.Start.UnixNano())
+
es.tsTable.mustAddDataPointsWithSegmentID(es.dataPoints,
es.timeRange.Start.UnixNano(), seriesMetadataBytes)
releaseDataPoints(es.dataPoints)
}
nodes := g.queue.GetNodes(es.shardID)
@@ -126,12 +135,7 @@ func (w *writeQueueCallback) Rev(ctx context.Context,
message bus.Message) (resp
w.l.Warn().Uint32("shardID",
uint32(es.shardID)).Msg("no nodes found for shard")
continue
}
- sendDocuments := func(topic bus.Topic, docs
index.Documents) {
- seriesDocData, marshalErr := docs.Marshal()
- if marshalErr != nil {
-
w.l.Error().Err(marshalErr).Uint32("shardID", uint32(es.shardID)).Msg("failed
to marshal series documents")
- return
- }
+ sendDocuments := func(topic bus.Topic, seriesDocData
[]byte) {
// Encode group name, start timestamp from
timeRange, and prepend to docData
combinedData := make([]byte, 0,
len(seriesDocData)+len(g.name)+8)
combinedData =
encoding.EncodeBytes(combinedData, convert.StringToBytes(g.name))
@@ -153,11 +157,16 @@ func (w *writeQueueCallback) Rev(ctx context.Context,
message bus.Message) (resp
}
}
}
- if len(es.metadataDocs) > 0 {
-
sendDocuments(data.TopicMeasureSeriesIndexInsert, es.metadataDocs)
+ if len(seriesMetadataBytes) > 0 {
+
sendDocuments(data.TopicMeasureSeriesIndexInsert, seriesMetadataBytes)
}
if len(es.indexModeDocs) > 0 {
-
sendDocuments(data.TopicMeasureSeriesIndexUpdate, es.indexModeDocs)
+ seriesDocData, marshalErr :=
es.indexModeDocs.Marshal()
+ if marshalErr != nil {
+
w.l.Error().Err(marshalErr).Uint32("shardID", uint32(es.shardID)).Msg("failed
to marshal index mode documents")
+ } else {
+
sendDocuments(data.TopicMeasureSeriesIndexUpdate, seriesDocData)
+ }
}
}
}