This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new 62bf150c update the dump tool to support analyzing the parts with smeta files (#900)
62bf150c is described below
commit 62bf150c70167f81285e9e2c6b621cf55c6bedd8
Author: OmCheeLin <[email protected]>
AuthorDate: Mon Dec 29 10:19:35 2025 +0800
update the dump tool to support analyzing the parts with smeta files (#900)
---
CHANGES.md | 1 +
banyand/cmd/dump/measure.go | 77 +++++++++++++++++++++++++++-----
banyand/cmd/dump/measure_test.go | 93 +++++++++++++++++++++++++++++++++++++++
banyand/cmd/dump/stream.go | 76 +++++++++++++++++++++++++++-----
banyand/cmd/dump/stream_test.go | 94 ++++++++++++++++++++++++++++++++++++++++
banyand/cmd/dump/trace.go | 77 +++++++++++++++++++++++++++-----
banyand/cmd/dump/trace_test.go | 92 +++++++++++++++++++++++++++++++++++++++
pkg/dump/dump.go | 88 +++++++++++++++++++++++++++++++++++++
8 files changed, 567 insertions(+), 31 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index bb5b4f96..a80ea12c 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -15,6 +15,7 @@ Release Notes.
- Remove the windows arch for binary and docker image.
- Support writing data with specifications.
- Persist series metadata in liaison queue for measure, stream and trace models.
+- Update the dump tool to support analyzing the parts with smeta files.
### Bug Fixes
diff --git a/banyand/cmd/dump/measure.go b/banyand/cmd/dump/measure.go
index 4167d3cf..55000407 100644
--- a/banyand/cmd/dump/measure.go
+++ b/banyand/cmd/dump/measure.go
@@ -39,6 +39,7 @@ import (
"github.com/apache/skywalking-banyandb/pkg/bytes"
"github.com/apache/skywalking-banyandb/pkg/compress/zstd"
"github.com/apache/skywalking-banyandb/pkg/convert"
+ "github.com/apache/skywalking-banyandb/pkg/dump"
"github.com/apache/skywalking-banyandb/pkg/encoding"
"github.com/apache/skywalking-banyandb/pkg/fs"
"github.com/apache/skywalking-banyandb/pkg/index/inverted"
@@ -207,6 +208,7 @@ type measurePart struct {
fileSystem fs.FileSystem
tagFamilyMetadata map[string]fs.Reader
tagFamilies map[string]fs.Reader
+ seriesMetadata fs.Reader // Optional: series metadata reader
path string
primaryBlockMetadata []measurePrimaryBlockMetadata
partMetadata measurePartMetadata
@@ -226,6 +228,7 @@ type measureDumpContext struct {
tagFilter logical.TagFilter
fileSystem fs.FileSystem
seriesMap map[common.SeriesID]string
+	partSeriesMap map[uint64]map[common.SeriesID]string // partID -> SeriesID -> EntityValues from smeta.bin
writer *csv.Writer
opts measureDumpOptions
partIDs []uint64
@@ -238,8 +241,9 @@ type measureDumpContext struct {
func newMeasureDumpContext(opts measureDumpOptions) (*measureDumpContext, error) {
ctx := &measureDumpContext{
- opts: opts,
- fileSystem: fs.NewLocalFileSystem(),
+ opts: opts,
+ fileSystem: fs.NewLocalFileSystem(),
+ partSeriesMap: make(map[uint64]map[common.SeriesID]string),
}
partIDs, err := discoverMeasurePartIDs(opts.shardPath)
@@ -363,6 +367,13 @@ func (ctx *measureDumpContext) processParts() error {
}
func (ctx *measureDumpContext) processPart(partID uint64, p *measurePart) (int, error) {
+ // Parse and display series metadata if available
+ if p.seriesMetadata != nil {
+		if err := ctx.parseAndDisplaySeriesMetadata(partID, p); err != nil {
+			fmt.Fprintf(os.Stderr, "Warning: Error parsing series metadata in part %016x: %v\n", partID, err)
+ }
+ }
+
decoder := &encoding.BytesBlockDecoder{}
partRowCount := 0
@@ -523,11 +534,11 @@ func (ctx *measureDumpContext) shouldSkip(tags map[string][]byte) bool {
func (ctx *measureDumpContext) writeRow(row measureRowData) error {
if ctx.opts.csvOutput {
-		if err := writeMeasureRowAsCSV(ctx.writer, row, ctx.fieldColumns, ctx.tagColumns, ctx.seriesMap); err != nil {
+		if err := writeMeasureRowAsCSV(ctx.writer, row, ctx.fieldColumns, ctx.tagColumns, ctx.seriesMap, ctx.partSeriesMap); err != nil {
return err
}
} else {
-		writeMeasureRowAsText(row, ctx.rowNum+1, ctx.projectionTags, ctx.projectionFields, ctx.seriesMap)
+		writeMeasureRowAsText(row, ctx.rowNum+1, ctx.projectionTags, ctx.projectionFields, ctx.seriesMap, ctx.partSeriesMap)
}
ctx.rowNum++
return nil
@@ -589,6 +600,9 @@ func openMeasurePart(id uint64, root string, fileSystem fs.FileSystem) (*measure
return nil, fmt.Errorf("cannot open fv.bin: %w", err)
}
+	// Try to open series metadata file (optional, for backward compatibility)
+ p.seriesMetadata = dump.TryOpenSeriesMetadata(fileSystem, partPath)
+
// Open tag family files
entries := fileSystem.ReadDir(partPath)
p.tagFamilies = make(map[string]fs.Reader)
@@ -627,6 +641,9 @@ func closeMeasurePart(p *measurePart) {
if p.fieldValues != nil {
fs.MustClose(p.fieldValues)
}
+ if p.seriesMetadata != nil {
+ fs.MustClose(p.seriesMetadata)
+ }
for _, r := range p.tagFamilies {
fs.MustClose(r)
}
@@ -1084,18 +1101,38 @@ func discoverMeasureColumns(partIDs []uint64, shardPath string, fileSystem fs.Fi
return tagResult, fieldResult, nil
}
-func writeMeasureRowAsText(row measureRowData, rowNum int, projectionTags []string, projectionFields []string, seriesMap map[common.SeriesID]string) {
+func writeMeasureRowAsText(
+ row measureRowData,
+ rowNum int,
+ projectionTags []string,
+ projectionFields []string,
+ seriesMap map[common.SeriesID]string,
+ partSeriesMap map[uint64]map[common.SeriesID]string,
+) {
fmt.Printf("Row %d:\n", rowNum)
fmt.Printf(" PartID: %d (0x%016x)\n", row.partID, row.partID)
fmt.Printf(" Timestamp: %s\n", formatTimestamp(row.timestamp))
fmt.Printf(" Version: %d\n", row.version)
fmt.Printf(" SeriesID: %d\n", row.seriesID)
- if seriesMap != nil {
- if seriesText, ok := seriesMap[row.seriesID]; ok {
- fmt.Printf(" Series: %s\n", seriesText)
+ seriesText := ""
+ // First try to get EntityValues from smeta.bin (part-level)
+ if partSeriesMap != nil {
+ if partMap, ok := partSeriesMap[row.partID]; ok {
+ if text, ok := partMap[row.seriesID]; ok {
+ seriesText = text
+ }
}
}
+ // Fallback to segment-level seriesMap if not found in smeta
+ if seriesText == "" && seriesMap != nil {
+ if text, ok := seriesMap[row.seriesID]; ok {
+ seriesText = text
+ }
+ }
+ if seriesText != "" {
+ fmt.Printf(" Series: %s\n", seriesText)
+ }
if len(row.fields) > 0 {
fmt.Printf(" Fields:\n")
@@ -1151,9 +1188,25 @@ func writeMeasureRowAsText(row measureRowData, rowNum int, projectionTags []stri
fmt.Printf("\n")
}
-func writeMeasureRowAsCSV(writer *csv.Writer, row measureRowData, fieldColumns []string, tagColumns []string, seriesMap map[common.SeriesID]string) error {
+func writeMeasureRowAsCSV(
+ writer *csv.Writer,
+ row measureRowData,
+ fieldColumns []string,
+ tagColumns []string,
+ seriesMap map[common.SeriesID]string,
+ partSeriesMap map[uint64]map[common.SeriesID]string,
+) error {
seriesText := ""
- if seriesMap != nil {
+ // First try to get EntityValues from smeta.bin (part-level)
+ if partSeriesMap != nil {
+ if partMap, ok := partSeriesMap[row.partID]; ok {
+ if text, ok := partMap[row.seriesID]; ok {
+ seriesText = text
+ }
+ }
+ }
+ // Fallback to segment-level seriesMap if not found in smeta
+ if seriesText == "" && seriesMap != nil {
if text, ok := seriesMap[row.seriesID]; ok {
seriesText = text
}
@@ -1294,3 +1347,7 @@ func measureTagValueDecoder(valueType pbv1.ValueType, value []byte, valueArr [][
return pbv1.NullTagValue
}
}
+
+func (ctx *measureDumpContext) parseAndDisplaySeriesMetadata(partID uint64, p *measurePart) error {
+	return dump.ParseSeriesMetadata(partID, p.seriesMetadata, ctx.partSeriesMap)
+}
diff --git a/banyand/cmd/dump/measure_test.go b/banyand/cmd/dump/measure_test.go
index fa1519e6..cf207593 100644
--- a/banyand/cmd/dump/measure_test.go
+++ b/banyand/cmd/dump/measure_test.go
@@ -18,6 +18,7 @@
package main
import (
+ "fmt"
"path/filepath"
"strconv"
"testing"
@@ -25,10 +26,14 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
+ "github.com/apache/skywalking-banyandb/api/common"
+ "github.com/apache/skywalking-banyandb/banyand/internal/storage"
"github.com/apache/skywalking-banyandb/banyand/measure"
"github.com/apache/skywalking-banyandb/pkg/compress/zstd"
+ "github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/encoding"
"github.com/apache/skywalking-banyandb/pkg/fs"
+ "github.com/apache/skywalking-banyandb/pkg/index"
"github.com/apache/skywalking-banyandb/pkg/test"
)
@@ -158,3 +163,91 @@ func TestDumpMeasurePartFormat(t *testing.T) {
t.Logf("Successfully parsed part with %d data points across %d primary
blocks (metadata BlocksCount=%d)",
totalDataPoints, len(p.primaryBlockMetadata),
p.partMetadata.BlocksCount)
}
+
+// TestDumpMeasurePartWithSeriesMetadata tests that the dump tool can parse the smeta file.
+// This test creates a part with series metadata and verifies the dump tool can correctly parse it.
+func TestDumpMeasurePartWithSeriesMetadata(t *testing.T) {
+ tmpPath, defFn := test.Space(require.New(t))
+ defer defFn()
+
+ fileSystem := fs.NewLocalFileSystem()
+
+ // Create a part with series metadata
+	partPath, cleanup := createTestMeasurePartWithSeriesMetadata(tmpPath, fileSystem)
+ defer cleanup()
+
+ // Extract part ID from path
+ partName := filepath.Base(partPath)
+ partID, err := strconv.ParseUint(partName, 16, 64)
+ require.NoError(t, err, "part directory should have valid hex name")
+
+ // Parse the part using dump tool functions
+ p, err := openMeasurePart(partID, filepath.Dir(partPath), fileSystem)
+	require.NoError(t, err, "should be able to open part with series metadata")
+ defer closeMeasurePart(p)
+
+ // Verify series metadata reader is available
+	assert.NotNil(t, p.seriesMetadata, "series metadata reader should be available")
+
+ // Create a dump context to test parsing
+ opts := measureDumpOptions{
+ shardPath: filepath.Dir(partPath),
+ segmentPath: tmpPath, // Not used for this test
+ verbose: false,
+ csvOutput: false,
+ }
+ ctx, err := newMeasureDumpContext(opts)
+ require.NoError(t, err)
+ if ctx != nil {
+ defer ctx.close()
+ }
+
+ // Test parsing series metadata
+ err = ctx.parseAndDisplaySeriesMetadata(partID, p)
+ require.NoError(t, err, "should be able to parse series metadata")
+
+ // Verify EntityValues are stored in partSeriesMap
+	require.NotNil(t, ctx.partSeriesMap, "partSeriesMap should be initialized")
+ partMap, exists := ctx.partSeriesMap[partID]
+ require.True(t, exists, "partSeriesMap should contain entry for partID")
+ require.NotNil(t, partMap, "partMap should not be nil")
+
+ // Verify EntityValues are correctly stored
+ // Calculate expected SeriesIDs from EntityValues
+	expectedSeriesID1 := common.SeriesID(convert.Hash([]byte("service.name=test-service")))
+	expectedSeriesID2 := common.SeriesID(convert.Hash([]byte("service.name=another-service")))
+
+	assert.Contains(t, partMap, expectedSeriesID1, "partMap should contain first series")
+	assert.Contains(t, partMap, expectedSeriesID2, "partMap should contain second series")
+	assert.Equal(t, "service.name=test-service", partMap[expectedSeriesID1], "EntityValues should match")
+	assert.Equal(t, "service.name=another-service", partMap[expectedSeriesID2], "EntityValues should match")
+}
+
+// createTestMeasurePartWithSeriesMetadata creates a test measure part with series metadata.
+func createTestMeasurePartWithSeriesMetadata(tmpPath string, fileSystem fs.FileSystem) (string, func()) {
+ // Use measure package to create a part
+ partPath, cleanup := measure.CreateTestPartForDump(tmpPath, fileSystem)
+
+ // Create sample series metadata file
+ seriesMetadataPath := filepath.Join(partPath, "smeta.bin")
+
+ // Create sample documents
+ docs := index.Documents{
+ {
+ DocID: 1,
+ EntityValues: []byte("service.name=test-service"),
+ },
+ {
+ DocID: 2,
+ EntityValues: []byte("service.name=another-service"),
+ },
+ }
+
+ seriesMetadataBytes, err := docs.Marshal()
+ if err != nil {
+		panic(fmt.Sprintf("failed to marshal series metadata documents: %v", err))
+ }
+	fs.MustFlush(fileSystem, seriesMetadataBytes, seriesMetadataPath, storage.FilePerm)
+
+ return partPath, cleanup
+}
diff --git a/banyand/cmd/dump/stream.go b/banyand/cmd/dump/stream.go
index 3f9a7b37..8b74482d 100644
--- a/banyand/cmd/dump/stream.go
+++ b/banyand/cmd/dump/stream.go
@@ -39,6 +39,7 @@ import (
"github.com/apache/skywalking-banyandb/pkg/bytes"
"github.com/apache/skywalking-banyandb/pkg/compress/zstd"
"github.com/apache/skywalking-banyandb/pkg/convert"
+ "github.com/apache/skywalking-banyandb/pkg/dump"
"github.com/apache/skywalking-banyandb/pkg/encoding"
"github.com/apache/skywalking-banyandb/pkg/fs"
"github.com/apache/skywalking-banyandb/pkg/index/inverted"
@@ -187,6 +188,7 @@ type streamPart struct {
tagFamilyMetadata map[string]fs.Reader
tagFamilies map[string]fs.Reader
tagFamilyFilter map[string]fs.Reader
+ seriesMetadata fs.Reader // Optional: series metadata reader
path string
primaryBlockMetadata []streamPrimaryBlockMetadata
partMetadata streamPartMetadata
@@ -205,6 +207,7 @@ type streamDumpContext struct {
tagFilter logical.TagFilter
fileSystem fs.FileSystem
seriesMap map[common.SeriesID]string
+	partSeriesMap map[uint64]map[common.SeriesID]string // partID -> SeriesID -> EntityValues from smeta.bin
writer *csv.Writer
opts streamDumpOptions
partIDs []uint64
@@ -215,8 +218,9 @@ type streamDumpContext struct {
func newStreamDumpContext(opts streamDumpOptions) (*streamDumpContext, error) {
ctx := &streamDumpContext{
- opts: opts,
- fileSystem: fs.NewLocalFileSystem(),
+ opts: opts,
+ fileSystem: fs.NewLocalFileSystem(),
+ partSeriesMap: make(map[uint64]map[common.SeriesID]string),
}
partIDs, err := discoverStreamPartIDs(opts.shardPath)
@@ -318,6 +322,13 @@ func (ctx *streamDumpContext) processParts() error {
}
func (ctx *streamDumpContext) processPart(partID uint64, p *streamPart) (int, error) {
+ // Parse and display series metadata if available
+ if p.seriesMetadata != nil {
+		if err := ctx.parseAndDisplaySeriesMetadata(partID, p); err != nil {
+			fmt.Fprintf(os.Stderr, "Warning: Error parsing series metadata in part %016x: %v\n", partID, err)
+ }
+ }
+
decoder := &encoding.BytesBlockDecoder{}
partRowCount := 0
@@ -457,11 +468,11 @@ func (ctx *streamDumpContext) shouldSkip(tags map[string][]byte) bool {
func (ctx *streamDumpContext) writeRow(row streamRowData) error {
if ctx.opts.csvOutput {
-		if err := writeStreamRowAsCSV(ctx.writer, row, ctx.tagColumns, ctx.seriesMap); err != nil {
+		if err := writeStreamRowAsCSV(ctx.writer, row, ctx.tagColumns, ctx.seriesMap, ctx.partSeriesMap); err != nil {
return err
}
} else {
-		writeStreamRowAsText(row, ctx.rowNum+1, ctx.opts.verbose, ctx.projectionTags, ctx.seriesMap)
+		writeStreamRowAsText(row, ctx.rowNum+1, ctx.opts.verbose, ctx.projectionTags, ctx.seriesMap, ctx.partSeriesMap)
}
ctx.rowNum++
return nil
@@ -516,6 +527,9 @@ func openStreamPart(id uint64, root string, fileSystem fs.FileSystem) (*streamPa
return nil, fmt.Errorf("cannot open timestamps.bin: %w", err)
}
+	// Try to open series metadata file (optional, for backward compatibility)
+ p.seriesMetadata = dump.TryOpenSeriesMetadata(fileSystem, partPath)
+
// Open tag family files
entries := fileSystem.ReadDir(partPath)
p.tagFamilies = make(map[string]fs.Reader)
@@ -559,6 +573,9 @@ func closeStreamPart(p *streamPart) {
if p.timestamps != nil {
fs.MustClose(p.timestamps)
}
+ if p.seriesMetadata != nil {
+ fs.MustClose(p.seriesMetadata)
+ }
for _, r := range p.tagFamilies {
fs.MustClose(r)
}
@@ -921,18 +938,38 @@ func discoverStreamTagColumns(partIDs []uint64, shardPath string, fileSystem fs.
return result, nil
}
-func writeStreamRowAsText(row streamRowData, rowNum int, verbose bool, projectionTags []string, seriesMap map[common.SeriesID]string) {
+func writeStreamRowAsText(
+ row streamRowData,
+ rowNum int,
+ verbose bool,
+ projectionTags []string,
+ seriesMap map[common.SeriesID]string,
+ partSeriesMap map[uint64]map[common.SeriesID]string,
+) {
fmt.Printf("Row %d:\n", rowNum)
fmt.Printf(" PartID: %d (0x%016x)\n", row.partID, row.partID)
fmt.Printf(" ElementID: %d\n", row.elementID)
fmt.Printf(" Timestamp: %s\n", formatTimestamp(row.timestamp))
fmt.Printf(" SeriesID: %d\n", row.seriesID)
- if seriesMap != nil {
- if seriesText, ok := seriesMap[row.seriesID]; ok {
- fmt.Printf(" Series: %s\n", seriesText)
+ seriesText := ""
+ // First try to get EntityValues from smeta.bin (part-level)
+ if partSeriesMap != nil {
+ if partMap, ok := partSeriesMap[row.partID]; ok {
+ if text, ok := partMap[row.seriesID]; ok {
+ seriesText = text
+ }
}
}
+ // Fallback to segment-level seriesMap if not found in smeta
+ if seriesText == "" && seriesMap != nil {
+ if text, ok := seriesMap[row.seriesID]; ok {
+ seriesText = text
+ }
+ }
+ if seriesText != "" {
+ fmt.Printf(" Series: %s\n", seriesText)
+ }
fmt.Printf(" Element Data: %d bytes\n", len(row.elementData))
if verbose && len(row.elementData) > 0 {
@@ -968,9 +1005,24 @@ func writeStreamRowAsText(row streamRowData, rowNum int, verbose bool, projectio
fmt.Printf("\n")
}
-func writeStreamRowAsCSV(writer *csv.Writer, row streamRowData, tagColumns []string, seriesMap map[common.SeriesID]string) error {
+func writeStreamRowAsCSV(
+ writer *csv.Writer,
+ row streamRowData,
+ tagColumns []string,
+ seriesMap map[common.SeriesID]string,
+ partSeriesMap map[uint64]map[common.SeriesID]string,
+) error {
seriesText := ""
- if seriesMap != nil {
+ // First try to get EntityValues from smeta.bin (part-level)
+ if partSeriesMap != nil {
+ if partMap, ok := partSeriesMap[row.partID]; ok {
+ if text, ok := partMap[row.seriesID]; ok {
+ seriesText = text
+ }
+ }
+ }
+ // Fallback to segment-level seriesMap if not found in smeta
+ if seriesText == "" && seriesMap != nil {
if text, ok := seriesMap[row.seriesID]; ok {
seriesText = text
}
@@ -1097,3 +1149,7 @@ func streamTagValueDecoder(valueType pbv1.ValueType, value []byte, valueArr [][]
return pbv1.NullTagValue
}
}
+
+func (ctx *streamDumpContext) parseAndDisplaySeriesMetadata(partID uint64, p *streamPart) error {
+	return dump.ParseSeriesMetadata(partID, p.seriesMetadata, ctx.partSeriesMap)
+}
diff --git a/banyand/cmd/dump/stream_test.go b/banyand/cmd/dump/stream_test.go
index ce975153..26f4bf44 100644
--- a/banyand/cmd/dump/stream_test.go
+++ b/banyand/cmd/dump/stream_test.go
@@ -18,6 +18,7 @@
package main
import (
+ "fmt"
"path/filepath"
"strconv"
"testing"
@@ -25,10 +26,14 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
+ "github.com/apache/skywalking-banyandb/api/common"
+ "github.com/apache/skywalking-banyandb/banyand/internal/storage"
"github.com/apache/skywalking-banyandb/banyand/stream"
"github.com/apache/skywalking-banyandb/pkg/compress/zstd"
+ "github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/encoding"
"github.com/apache/skywalking-banyandb/pkg/fs"
+ "github.com/apache/skywalking-banyandb/pkg/index"
"github.com/apache/skywalking-banyandb/pkg/test"
)
@@ -148,3 +153,92 @@ func TestDumpStreamPartFormat(t *testing.T) {
func createTestStreamPartForDump(tmpPath string, fileSystem fs.FileSystem) (string, func()) {
return stream.CreateTestPartForDump(tmpPath, fileSystem)
}
+
+// TestDumpStreamPartWithSeriesMetadata tests that the dump tool can parse the smeta file.
+// This test creates a part with series metadata and verifies the dump tool can correctly parse it.
+func TestDumpStreamPartWithSeriesMetadata(t *testing.T) {
+ tmpPath, defFn := test.Space(require.New(t))
+ defer defFn()
+
+ fileSystem := fs.NewLocalFileSystem()
+
+ // Create a part with series metadata
+	partPath, cleanup := createTestStreamPartWithSeriesMetadata(tmpPath, fileSystem)
+ defer cleanup()
+
+ // Extract part ID from path
+ partName := filepath.Base(partPath)
+ partID, err := strconv.ParseUint(partName, 16, 64)
+ require.NoError(t, err, "part directory should have valid hex name")
+
+ // Parse the part using dump tool functions
+ p, err := openStreamPart(partID, filepath.Dir(partPath), fileSystem)
+	require.NoError(t, err, "should be able to open part with series metadata")
+ defer closeStreamPart(p)
+
+ // Verify series metadata reader is available
+	assert.NotNil(t, p.seriesMetadata, "series metadata reader should be available")
+
+ // Create a dump context to test parsing
+ opts := streamDumpOptions{
+ shardPath: filepath.Dir(partPath),
+ segmentPath: tmpPath, // Not used for this test
+ verbose: false,
+ csvOutput: false,
+ }
+ ctx, err := newStreamDumpContext(opts)
+ require.NoError(t, err)
+ if ctx != nil {
+ defer ctx.close()
+ }
+
+ // Test parsing series metadata
+ err = ctx.parseAndDisplaySeriesMetadata(partID, p)
+ require.NoError(t, err, "should be able to parse series metadata")
+
+ // Verify EntityValues are stored in partSeriesMap
+	require.NotNil(t, ctx.partSeriesMap, "partSeriesMap should be initialized")
+ partMap, exists := ctx.partSeriesMap[partID]
+ require.True(t, exists, "partSeriesMap should contain entry for partID")
+ require.NotNil(t, partMap, "partMap should not be nil")
+
+ // Verify EntityValues are correctly stored
+ // Calculate expected SeriesIDs from EntityValues
+	expectedSeriesID1 := common.SeriesID(convert.Hash([]byte("test=entity1")))
+	expectedSeriesID2 := common.SeriesID(convert.Hash([]byte("test=entity2")))
+
+	assert.Contains(t, partMap, expectedSeriesID1, "partMap should contain first series")
+	assert.Contains(t, partMap, expectedSeriesID2, "partMap should contain second series")
+	assert.Equal(t, "test=entity1", partMap[expectedSeriesID1], "EntityValues should match")
+	assert.Equal(t, "test=entity2", partMap[expectedSeriesID2], "EntityValues should match")
+}
+
+// createTestStreamPartWithSeriesMetadata creates a test stream part with series metadata.
+func createTestStreamPartWithSeriesMetadata(tmpPath string, fileSystem fs.FileSystem) (string, func()) {
+ // Use stream package to create a part
+ partPath, cleanup := stream.CreateTestPartForDump(tmpPath, fileSystem)
+
+ // Create sample series metadata file
+	// This is a simplified version - in real scenarios, smeta is created during flush
+ seriesMetadataPath := filepath.Join(partPath, "smeta.bin")
+
+ // Create sample documents
+ docs := index.Documents{
+ {
+ DocID: 1,
+ EntityValues: []byte("test=entity1"),
+ },
+ {
+ DocID: 2,
+ EntityValues: []byte("test=entity2"),
+ },
+ }
+
+ seriesMetadataBytes, err := docs.Marshal()
+ if err != nil {
+		panic(fmt.Sprintf("failed to marshal series metadata documents: %v", err))
+ }
+	fs.MustFlush(fileSystem, seriesMetadataBytes, seriesMetadataPath, storage.FilePerm)
+
+ return partPath, cleanup
+}
diff --git a/banyand/cmd/dump/trace.go b/banyand/cmd/dump/trace.go
index 7d625c8f..7dac5219 100644
--- a/banyand/cmd/dump/trace.go
+++ b/banyand/cmd/dump/trace.go
@@ -40,6 +40,7 @@ import (
"github.com/apache/skywalking-banyandb/pkg/bytes"
"github.com/apache/skywalking-banyandb/pkg/compress/zstd"
"github.com/apache/skywalking-banyandb/pkg/convert"
+ "github.com/apache/skywalking-banyandb/pkg/dump"
"github.com/apache/skywalking-banyandb/pkg/encoding"
"github.com/apache/skywalking-banyandb/pkg/fs"
"github.com/apache/skywalking-banyandb/pkg/index/inverted"
@@ -283,6 +284,7 @@ type part struct {
tagMetadata map[string]fs.Reader
tags map[string]fs.Reader
tagType map[string]pbv1.ValueType
+ seriesMetadata fs.Reader // Optional: series metadata reader
path string
primaryBlockMetadata []primaryBlockMetadata
partMetadata partMetadata
@@ -301,6 +303,7 @@ type traceDumpContext struct {
tagFilter logical.TagFilter
fileSystem fs.FileSystem
seriesMap map[common.SeriesID]string
+	partSeriesMap map[uint64]map[common.SeriesID]string // partID -> SeriesID -> EntityValues from smeta.bin
writer *csv.Writer
opts traceDumpOptions
partIDs []uint64
@@ -311,8 +314,9 @@ type traceDumpContext struct {
func newTraceDumpContext(opts traceDumpOptions) (*traceDumpContext, error) {
ctx := &traceDumpContext{
- opts: opts,
- fileSystem: fs.NewLocalFileSystem(),
+ opts: opts,
+ fileSystem: fs.NewLocalFileSystem(),
+ partSeriesMap: make(map[uint64]map[common.SeriesID]string),
}
partIDs, err := discoverTracePartIDs(opts.shardPath)
@@ -414,6 +418,13 @@ func (ctx *traceDumpContext) processParts() error {
}
func (ctx *traceDumpContext) processPart(partID uint64, p *part) (int, error) {
+ // Parse and display series metadata if available
+ if p.seriesMetadata != nil {
+		if err := ctx.parseAndDisplaySeriesMetadata(partID, p); err != nil {
+			fmt.Fprintf(os.Stderr, "Warning: Error parsing series metadata in part %016x: %v\n", partID, err)
+ }
+ }
+
decoder := &encoding.BytesBlockDecoder{}
partRowCount := 0
@@ -507,11 +518,11 @@ func (ctx *traceDumpContext) shouldSkip(tags map[string][]byte, tagTypes map[str
func (ctx *traceDumpContext) writeRow(row traceRowData) error {
if ctx.opts.csvOutput {
-		if err := writeTraceRowAsCSV(ctx.writer, row, ctx.tagColumns, ctx.seriesMap); err != nil {
+		if err := writeTraceRowAsCSV(ctx.writer, row, ctx.tagColumns, ctx.seriesMap, ctx.partSeriesMap); err != nil {
return err
}
} else {
-		writeTraceRowAsText(row, ctx.rowNum+1, ctx.opts.verbose, ctx.projectionTags, ctx.seriesMap)
+		writeTraceRowAsText(row, ctx.rowNum+1, ctx.opts.verbose, ctx.projectionTags, ctx.seriesMap, ctx.partSeriesMap)
}
ctx.rowNum++
return nil
@@ -575,6 +586,9 @@ func openFilePart(id uint64, root string, fileSystem fs.FileSystem) (*part, erro
return nil, fmt.Errorf("cannot open spans.bin: %w", err)
}
+	// Try to open series metadata file (optional, for backward compatibility)
+ p.seriesMetadata = dump.TryOpenSeriesMetadata(fileSystem, partPath)
+
// Open tag files
entries := fileSystem.ReadDir(partPath)
p.tags = make(map[string]fs.Reader)
@@ -609,6 +623,9 @@ func closePart(p *part) {
if p.spans != nil {
fs.MustClose(p.spans)
}
+ if p.seriesMetadata != nil {
+ fs.MustClose(p.seriesMetadata)
+ }
for _, r := range p.tags {
fs.MustClose(r)
}
@@ -1213,19 +1230,38 @@ func discoverTagColumns(partIDs []uint64, shardPath string, fileSystem fs.FileSy
return tagNames, nil
}
-func writeTraceRowAsText(row traceRowData, rowNum int, verbose bool, projectionTags []string, seriesMap map[common.SeriesID]string) {
+func writeTraceRowAsText(
+ row traceRowData,
+ rowNum int,
+ verbose bool,
+ projectionTags []string,
+ seriesMap map[common.SeriesID]string,
+ partSeriesMap map[uint64]map[common.SeriesID]string,
+) {
fmt.Printf("Row %d:\n", rowNum)
fmt.Printf(" PartID: %d (0x%016x)\n", row.partID, row.partID)
fmt.Printf(" TraceID: %s\n", row.traceID)
fmt.Printf(" SpanID: %s\n", row.spanID)
fmt.Printf(" SeriesID: %d\n", row.seriesID)
- // Add series text if available
- if seriesMap != nil {
- if seriesText, ok := seriesMap[row.seriesID]; ok {
- fmt.Printf(" Series: %s\n", seriesText)
+ seriesText := ""
+ // First try to get EntityValues from smeta.bin (part-level)
+ if partSeriesMap != nil {
+ if partMap, ok := partSeriesMap[row.partID]; ok {
+ if text, ok := partMap[row.seriesID]; ok {
+ seriesText = text
+ }
}
}
+ // Fallback to segment-level seriesMap if not found in smeta
+ if seriesText == "" && seriesMap != nil {
+ if text, ok := seriesMap[row.seriesID]; ok {
+ seriesText = text
+ }
+ }
+ if seriesText != "" {
+ fmt.Printf(" Series: %s\n", seriesText)
+ }
fmt.Printf(" Span Data: %d bytes\n", len(row.spanData))
if verbose {
@@ -1269,9 +1305,24 @@ func writeTraceRowAsText(row traceRowData, rowNum int, verbose bool, projectionT
fmt.Printf("\n")
}
-func writeTraceRowAsCSV(writer *csv.Writer, row traceRowData, tagColumns []string, seriesMap map[common.SeriesID]string) error {
+func writeTraceRowAsCSV(
+ writer *csv.Writer,
+ row traceRowData,
+ tagColumns []string,
+ seriesMap map[common.SeriesID]string,
+ partSeriesMap map[uint64]map[common.SeriesID]string,
+) error {
seriesText := ""
- if seriesMap != nil {
+ // First try to get EntityValues from smeta.bin (part-level)
+ if partSeriesMap != nil {
+ if partMap, ok := partSeriesMap[row.partID]; ok {
+ if text, ok := partMap[row.seriesID]; ok {
+ seriesText = text
+ }
+ }
+ }
+ // Fallback to segment-level seriesMap if not found in smeta
+ if seriesText == "" && seriesMap != nil {
if text, ok := seriesMap[row.seriesID]; ok {
seriesText = text
}
@@ -1297,3 +1348,7 @@ func writeTraceRowAsCSV(writer *csv.Writer, row traceRowData, tagColumns []strin
return writer.Write(csvRow)
}
+
+func (ctx *traceDumpContext) parseAndDisplaySeriesMetadata(partID uint64, p *part) error {
+	return dump.ParseSeriesMetadata(partID, p.seriesMetadata, ctx.partSeriesMap)
+}
diff --git a/banyand/cmd/dump/trace_test.go b/banyand/cmd/dump/trace_test.go
index c07a4773..9191094d 100644
--- a/banyand/cmd/dump/trace_test.go
+++ b/banyand/cmd/dump/trace_test.go
@@ -18,6 +18,7 @@
package main
import (
+ "fmt"
"path/filepath"
"strconv"
"testing"
@@ -25,11 +26,14 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
+ "github.com/apache/skywalking-banyandb/api/common"
+ "github.com/apache/skywalking-banyandb/banyand/internal/storage"
"github.com/apache/skywalking-banyandb/banyand/trace"
"github.com/apache/skywalking-banyandb/pkg/compress/zstd"
"github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/encoding"
"github.com/apache/skywalking-banyandb/pkg/fs"
+ "github.com/apache/skywalking-banyandb/pkg/index"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
"github.com/apache/skywalking-banyandb/pkg/test"
)
@@ -187,3 +191,91 @@ func decodeStringArray(data []byte) []string {
}
return values
}
+
+// TestDumpTracePartWithSeriesMetadata tests that the dump tool can parse the smeta file.
+// This test creates a part with series metadata and verifies the dump tool can correctly parse it.
+func TestDumpTracePartWithSeriesMetadata(t *testing.T) {
+ tmpPath, defFn := test.Space(require.New(t))
+ defer defFn()
+
+ fileSystem := fs.NewLocalFileSystem()
+
+ // Create a part with series metadata
+	partPath, cleanup := createTestTracePartWithSeriesMetadata(tmpPath, fileSystem)
+ defer cleanup()
+
+ // Extract part ID from path
+ partName := filepath.Base(partPath)
+ partID, err := strconv.ParseUint(partName, 16, 64)
+ require.NoError(t, err, "part directory should have valid hex name")
+
+ // Parse the part using dump tool functions
+ p, err := openFilePart(partID, filepath.Dir(partPath), fileSystem)
+	require.NoError(t, err, "should be able to open part with series metadata")
+ defer closePart(p)
+
+ // Verify series metadata reader is available
+	assert.NotNil(t, p.seriesMetadata, "series metadata reader should be available")
+
+ // Create a dump context to test parsing
+ opts := traceDumpOptions{
+ shardPath: filepath.Dir(partPath),
+ segmentPath: tmpPath, // Not used for this test
+ verbose: false,
+ csvOutput: false,
+ }
+ ctx, err := newTraceDumpContext(opts)
+ require.NoError(t, err)
+ if ctx != nil {
+ defer ctx.close()
+ }
+
+ // Test parsing series metadata
+ err = ctx.parseAndDisplaySeriesMetadata(partID, p)
+ require.NoError(t, err, "should be able to parse series metadata")
+
+ // Verify EntityValues are stored in partSeriesMap
+	require.NotNil(t, ctx.partSeriesMap, "partSeriesMap should be initialized")
+ partMap, exists := ctx.partSeriesMap[partID]
+ require.True(t, exists, "partSeriesMap should contain entry for partID")
+ require.NotNil(t, partMap, "partMap should not be nil")
+
+ // Verify EntityValues are correctly stored
+ // Calculate expected SeriesIDs from EntityValues
+	expectedSeriesID1 := common.SeriesID(convert.Hash([]byte("service.name=test-service")))
+	expectedSeriesID2 := common.SeriesID(convert.Hash([]byte("service.name=another-service")))
+
+	assert.Contains(t, partMap, expectedSeriesID1, "partMap should contain first series")
+	assert.Contains(t, partMap, expectedSeriesID2, "partMap should contain second series")
+	assert.Equal(t, "service.name=test-service", partMap[expectedSeriesID1], "EntityValues should match")
+	assert.Equal(t, "service.name=another-service", partMap[expectedSeriesID2], "EntityValues should match")
+}
+
+// createTestTracePartWithSeriesMetadata creates a test trace part with series metadata.
+func createTestTracePartWithSeriesMetadata(tmpPath string, fileSystem fs.FileSystem) (string, func()) {
+ // Use trace package to create a part
+ partPath, cleanup := trace.CreateTestPartForDump(tmpPath, fileSystem)
+
+ // Create sample series metadata file
+ seriesMetadataPath := filepath.Join(partPath, "smeta.bin")
+
+ // Create sample documents
+ docs := index.Documents{
+ {
+ DocID: 1,
+ EntityValues: []byte("service.name=test-service"),
+ },
+ {
+ DocID: 2,
+ EntityValues: []byte("service.name=another-service"),
+ },
+ }
+
+ seriesMetadataBytes, err := docs.Marshal()
+ if err != nil {
+		panic(fmt.Sprintf("failed to marshal series metadata documents: %v", err))
+ }
+	fs.MustFlush(fileSystem, seriesMetadataBytes, seriesMetadataPath, storage.FilePerm)
+
+ return partPath, cleanup
+}
diff --git a/pkg/dump/dump.go b/pkg/dump/dump.go
new file mode 100644
index 00000000..49b80c0f
--- /dev/null
+++ b/pkg/dump/dump.go
@@ -0,0 +1,88 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package dump provides utilities for dumping BanyanDB data.
+package dump
+
+import (
+ "errors"
+ "fmt"
+ "io"
+ "os"
+ "path/filepath"
+
+ "github.com/apache/skywalking-banyandb/api/common"
+ "github.com/apache/skywalking-banyandb/pkg/convert"
+ "github.com/apache/skywalking-banyandb/pkg/fs"
+ "github.com/apache/skywalking-banyandb/pkg/index"
+)
+
+// TryOpenSeriesMetadata attempts to open the series metadata file (smeta.bin) in the given part path.
+// It returns the reader if successful, or nil if the file doesn't exist or on error.
+// Only file not found errors are silently ignored; other errors are reported as warnings.
+func TryOpenSeriesMetadata(fileSystem fs.FileSystem, partPath string) fs.Reader {
+ seriesMetadataPath := filepath.Join(partPath, "smeta.bin")
+ reader, err := fileSystem.OpenFile(seriesMetadataPath)
+ if err != nil {
+		// Only ignore file not found errors; other errors should be reported
+ var fsErr *fs.FileSystemError
+ if !errors.As(err, &fsErr) || fsErr.Code != fs.IsNotExistError {
+			fmt.Fprintf(os.Stderr, "Warning: Failed to open series metadata file %s: %v\n", seriesMetadataPath, err)
+ }
+		// Either way, continue without series metadata
+ return nil
+ }
+ return reader
+}
+
+// ParseSeriesMetadata parses series metadata from the given reader and stores EntityValues in partSeriesMap.
+// It reads all data from the series metadata file, unmarshals Documents, and stores the mapping
+// of SeriesID to EntityValues for use in CSV output.
+func ParseSeriesMetadata(partID uint64, seriesMetadata fs.Reader, partSeriesMap map[uint64]map[common.SeriesID]string) error {
+ // Read all data from series metadata file
+ seqReader := seriesMetadata.SequentialRead()
+ defer seqReader.Close()
+
+ readMetadataBytes, err := io.ReadAll(seqReader)
+ if err != nil {
+ return fmt.Errorf("failed to read series metadata: %w", err)
+ }
+
+ if len(readMetadataBytes) == 0 {
+ return nil
+ }
+
+ // Unmarshal Documents
+ var docs index.Documents
+ if err := docs.Unmarshal(readMetadataBytes); err != nil {
+		return fmt.Errorf("failed to unmarshal series metadata: %w", err)
+ }
+
+ if len(docs) == 0 {
+ return nil // No documents
+ }
+
+ // Store EntityValues in partSeriesMap for use in CSV output
+ partMap := make(map[common.SeriesID]string)
+ for _, doc := range docs {
+ seriesID := common.SeriesID(convert.Hash(doc.EntityValues))
+ partMap[seriesID] = string(doc.EntityValues)
+ }
+ partSeriesMap[partID] = partMap
+
+ return nil
+}
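[Editor's note] A minimal usage sketch (not part of the commit) of how the two new pkg/dump helpers fit together, mirroring the pattern used by the measure/stream/trace dump commands above; the part path and partID below are hypothetical placeholders, not values from this change:

package main

import (
	"fmt"
	"os"

	"github.com/apache/skywalking-banyandb/api/common"
	"github.com/apache/skywalking-banyandb/pkg/dump"
	"github.com/apache/skywalking-banyandb/pkg/fs"
)

func main() {
	fileSystem := fs.NewLocalFileSystem()
	// partID -> SeriesID -> EntityValues, as populated per part from smeta.bin.
	partSeriesMap := make(map[uint64]map[common.SeriesID]string)

	// Placeholder part path and ID, for illustration only.
	const partID uint64 = 0x1b
	partPath := "/tmp/measure/shard-0/000000000000001b"

	// smeta.bin is optional: a nil reader means the part predates series
	// metadata, and the dump tool falls back to the segment-level seriesMap.
	reader := dump.TryOpenSeriesMetadata(fileSystem, partPath)
	if reader == nil {
		return
	}
	defer fs.MustClose(reader)

	// Fills partSeriesMap[partID] with SeriesID -> EntityValues entries.
	if err := dump.ParseSeriesMetadata(partID, reader, partSeriesMap); err != nil {
		fmt.Fprintf(os.Stderr, "Warning: Error parsing series metadata in part %016x: %v\n", partID, err)
	}
}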