This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new caef8d41 Add command-line tool for dumping BanyanDB trace data (#835)
caef8d41 is described below
commit caef8d4178339d2d088b5800d9e9a875b5b6d08d
Author: Gao Hongtao <[email protected]>
AuthorDate: Wed Nov 5 16:38:41 2025 +0800
Add command-line tool for dumping BanyanDB trace data (#835)
This commit introduces a new command-line tool for dumping and inspecting
BanyanDB trace data. The tool provides a 'trace' subcommand that displays
trace part data in either a human-readable text format or CSV.
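For reference, a typical invocation looks like this (a sketch assuming the
binary is built from banyand/cmd/dump; the part path is illustrative):

  # Build the tool (output name is hypothetical)
  go build -o dump ./banyand/cmd/dump

  # Text output, then CSV export
  ./dump trace --path /path/to/part/0000000000004db4
  ./dump trace --path /path/to/part/0000000000004db4 --csv > output.csv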
---
CHANGES.md | 1 +
banyand/cmd/dump/main.go | 45 +++
banyand/cmd/dump/trace.go | 879 +++++++++++++++++++++++++++++++++++++++++
banyand/cmd/dump/trace_test.go | 189 +++++++++
banyand/trace/test_helper.go | 81 ++++
5 files changed, 1195 insertions(+)
diff --git a/CHANGES.md b/CHANGES.md
index 12224018..1817a601 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -55,6 +55,7 @@ Release Notes.
- UI: Implement the Query Page for BydbQL.
- Refactor router for better usability.
- Implement the handoff queue for Trace.
+- Add dump command-line tool to parse and display trace part data with support for CSV export and human-readable timestamp formatting.
### Bug Fixes
diff --git a/banyand/cmd/dump/main.go b/banyand/cmd/dump/main.go
new file mode 100644
index 00000000..5bb17eb5
--- /dev/null
+++ b/banyand/cmd/dump/main.go
@@ -0,0 +1,45 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package main provides a command-line tool to dump BanyanDB data.
+package main
+
+import (
+ "fmt"
+ "os"
+
+ "github.com/spf13/cobra"
+
+ "github.com/apache/skywalking-banyandb/pkg/version"
+)
+
+func main() {
+ rootCmd := &cobra.Command{
+ Use: "dump",
+ Short: "Dump BanyanDB data from storage files",
+ Version: version.Build(),
+ Long: `dump is a command-line tool for dumping and inspecting BanyanDB storage data.
+It provides subcommands for different data types (trace, stream, measure, etc.).`,
+ }
+
+ rootCmd.AddCommand(newTraceCmd())
+
+ if err := rootCmd.Execute(); err != nil {
+ fmt.Fprintln(os.Stderr, err)
+ os.Exit(1)
+ }
+}
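The root command above registers subcommands via rootCmd.AddCommand, and the
Long text anticipates stream and measure subcommands. As a minimal sketch
(hypothetical, not part of this commit), a future subcommand would follow the
same pattern as newTraceCmd:

  // Hypothetical sketch only: a "stream" subcommand would be registered with
  // rootCmd.AddCommand(newStreamCmd()) next to the trace command.
  func newStreamCmd() *cobra.Command {
          return &cobra.Command{
                  Use:   "stream",
                  Short: "Dump stream part data",
                  RunE: func(_ *cobra.Command, _ []string) error {
                          return fmt.Errorf("not implemented yet")
                  },
          }
  }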
diff --git a/banyand/cmd/dump/trace.go b/banyand/cmd/dump/trace.go
new file mode 100644
index 00000000..d779a658
--- /dev/null
+++ b/banyand/cmd/dump/trace.go
@@ -0,0 +1,879 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package main
+
+import (
+ "encoding/csv"
+ "encoding/json"
+ "fmt"
+ "io"
+ "os"
+ "path/filepath"
+ "sort"
+ "strconv"
+ "strings"
+ "time"
+
+ "github.com/spf13/cobra"
+
+ internalencoding "github.com/apache/skywalking-banyandb/banyand/internal/encoding"
+ "github.com/apache/skywalking-banyandb/pkg/bytes"
+ "github.com/apache/skywalking-banyandb/pkg/compress/zstd"
+ "github.com/apache/skywalking-banyandb/pkg/convert"
+ "github.com/apache/skywalking-banyandb/pkg/encoding"
+ "github.com/apache/skywalking-banyandb/pkg/fs"
+ pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+)
+
+func newTraceCmd() *cobra.Command {
+ var partPath string
+ var verbose bool
+ var csvOutput bool
+
+ cmd := &cobra.Command{
+ Use: "trace",
+ Short: "Dump trace part data",
+ Long: `Dump and display contents of a trace part directory.
+Outputs trace data in human-readable format or CSV.`,
+ Example: ` # Display trace data in text format
+ dump trace --path /path/to/part/0000000000004db4
+
+ # Display with verbose hex dumps
+ dump trace --path /path/to/part/0000000000004db4 -v
+
+ # Output as CSV
+ dump trace --path /path/to/part/0000000000004db4 --csv
+
+ # Save CSV to file
+ dump trace --path /path/to/part/0000000000004db4 --csv > output.csv`,
+ RunE: func(_ *cobra.Command, _ []string) error {
+ if partPath == "" {
+ return fmt.Errorf("--path flag is required")
+ }
+ return dumpTracePart(partPath, verbose, csvOutput)
+ },
+ }
+
+ cmd.Flags().StringVarP(&partPath, "path", "p", "", "Path to the trace part directory (required)")
+ cmd.Flags().BoolVarP(&verbose, "verbose", "v", false, "Verbose output (show raw data)")
+ cmd.Flags().BoolVarP(&csvOutput, "csv", "c", false, "Output as CSV format")
+ _ = cmd.MarkFlagRequired("path")
+
+ return cmd
+}
+
+func dumpTracePart(partPath string, verbose bool, csvOutput bool) error {
+ // Get the part ID from the directory name
+ partName := filepath.Base(partPath)
+ partID, err := strconv.ParseUint(partName, 16, 64)
+ if err != nil {
+ return fmt.Errorf("invalid part name %q: %w", partName, err)
+ }
+
+ // Get the root path (parent directory)
+ rootPath := filepath.Dir(partPath)
+
+ // Open the file system
+ fileSystem := fs.NewLocalFileSystem()
+
+ // Open the part
+ p, err := openFilePart(partID, rootPath, fileSystem)
+ if err != nil {
+ return fmt.Errorf("failed to open part: %w", err)
+ }
+ defer closePart(p)
+
+ if csvOutput {
+ return dumpPartAsCSV(p)
+ }
+
+ // Original text output
+ fmt.Printf("Opening trace part: %s (ID: %d)\n", partPath, partID)
+ fmt.Printf("================================================================================\n\n")
+
+ // Print part metadata
+ fmt.Printf("Part Metadata:\n")
+ fmt.Printf(" ID: %d (0x%016x)\n", p.partMetadata.ID, p.partMetadata.ID)
+ fmt.Printf(" Total Count: %d\n", p.partMetadata.TotalCount)
+ fmt.Printf(" Blocks Count: %d\n", p.partMetadata.BlocksCount)
+ fmt.Printf(" Min Timestamp: %s (%d)\n", formatTimestamp(p.partMetadata.MinTimestamp), p.partMetadata.MinTimestamp)
+ fmt.Printf(" Max Timestamp: %s (%d)\n", formatTimestamp(p.partMetadata.MaxTimestamp), p.partMetadata.MaxTimestamp)
+ fmt.Printf(" Compressed Size: %d bytes\n", p.partMetadata.CompressedSizeBytes)
+ fmt.Printf(" Uncompressed Span Size: %d bytes\n", p.partMetadata.UncompressedSpanSizeBytes)
+ fmt.Printf("\n")
+
+ // Print tag types
+ if len(p.tagType) > 0 {
+ fmt.Printf("Tag Types:\n")
+ tagNames := make([]string, 0, len(p.tagType))
+ for name := range p.tagType {
+ tagNames = append(tagNames, name)
+ }
+ sort.Strings(tagNames)
+ for _, name := range tagNames {
+ fmt.Printf(" %s: %s\n", name, valueTypeName(p.tagType[name]))
+ }
+ fmt.Printf("\n")
+ }
+
+ // Print primary block metadata
+ fmt.Printf("Primary Block Metadata (Total: %d blocks):\n", len(p.primaryBlockMetadata))
+ fmt.Printf("--------------------------------------------------------------------------------\n")
+ for i, pbm := range p.primaryBlockMetadata {
+ fmt.Printf("Block %d:\n", i)
+ fmt.Printf(" TraceID: %s\n", pbm.traceID)
+ fmt.Printf(" Offset: %d\n", pbm.offset)
+ fmt.Printf(" Size: %d bytes\n", pbm.size)
+
+ // Read and decompress the primary data for this block
+ if verbose {
+ primaryData := make([]byte, pbm.size)
+ fs.MustReadData(p.primary, int64(pbm.offset), primaryData)
+ decompressed, err := zstd.Decompress(nil, primaryData)
+ if err == nil {
+ fmt.Printf(" Primary Data (decompressed %d bytes):\n", len(decompressed))
+ printHexDump(decompressed, 4)
+ }
+ }
+ }
+ fmt.Printf("\n")
+
+ // Read and display trace data
+ fmt.Printf("Trace Data:\n")
+ fmt.Printf("================================================================================\n\n")
+
+ decoder := &encoding.BytesBlockDecoder{}
+
+ rowNum := 0
+ for _, pbm := range p.primaryBlockMetadata {
+ // Read primary data block
+ primaryData := make([]byte, pbm.size)
+ fs.MustReadData(p.primary, int64(pbm.offset), primaryData)
+
+ // Decompress
+ decompressed, err := zstd.Decompress(nil, primaryData)
+ if err != nil {
+ fmt.Printf("Error decompressing primary data: %v\n", err)
+ continue
+ }
+
+ // Parse ALL block metadata entries from this primary block
+ blockMetadatas, err := parseAllBlockMetadata(decompressed, p.tagType)
+ if err != nil {
+ fmt.Printf("Error parsing block metadata: %v\n", err)
+ continue
+ }
+
+ // Process each trace block within this primary block
+ for _, bm := range blockMetadatas {
+ // Read spans
+ spans, spanIDs, err := readSpans(decoder, bm.spans, int(bm.count), p.spans)
+ if err != nil {
+ fmt.Printf("Error reading spans for trace %s: %v\n", bm.traceID, err)
+ continue
+ }
+
+ // Read tags
+ tags := make(map[string][][]byte)
+ for tagName, tagBlock := range bm.tags {
+ tagValues, err := readTagValues(decoder, tagBlock, tagName, int(bm.count), p.tagMetadata[tagName], p.tags[tagName], p.tagType[tagName])
+ if err != nil {
+ fmt.Printf("Error reading tag %s for trace %s: %v\n", tagName, bm.traceID, err)
+ continue
+ }
+ tags[tagName] = tagValues
+ }
+
+ // Display each span as a row
+ for i := 0; i < len(spans); i++ {
+ rowNum++
+ fmt.Printf("Row %d:\n", rowNum)
+ fmt.Printf(" TraceID: %s\n", bm.traceID)
+ fmt.Printf(" SpanID: %s\n", spanIDs[i])
+ fmt.Printf(" Span Data: %d bytes\n", len(spans[i]))
+ if verbose {
+ fmt.Printf(" Span Content:\n")
+ printHexDump(spans[i], 4)
+ } else {
+ // Try to print as string if it's printable
+ if isPrintable(spans[i]) {
+ fmt.Printf(" Span: %s\n", string(spans[i]))
+ } else {
+ fmt.Printf(" Span: (binary data, %d bytes)\n", len(spans[i]))
+ }
+ }
+
+ // Print tags for this span
+ if len(tags) > 0 {
+ fmt.Printf(" Tags:\n")
+ tagNames := make([]string, 0, len(tags))
+ for name := range tags {
+ tagNames = append(tagNames, name)
+ }
+ sort.Strings(tagNames)
+ for _, name := range tagNames {
+ if i < len(tags[name]) {
+ tagValue := tags[name][i]
+ if tagValue == nil {
+ fmt.Printf(" %s: <nil>\n", name)
+ } else {
+ valueType := p.tagType[name]
+ fmt.Printf(" %s (%s): %s\n", name, valueTypeName(valueType), formatTagValueForDisplay(tagValue, valueType))
+ }
+ }
+ }
+ }
+ fmt.Printf("\n")
+ }
+ }
+ }
+
+ fmt.Printf("Total rows: %d\n", rowNum)
+ return nil
+}
+
+func dumpPartAsCSV(p *part) error {
+ decoder := &encoding.BytesBlockDecoder{}
+
+ // Collect all tag names in sorted order
+ allTagNames := make([]string, 0, len(p.tagType))
+ for name := range p.tagType {
+ allTagNames = append(allTagNames, name)
+ }
+ sort.Strings(allTagNames)
+
+ // Create CSV writer
+ writer := csv.NewWriter(os.Stdout)
+ defer writer.Flush()
+
+ // Write header
+ header := []string{"TraceID", "SpanID", "SpanDataSize"}
+ header = append(header, allTagNames...)
+ if err := writer.Write(header); err != nil {
+ return fmt.Errorf("failed to write CSV header: %w", err)
+ }
+
+ // Process all blocks and write rows
+ for _, pbm := range p.primaryBlockMetadata {
+ // Read primary data block
+ primaryData := make([]byte, pbm.size)
+ fs.MustReadData(p.primary, int64(pbm.offset), primaryData)
+
+ // Decompress
+ decompressed, err := zstd.Decompress(nil, primaryData)
+ if err != nil {
+ fmt.Fprintf(os.Stderr, "Error decompressing primary data: %v\n", err)
+ continue
+ }
+
+ // Parse ALL block metadata entries from this primary block
+ blockMetadatas, err := parseAllBlockMetadata(decompressed, p.tagType)
+ if err != nil {
+ fmt.Fprintf(os.Stderr, "Error parsing block metadata: %v\n", err)
+ continue
+ }
+
+ // Process each block
+ for _, bm := range blockMetadatas {
+ // Read spans
+ spans, spanIDs, err := readSpans(decoder, bm.spans, int(bm.count), p.spans)
+ if err != nil {
+ fmt.Fprintf(os.Stderr, "Error reading spans for trace %s: %v\n", bm.traceID, err)
+ continue
+ }
+
+ // Read tags
+ tags := make(map[string][][]byte)
+ for tagName, tagBlock := range bm.tags {
+ tagValues, err := readTagValues(decoder, tagBlock, tagName, int(bm.count), p.tagMetadata[tagName], p.tags[tagName], p.tagType[tagName])
+ if err != nil {
+ fmt.Fprintf(os.Stderr, "Error reading tag %s for trace %s: %v\n", tagName, bm.traceID, err)
+ continue
+ }
+ tags[tagName] = tagValues
+ }
+
+ // Write each span as a CSV row
+ for i := 0; i < len(spans); i++ {
+ row := append(make([]string, 0, len(header)), bm.traceID, spanIDs[i], strconv.Itoa(len(spans[i])))
+
+ // Add tag values in the same order as header
+ for _, tagName := range allTagNames {
+ var value string
+ if i < len(tags[tagName]) && tags[tagName][i] != nil {
+ valueType := p.tagType[tagName]
+ value = formatTagValueForCSV(tags[tagName][i], valueType)
+ }
+ row = append(row, value)
+ }
+
+ if err := writer.Write(row); err != nil {
+ return fmt.Errorf("failed to write CSV row: %w", err)
+ }
+ }
+ }
+ }
+
+ return nil
+}
+
+func valueTypeName(vt pbv1.ValueType) string {
+ switch vt {
+ case pbv1.ValueTypeStr:
+ return "STRING"
+ case pbv1.ValueTypeInt64:
+ return "INT64"
+ case pbv1.ValueTypeFloat64:
+ return "FLOAT64"
+ case pbv1.ValueTypeStrArr:
+ return "STRING_ARRAY"
+ case pbv1.ValueTypeInt64Arr:
+ return "INT64_ARRAY"
+ case pbv1.ValueTypeBinaryData:
+ return "BINARY_DATA"
+ case pbv1.ValueTypeTimestamp:
+ return "TIMESTAMP"
+ case pbv1.ValueTypeUnknown:
+ return "UNKNOWN"
+ default:
+ return fmt.Sprintf("UNKNOWN(%d)", vt)
+ }
+}
+
+func formatTimestamp(nanos int64) string {
+ if nanos == 0 {
+ return "N/A"
+ }
+ // Convert nanoseconds to time.Time
+ t := time.Unix(0, nanos)
+ return t.Format(time.RFC3339Nano)
+}
+
+func formatTagValueForDisplay(data []byte, vt pbv1.ValueType) string {
+ if data == nil {
+ return "<nil>"
+ }
+ switch vt {
+ case pbv1.ValueTypeStr:
+ return fmt.Sprintf("%q", string(data))
+ case pbv1.ValueTypeInt64:
+ if len(data) >= 8 {
+ return fmt.Sprintf("%d", convert.BytesToInt64(data))
+ }
+ return fmt.Sprintf("(invalid int64 data: %d bytes)", len(data))
+ case pbv1.ValueTypeFloat64:
+ if len(data) >= 8 {
+ return fmt.Sprintf("%f", convert.BytesToFloat64(data))
+ }
+ return fmt.Sprintf("(invalid float64 data: %d bytes)", len(data))
+ case pbv1.ValueTypeTimestamp:
+ if len(data) >= 8 {
+ nanos := convert.BytesToInt64(data)
+ return formatTimestamp(nanos)
+ }
+ return fmt.Sprintf("(invalid timestamp data: %d bytes)", len(data))
+ case pbv1.ValueTypeBinaryData:
+ if isPrintable(data) {
+ return fmt.Sprintf("%q", string(data))
+ }
+ return fmt.Sprintf("(binary: %d bytes)", len(data))
+ default:
+ if isPrintable(data) {
+ return fmt.Sprintf("%q", string(data))
+ }
+ return fmt.Sprintf("(binary: %d bytes)", len(data))
+ }
+}
+
+func formatTagValueForCSV(data []byte, vt pbv1.ValueType) string {
+ if data == nil {
+ return ""
+ }
+ switch vt {
+ case pbv1.ValueTypeStr:
+ return string(data)
+ case pbv1.ValueTypeInt64:
+ if len(data) >= 8 {
+ return strconv.FormatInt(convert.BytesToInt64(data), 10)
+ }
+ return ""
+ case pbv1.ValueTypeFloat64:
+ if len(data) >= 8 {
+ return strconv.FormatFloat(convert.BytesToFloat64(data), 'f', -1, 64)
+ }
+ return ""
+ case pbv1.ValueTypeTimestamp:
+ if len(data) >= 8 {
+ nanos := convert.BytesToInt64(data)
+ return formatTimestamp(nanos)
+ }
+ return ""
+ case pbv1.ValueTypeStrArr:
+ // Decode string array - each element is separated by EntityDelimiter
+ var values []string
+ var err error
+ remaining := data
+ for len(remaining) > 0 {
+ var decoded []byte
+ decoded, remaining, err = unmarshalVarArray(nil, remaining)
+ if err != nil {
+ break
+ }
+ if len(decoded) > 0 {
+ values = append(values, string(decoded))
+ }
+ }
+ if len(values) > 0 {
+ return strings.Join(values, ";")
+ }
+ return ""
+ case pbv1.ValueTypeBinaryData:
+ if isPrintable(data) {
+ return string(data)
+ }
+ return fmt.Sprintf("(binary: %d bytes)", len(data))
+ default:
+ if isPrintable(data) {
+ return string(data)
+ }
+ return fmt.Sprintf("(binary: %d bytes)", len(data))
+ }
+}
+
+func unmarshalVarArray(dest, src []byte) ([]byte, []byte, error) {
+ if len(src) == 0 {
+ return nil, nil, fmt.Errorf("empty entity value")
+ }
+ if src[0] == internalencoding.EntityDelimiter {
+ return dest, src[1:], nil
+ }
+ for len(src) > 0 {
+ switch {
+ case src[0] == internalencoding.Escape:
+ if len(src) < 2 {
+ return nil, nil, fmt.Errorf("invalid escape character")
+ }
+ src = src[1:]
+ dest = append(dest, src[0])
+ case src[0] == internalencoding.EntityDelimiter:
+ return dest, src[1:], nil
+ default:
+ dest = append(dest, src[0])
+ }
+ src = src[1:]
+ }
+ return nil, nil, fmt.Errorf("invalid variable array")
+}
+
+func isPrintable(data []byte) bool {
+ for _, b := range data {
+ if b < 32 && b != '\n' && b != '\r' && b != '\t' || b > 126 {
+ return false
+ }
+ }
+ return true
+}
+
+func printHexDump(data []byte, indent int) {
+ indentStr := strings.Repeat(" ", indent)
+ for i := 0; i < len(data); i += 16 {
+ end := i + 16
+ if end > len(data) {
+ end = len(data)
+ }
+ fmt.Printf("%s%04x:", indentStr, i)
+ for j := i; j < end; j++ {
+ fmt.Printf(" %02x", data[j])
+ }
+ // Padding
+ for j := end; j < i+16; j++ {
+ fmt.Print(" ")
+ }
+ fmt.Print(" |")
+ for j := i; j < end; j++ {
+ if data[j] >= 32 && data[j] <= 126 {
+ fmt.Printf("%c", data[j])
+ } else {
+ fmt.Print(".")
+ }
+ }
+ fmt.Println("|")
+ }
+}
+
+// Unexported helper functions to access trace package internals.
+// These mirror the internal structure definitions.
+
+type partMetadata struct {
+ CompressedSizeBytes uint64 `json:"compressedSizeBytes"`
+ UncompressedSpanSizeBytes uint64 `json:"uncompressedSpanSizeBytes"`
+ TotalCount uint64 `json:"totalCount"`
+ BlocksCount uint64 `json:"blocksCount"`
+ MinTimestamp int64 `json:"minTimestamp"`
+ MaxTimestamp int64 `json:"maxTimestamp"`
+ ID uint64 `json:"-"`
+}
+
+type primaryBlockMetadata struct {
+ traceID string
+ offset uint64
+ size uint64
+}
+
+type dataBlock struct {
+ offset uint64
+ size uint64
+}
+
+type blockMetadata struct {
+ tags map[string]*dataBlock
+ tagType map[string]pbv1.ValueType
+ spans *dataBlock
+ traceID string
+ uncompressedSpanSizeBytes uint64
+ count uint64
+}
+
+type part struct {
+ primary fs.Reader
+ spans fs.Reader
+ fileSystem fs.FileSystem
+ tagMetadata map[string]fs.Reader
+ tags map[string]fs.Reader
+ tagType map[string]pbv1.ValueType
+ path string
+ primaryBlockMetadata []primaryBlockMetadata
+ partMetadata partMetadata
+}
+
+func openFilePart(id uint64, root string, fileSystem fs.FileSystem) (*part, error) {
+ var p part
+ partPath := filepath.Join(root, fmt.Sprintf("%016x", id))
+ p.path = partPath
+ p.fileSystem = fileSystem
+
+ // Read metadata.json
+ metadataPath := filepath.Join(partPath, "metadata.json")
+ metadata, err := fileSystem.Read(metadataPath)
+ if err != nil {
+ return nil, fmt.Errorf("cannot read metadata.json: %w", err)
+ }
+ if unmarshalErr := json.Unmarshal(metadata, &p.partMetadata); unmarshalErr != nil {
+ return nil, fmt.Errorf("cannot parse metadata.json: %w", unmarshalErr)
+ }
+ p.partMetadata.ID = id
+
+ // Read tag types
+ p.tagType = make(map[string]pbv1.ValueType)
+ tagTypePath := filepath.Join(partPath, "tag.type")
+ if tagTypeData, readErr := fileSystem.Read(tagTypePath); readErr == nil && len(tagTypeData) > 0 {
+ if unmarshalErr := unmarshalTagType(tagTypeData, p.tagType); unmarshalErr != nil {
+ return nil, fmt.Errorf("cannot parse tag.type: %w", unmarshalErr)
+ }
+ }
+
+ // Read primary block metadata
+ metaPath := filepath.Join(partPath, "meta.bin")
+ metaFile, err := fileSystem.OpenFile(metaPath)
+ if err != nil {
+ return nil, fmt.Errorf("cannot open meta.bin: %w", err)
+ }
+ p.primaryBlockMetadata, err = readPrimaryBlockMetadata(metaFile)
+ fs.MustClose(metaFile)
+ if err != nil {
+ return nil, fmt.Errorf("cannot read primary block metadata: %w", err)
+ }
+
+ // Open data files
+ p.primary, err = fileSystem.OpenFile(filepath.Join(partPath, "primary.bin"))
+ if err != nil {
+ return nil, fmt.Errorf("cannot open primary.bin: %w", err)
+ }
+
+ p.spans, err = fileSystem.OpenFile(filepath.Join(partPath, "spans.bin"))
+ if err != nil {
+ fs.MustClose(p.primary)
+ return nil, fmt.Errorf("cannot open spans.bin: %w", err)
+ }
+
+ // Open tag files
+ entries := fileSystem.ReadDir(partPath)
+ p.tags = make(map[string]fs.Reader)
+ p.tagMetadata = make(map[string]fs.Reader)
+ for _, e := range entries {
+ if e.IsDir() {
+ continue
+ }
+ if strings.HasSuffix(e.Name(), ".tm") {
+ name := e.Name()[:len(e.Name())-3]
+ reader, err := fileSystem.OpenFile(filepath.Join(partPath, e.Name()))
+ if err == nil {
+ p.tagMetadata[name] = reader
+ }
+ }
+ if strings.HasSuffix(e.Name(), ".t") {
+ name := e.Name()[:len(e.Name())-2]
+ reader, err := fileSystem.OpenFile(filepath.Join(partPath, e.Name()))
+ if err == nil {
+ p.tags[name] = reader
+ }
+ }
+ }
+
+ return &p, nil
+}
+
+func closePart(p *part) {
+ if p.primary != nil {
+ fs.MustClose(p.primary)
+ }
+ if p.spans != nil {
+ fs.MustClose(p.spans)
+ }
+ for _, r := range p.tags {
+ fs.MustClose(r)
+ }
+ for _, r := range p.tagMetadata {
+ fs.MustClose(r)
+ }
+}
+
+func readPrimaryBlockMetadata(r fs.Reader) ([]primaryBlockMetadata, error) {
+ sr := r.SequentialRead()
+ data, err := io.ReadAll(sr)
+ if err != nil {
+ return nil, fmt.Errorf("cannot read: %w", err)
+ }
+ fs.MustClose(sr)
+
+ decompressed, err := zstd.Decompress(nil, data)
+ if err != nil {
+ return nil, fmt.Errorf("cannot decompress: %w", err)
+ }
+
+ var result []primaryBlockMetadata
+ src := decompressed
+ for len(src) > 0 {
+ var pbm primaryBlockMetadata
+ src, err = unmarshalPrimaryBlockMetadata(&pbm, src)
+ if err != nil {
+ return nil, err
+ }
+ result = append(result, pbm)
+ }
+ return result, nil
+}
+
+func unmarshalPrimaryBlockMetadata(pbm *primaryBlockMetadata, src []byte) ([]byte, error) {
+ if len(src) < 4 {
+ return nil, fmt.Errorf("insufficient data")
+ }
+ src, traceIDBytes, err := encoding.DecodeBytes(src)
+ if err != nil {
+ return nil, fmt.Errorf("cannot unmarshal traceID: %w", err)
+ }
+ pbm.traceID = string(traceIDBytes)
+ if len(src) < 16 {
+ return nil, fmt.Errorf("insufficient data for offset and size")
+ }
+ pbm.offset = encoding.BytesToUint64(src)
+ src = src[8:]
+ pbm.size = encoding.BytesToUint64(src)
+ return src[8:], nil
+}
+
+func unmarshalTagType(src []byte, tagType map[string]pbv1.ValueType) error {
+ src, count := encoding.BytesToVarUint64(src)
+ for i := uint64(0); i < count; i++ {
+ var nameBytes []byte
+ var err error
+ src, nameBytes, err = encoding.DecodeBytes(src)
+ if err != nil {
+ return err
+ }
+ name := string(nameBytes)
+ if len(src) < 1 {
+ return fmt.Errorf("insufficient data for valueType")
+ }
+ valueType := pbv1.ValueType(src[0])
+ src = src[1:]
+ tagType[name] = valueType
+ }
+ return nil
+}
+
+func parseAllBlockMetadata(src []byte, tagType map[string]pbv1.ValueType) ([]*blockMetadata, error) {
+ var result []*blockMetadata
+ for len(src) > 0 {
+ bm, tail, err := parseBlockMetadata(src, tagType)
+ if err != nil {
+ return nil, err
+ }
+ result = append(result, bm)
+ src = tail
+ }
+ return result, nil
+}
+
+func parseBlockMetadata(src []byte, tagType map[string]pbv1.ValueType) (*blockMetadata, []byte, error) {
+ var bm blockMetadata
+ bm.tagType = make(map[string]pbv1.ValueType)
+ for k, v := range tagType {
+ bm.tagType[k] = v
+ }
+
+ src, traceIDBytes, err := encoding.DecodeBytes(src)
+ if err != nil {
+ return nil, nil, fmt.Errorf("cannot unmarshal traceID: %w", err)
+ }
+ bm.traceID = string(traceIDBytes)
+
+ src, n := encoding.BytesToVarUint64(src)
+ bm.uncompressedSpanSizeBytes = n
+
+ src, n = encoding.BytesToVarUint64(src)
+ bm.count = n
+
+ bm.spans = &dataBlock{}
+ src, n = encoding.BytesToVarUint64(src)
+ bm.spans.offset = n
+ src, n = encoding.BytesToVarUint64(src)
+ bm.spans.size = n
+
+ src, tagCount := encoding.BytesToVarUint64(src)
+ if tagCount > 0 {
+ bm.tags = make(map[string]*dataBlock)
+ for i := uint64(0); i < tagCount; i++ {
+ var nameBytes []byte
+ src, nameBytes, err = encoding.DecodeBytes(src)
+ if err != nil {
+ return nil, nil, fmt.Errorf("cannot unmarshal tag name: %w", err)
+ }
+ name := string(nameBytes)
+
+ db := &dataBlock{}
+ src, n = encoding.BytesToVarUint64(src)
+ db.offset = n
+ src, n = encoding.BytesToVarUint64(src)
+ db.size = n
+ bm.tags[name] = db
+ }
+ }
+
+ return &bm, src, nil
+}
+
+func readSpans(decoder *encoding.BytesBlockDecoder, sm *dataBlock, count int, reader fs.Reader) ([][]byte, []string, error) {
+ data := make([]byte, sm.size)
+ fs.MustReadData(reader, int64(sm.offset), data)
+
+ var spanIDBytes [][]byte
+ var tail []byte
+ var err error
+ spanIDBytes, tail, err = decoder.DecodeWithTail(spanIDBytes[:0], data, uint64(count))
+ if err != nil {
+ return nil, nil, fmt.Errorf("cannot decode spanIDs: %w", err)
+ }
+
+ spanIDs := make([]string, count)
+ for i, idBytes := range spanIDBytes {
+ spanIDs[i] = string(idBytes)
+ }
+
+ var spans [][]byte
+ spans, err = decoder.Decode(spans[:0], tail, uint64(count))
+ if err != nil {
+ return nil, nil, fmt.Errorf("cannot decode spans: %w", err)
+ }
+
+ return spans, spanIDs, nil
+}
+
+type tagMetadata struct {
+ min []byte
+ max []byte
+ offset uint64
+ size uint64
+ filterBlock dataBlock
+}
+
+func readTagValues(decoder *encoding.BytesBlockDecoder, tagBlock *dataBlock, _ string, count int,
+ metaReader, valueReader fs.Reader, valueType pbv1.ValueType,
+) ([][]byte, error) {
+ // Read tag metadata
+ metaData := make([]byte, tagBlock.size)
+ fs.MustReadData(metaReader, int64(tagBlock.offset), metaData)
+
+ var tm tagMetadata
+ if err := unmarshalTagMetadata(&tm, metaData); err != nil {
+ return nil, fmt.Errorf("cannot unmarshal tag metadata: %w", err)
+ }
+
+ // Check if there's actual data to read
+ if tm.size == 0 {
+ // Return nil values for all items
+ values := make([][]byte, count)
+ return values, nil
+ }
+
+ // Read tag values
+ bb := &bytes.Buffer{}
+ bb.Buf = make([]byte, tm.size)
+ fs.MustReadData(valueReader, int64(tm.offset), bb.Buf)
+
+ // Decode values using the internal encoding package
+ var err error
+ var values [][]byte
+ values, err = internalencoding.DecodeTagValues(values, decoder, bb, valueType, count)
+ if err != nil {
+ return nil, fmt.Errorf("cannot decode tag values: %w", err)
+ }
+
+ return values, nil
+}
+
+func unmarshalTagMetadata(tm *tagMetadata, src []byte) error {
+ var n uint64
+ var err error
+
+ // Unmarshal dataBlock (offset and size)
+ src, n = encoding.BytesToVarUint64(src)
+ tm.offset = n
+ src, n = encoding.BytesToVarUint64(src)
+ tm.size = n
+
+ // Unmarshal min
+ src, tm.min, err = encoding.DecodeBytes(src)
+ if err != nil {
+ return fmt.Errorf("cannot unmarshal tagMetadata.min: %w", err)
+ }
+
+ // Unmarshal max
+ src, tm.max, err = encoding.DecodeBytes(src)
+ if err != nil {
+ return fmt.Errorf("cannot unmarshal tagMetadata.max: %w", err)
+ }
+
+ // Unmarshal filterBlock
+ src, n = encoding.BytesToVarUint64(src)
+ tm.filterBlock.offset = n
+ _, n = encoding.BytesToVarUint64(src)
+ tm.filterBlock.size = n
+
+ return nil
+}
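A note on the part-naming convention used by openFilePart and dumpTracePart:
part directories are named with the part ID as 16-digit zero-padded hex
(fmt.Sprintf("%016x", id)) and parsed back with strconv.ParseUint(name, 16, 64).
A self-contained sketch of that round trip:

  package main

  import (
          "fmt"
          "strconv"
  )

  func main() {
          var id uint64 = 0x4db4
          name := fmt.Sprintf("%016x", id) // "0000000000004db4"
          parsed, err := strconv.ParseUint(name, 16, 64)
          if err != nil {
                  panic(err)
          }
          fmt.Println(name, parsed == id) // prints: 0000000000004db4 true
  }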
diff --git a/banyand/cmd/dump/trace_test.go b/banyand/cmd/dump/trace_test.go
new file mode 100644
index 00000000..c07a4773
--- /dev/null
+++ b/banyand/cmd/dump/trace_test.go
@@ -0,0 +1,189 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package main
+
+import (
+ "path/filepath"
+ "strconv"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+
+ "github.com/apache/skywalking-banyandb/banyand/trace"
+ "github.com/apache/skywalking-banyandb/pkg/compress/zstd"
+ "github.com/apache/skywalking-banyandb/pkg/convert"
+ "github.com/apache/skywalking-banyandb/pkg/encoding"
+ "github.com/apache/skywalking-banyandb/pkg/fs"
+ pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+ "github.com/apache/skywalking-banyandb/pkg/test"
+)
+
+// Test that the dump tool can parse the latest trace part format.
+// This test creates a real part using the trace module's flush operation,
+// then verifies the dump tool can correctly parse it.
+func TestDumpTracePartFormat(t *testing.T) {
+ tmpPath, defFn := test.Space(require.New(t))
+ defer defFn()
+
+ fileSystem := fs.NewLocalFileSystem()
+
+ // Use trace package to create a real part using actual flush operation
+ partPath, cleanup := trace.CreateTestPartForDump(tmpPath, fileSystem)
+ defer cleanup()
+
+ // Extract part ID from path
+ partName := filepath.Base(partPath)
+ partID, err := strconv.ParseUint(partName, 16, 64)
+ require.NoError(t, err, "part directory should have valid hex name")
+
+ // Parse the part using dump tool functions
+ p, err := openFilePart(partID, filepath.Dir(partPath), fileSystem)
+ require.NoError(t, err, "should be able to open part created by trace module")
+ defer closePart(p)
+
+ // Verify part metadata
+ assert.Equal(t, partID, p.partMetadata.ID)
+ t.Logf("Part metadata: TotalCount=%d, BlocksCount=%d", p.partMetadata.TotalCount, p.partMetadata.BlocksCount)
+ assert.Greater(t, p.partMetadata.TotalCount, uint64(0), "should have spans")
+ assert.Greater(t, p.partMetadata.BlocksCount, uint64(0), "should have at least 1 block")
+ assert.Greater(t, p.partMetadata.MinTimestamp, int64(0), "should have valid min timestamp")
+ assert.Greater(t, p.partMetadata.MaxTimestamp, int64(0), "should have valid max timestamp")
+ assert.GreaterOrEqual(t, p.partMetadata.MaxTimestamp, p.partMetadata.MinTimestamp)
+
+ // Verify tag types exist
+ assert.Contains(t, p.tagType, "service.name")
+ assert.Contains(t, p.tagType, "http.status")
+ assert.Contains(t, p.tagType, "timestamp")
+ assert.Contains(t, p.tagType, "tags")
+ assert.Contains(t, p.tagType, "duration")
+
+ // Verify tag types are correct
+ assert.Equal(t, pbv1.ValueTypeStr, p.tagType["service.name"])
+ assert.Equal(t, pbv1.ValueTypeInt64, p.tagType["http.status"])
+ assert.Equal(t, pbv1.ValueTypeTimestamp, p.tagType["timestamp"])
+ assert.Equal(t, pbv1.ValueTypeStrArr, p.tagType["tags"])
+ assert.Equal(t, pbv1.ValueTypeInt64, p.tagType["duration"])
+
+ // Verify primary block metadata
+ assert.Greater(t, len(p.primaryBlockMetadata), 0, "should have at least 1 primary block")
+ t.Logf("Found %d primary blocks (metadata says BlocksCount=%d)", len(p.primaryBlockMetadata), p.partMetadata.BlocksCount)
+ for i, pbm := range p.primaryBlockMetadata {
+ t.Logf("Block %d: TraceID=%s, Offset=%d, Size=%d", i, pbm.traceID, pbm.offset, pbm.size)
+ }
+
+ // The number of primary blocks might not match BlocksCount due to trace block grouping logic.
+ // But we should be able to read all spans.
+ assert.LessOrEqual(t, uint64(len(p.primaryBlockMetadata)), p.partMetadata.BlocksCount,
+ "primary blocks should not exceed BlocksCount")
+
+ // Verify we can decode all blocks
+ decoder := &encoding.BytesBlockDecoder{}
+ totalSpans := 0
+
+ for blockIdx, pbm := range p.primaryBlockMetadata {
+ // Read primary data block
+ primaryData := make([]byte, pbm.size)
+ fs.MustReadData(p.primary, int64(pbm.offset), primaryData)
+
+ // Decompress
+ decompressed, err := zstd.Decompress(nil, primaryData)
+ require.NoError(t, err, "should decompress primary data for primary block %d", blockIdx)
+
+ // Parse ALL block metadata entries from this primary block
+ blockMetadatas, err := parseAllBlockMetadata(decompressed, p.tagType)
+ require.NoError(t, err, "should parse all block metadata from primary block %d", blockIdx)
+ t.Logf("Primary block %d contains %d trace blocks", blockIdx, len(blockMetadatas))
+
+ // Process each trace block
+ for bmIdx, bm := range blockMetadatas {
+ // Read spans
+ spans, spanIDs, err := readSpans(decoder, bm.spans, int(bm.count), p.spans)
+ require.NoError(t, err, "should read spans for trace %s", bm.traceID)
+ assert.Len(t, spans, int(bm.count), "should have correct number of spans")
+ assert.Len(t, spanIDs, int(bm.count), "should have correct number of spanIDs")
+
+ totalSpans += len(spans)
+ t.Logf(" Trace block %d (TraceID=%s): read %d spans", bmIdx, bm.traceID, len(spans))
+
+ // Read all tags
+ for tagName, tagBlock := range bm.tags {
+ tagValues, err := readTagValues(decoder, tagBlock, tagName, int(bm.count),
+ p.tagMetadata[tagName], p.tags[tagName], p.tagType[tagName])
+ require.NoError(t, err, "should read tag %s for trace %s", tagName, bm.traceID)
+ assert.Len(t, tagValues, int(bm.count), "tag %s should have value for each span", tagName)
+
+ // Verify specific tag values
+ for i, tagValue := range tagValues {
+ if tagValue == nil {
+ continue
+ }
+ switch tagName {
+ case "service.name":
+ assert.NotEmpty(t, string(tagValue), "service.name should not be empty")
+ case "http.status":
+ status := convert.BytesToInt64(tagValue)
+ assert.Contains(t, []int64{200, 404, 500}, status, "http.status should be valid")
+ case "timestamp":
+ ts := convert.BytesToInt64(tagValue)
+ assert.Greater(t, ts, int64(0), "timestamp should be positive")
+ assert.GreaterOrEqual(t, ts, p.partMetadata.MinTimestamp, "timestamp should be >= min")
+ assert.LessOrEqual(t, ts, p.partMetadata.MaxTimestamp, "timestamp should be <= max")
+ case "duration":
+ duration := convert.BytesToInt64(tagValue)
+ assert.Greater(t, duration, int64(0), "duration should be positive")
+ case "tags":
+ // Verify string array can be decoded
+ if len(tagValue) > 0 {
+ values := decodeStringArray(tagValue)
+ if len(values) > 0 {
+ t.Logf(" Span %d tags array: %v", i, values)
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+
+ // Note: One primary block can contain multiple trace blocks (blockMetadata).
+ // The dump tool shows primary blocks, not logical blocks.
+ // BlocksCount = number of unique trace writes (blockMetadata)
+ // len(primaryBlockMetadata) = number of compressed primary blocks
+ // We verify we can read all the data correctly.
+ assert.Equal(t, int(p.partMetadata.TotalCount), totalSpans, "should have parsed all spans from metadata")
+ t.Logf("Successfully parsed part with %d spans across %d primary blocks (metadata BlocksCount=%d)",
+ totalSpans, len(p.primaryBlockMetadata), p.partMetadata.BlocksCount)
+}
+
+func decodeStringArray(data []byte) []string {
+ var values []string
+ remaining := data
+ for len(remaining) > 0 {
+ var decoded []byte
+ var err error
+ decoded, remaining, err = unmarshalVarArray(nil, remaining)
+ if err != nil {
+ break
+ }
+ if len(decoded) > 0 {
+ values = append(values, string(decoded))
+ }
+ }
+ return values
+}
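Assuming a standard Go toolchain, the round-trip test above can be run on its
own from the repository root:

  go test ./banyand/cmd/dump/ -run TestDumpTracePartFormat -v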
diff --git a/banyand/trace/test_helper.go b/banyand/trace/test_helper.go
new file mode 100644
index 00000000..208dcc22
--- /dev/null
+++ b/banyand/trace/test_helper.go
@@ -0,0 +1,81 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package trace
+
+import (
+ "time"
+
+ "github.com/apache/skywalking-banyandb/pkg/convert"
+ "github.com/apache/skywalking-banyandb/pkg/fs"
+ pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+)
+
+// CreateTestPartForDump creates a test trace part for testing the dump tool.
+// It returns the part path (directory) and a cleanup function.
+func CreateTestPartForDump(tmpPath string, fileSystem fs.FileSystem) (string, func()) {
+ now := time.Now().UnixNano()
+
+ // Create test traces with various tag types
+ traces := &traces{
+ traceIDs: []string{"test-trace-1", "test-trace-1", "test-trace-2"},
+ timestamps: []int64{now, now + 1000, now + 2000},
+ spanIDs: []string{"span-1", "span-2", "span-3"},
+ spans: [][]byte{
+ []byte("span-data-1-with-content"),
+ []byte("span-data-2-with-content"),
+ []byte("span-data-3-with-content"),
+ },
+ tags: [][]*tagValue{
+ {
+ {tag: "service.name", valueType: pbv1.ValueTypeStr, value: []byte("test-service")},
+ {tag: "http.status", valueType: pbv1.ValueTypeInt64, value: convert.Int64ToBytes(200)},
+ {tag: "timestamp", valueType: pbv1.ValueTypeTimestamp, value: convert.Int64ToBytes(now)},
+ {tag: "tags", valueType: pbv1.ValueTypeStrArr, valueArr: [][]byte{[]byte("tag1"), []byte("tag2")}},
+ {tag: "duration", valueType: pbv1.ValueTypeInt64, value: convert.Int64ToBytes(1234567)},
+ },
+ {
+ {tag: "service.name", valueType: pbv1.ValueTypeStr, value: []byte("test-service")},
+ {tag: "http.status", valueType: pbv1.ValueTypeInt64, value: convert.Int64ToBytes(404)},
+ {tag: "timestamp", valueType: pbv1.ValueTypeTimestamp, value: convert.Int64ToBytes(now + 1000)},
+ {tag: "tags", valueType: pbv1.ValueTypeStrArr, valueArr: [][]byte{[]byte("tag3"), []byte("tag4")}},
+ {tag: "duration", valueType: pbv1.ValueTypeInt64, value: convert.Int64ToBytes(9876543)},
+ },
+ {
+ {tag: "service.name", valueType: pbv1.ValueTypeStr, value: []byte("another-service")},
+ {tag: "http.status", valueType: pbv1.ValueTypeInt64, value: convert.Int64ToBytes(500)},
+ {tag: "timestamp", valueType: pbv1.ValueTypeTimestamp, value: convert.Int64ToBytes(now + 2000)},
+ {tag: "tags", valueType: pbv1.ValueTypeStrArr, valueArr: [][]byte{[]byte("tag5")}},
+ {tag: "duration", valueType: pbv1.ValueTypeInt64, value: convert.Int64ToBytes(5555555)},
+ },
+ },
+ }
+
+ // Create memPart and flush
+ mp := &memPart{}
+ mp.mustInitFromTraces(traces)
+
+ epoch := uint64(12345)
+ path := partPath(tmpPath, epoch)
+ mp.mustFlush(fileSystem, path)
+
+ cleanup := func() {
+ // Cleanup is handled by the caller's test.Space cleanup
+ }
+
+ return path, cleanup
+}
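The helper encodes scalar tag values with convert.Int64ToBytes, which the dump
tool later reads back with convert.BytesToInt64. A minimal sketch of that
pairing (assuming only that the two functions are inverses, as the code above
relies on):

  package main

  import (
          "fmt"

          "github.com/apache/skywalking-banyandb/pkg/convert"
  )

  func main() {
          b := convert.Int64ToBytes(1234567)   // 8-byte encoding, per pkg/convert
          fmt.Println(convert.BytesToInt64(b)) // prints: 1234567
  }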