This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new 45068773 Fix empty tag of SIDX (#854)
45068773 is described below
commit 45068773a3fb6b320b2e1dc5efb28373334af00a
Author: Gao Hongtao <[email protected]>
AuthorDate: Thu Nov 20 19:35:57 2025 +0800
Fix empty tag of SIDX (#854)
---
banyand/cmd/dump/main.go | 1 +
banyand/cmd/dump/sidx.go | 1079 +++++++++++++++++++
banyand/cmd/dump/trace.go | 1102 ++++++++++++++------
banyand/dquery/topn.go | 11 +-
banyand/internal/sidx/block.go | 6 +-
banyand/internal/sidx/interfaces.go | 37 +
banyand/internal/sidx/merge_test.go | 149 +++
banyand/internal/sidx/part_iter.go | 120 +--
banyand/internal/sidx/part_iter_test.go | 331 ++----
banyand/internal/sidx/scan_query.go | 198 ++++
banyand/internal/sidx/sidx.go | 31 +
banyand/internal/sidx/tag.go | 7 +-
banyand/liaison/grpc/measure.go | 28 +-
banyand/liaison/grpc/stream.go | 9 +-
banyand/liaison/grpc/trace.go | 9 +-
banyand/query/processor.go | 3 +
banyand/trace/query.go | 34 +-
banyand/trace/streaming_pipeline_test.go | 7 +
banyand/trace/tracing.go | 79 ++
.../logical/measure/measure_plan_distributed.go | 9 +
.../logical/stream/stream_plan_distributed.go | 6 +
pkg/query/logical/trace/trace_plan_distributed.go | 8 +
22 files changed, 2546 insertions(+), 718 deletions(-)
diff --git a/banyand/cmd/dump/main.go b/banyand/cmd/dump/main.go
index 5bb17eb5..29da173b 100644
--- a/banyand/cmd/dump/main.go
+++ b/banyand/cmd/dump/main.go
@@ -37,6 +37,7 @@ It provides subcommands for different data types (trace, stream, measure, etc.).
}
rootCmd.AddCommand(newTraceCmd())
+ rootCmd.AddCommand(newSidxCmd())
if err := rootCmd.Execute(); err != nil {
fmt.Fprintln(os.Stderr, err)
diff --git a/banyand/cmd/dump/sidx.go b/banyand/cmd/dump/sidx.go
new file mode 100644
index 00000000..e4425a06
--- /dev/null
+++ b/banyand/cmd/dump/sidx.go
@@ -0,0 +1,1079 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package main
+
+import (
+ "context"
+ "encoding/csv"
+ "encoding/json"
+ "fmt"
+ "math"
+ "os"
+ "path/filepath"
+ "sort"
+ "strconv"
+ "strings"
+ "time"
+
+ "github.com/spf13/cobra"
+ "google.golang.org/protobuf/encoding/protojson"
+
+ "github.com/apache/skywalking-banyandb/api/common"
+	databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+	modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+ "github.com/apache/skywalking-banyandb/banyand/internal/sidx"
+ "github.com/apache/skywalking-banyandb/banyand/protector"
+ "github.com/apache/skywalking-banyandb/pkg/convert"
+ "github.com/apache/skywalking-banyandb/pkg/fs"
+ "github.com/apache/skywalking-banyandb/pkg/index/inverted"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+ pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+ "github.com/apache/skywalking-banyandb/pkg/query/logical"
+ "github.com/apache/skywalking-banyandb/pkg/query/model"
+)
+
+func newSidxCmd() *cobra.Command {
+ var sidxPath string
+ var segmentPath string
+ var criteriaJSON string
+ var csvOutput bool
+ var timeBegin string
+ var timeEnd string
+ var fullScan bool
+ var projectionTags string
+ var dataFilter string
+
+ cmd := &cobra.Command{
+ Use: "sidx",
+ Short: "Dump sidx (secondary index) data",
+		Long: `Dump and display contents of a sidx directory with filtering.
+Query results include which part each row belongs to.
+
+The tool automatically discovers series IDs from the segment's series index.`,
+ Example: ` # Display sidx data in text format
+  dump sidx -sidx-path /path/to/sidx/timestamp_millis -segment-path /path/to/segment
+
+  # Query with time range filter
+  dump sidx -sidx-path /path/to/sidx/timestamp_millis -segment-path /path/to/segment \
+  --time-begin "2025-11-23T05:05:00Z" --time-end "2025-11-23T05:20:00Z"
+
+  # Query with criteria filter
+  dump sidx -sidx-path /path/to/sidx/timestamp_millis -segment-path /path/to/segment \
+  -criteria '{"condition":{"name":"query","op":"BINARY_OP_HAVING","value":{"strArray":{"value":["val1","val2"]}}}}'
+
+  # Full scan mode (scans all series without requiring series ID discovery)
+  dump sidx -sidx-path /path/to/sidx/timestamp_millis -segment-path /path/to/segment --full-scan
+
+  # Project specific tags as columns
+  dump sidx -sidx-path /path/to/sidx/timestamp_millis -segment-path /path/to/segment \
+  --projection "tag1,tag2,tag3"
+
+  # Filter by data content
+  dump sidx -sidx-path /path/to/sidx/timestamp_millis -segment-path /path/to/segment \
+  --data-filter "2dd1624d5719fdfedc526f3f24d00342-4423100"
+
+  # Output as CSV
+  dump sidx -sidx-path /path/to/sidx/timestamp_millis -segment-path /path/to/segment -csv`,
+ RunE: func(_ *cobra.Command, _ []string) error {
+ if sidxPath == "" {
+ return fmt.Errorf("-sidx-path flag is required")
+ }
+ if segmentPath == "" {
+				return fmt.Errorf("-segment-path flag is required")
+			}
+			return dumpSidx(sidxPath, segmentPath, criteriaJSON, csvOutput, timeBegin, timeEnd, fullScan, projectionTags, dataFilter)
+ },
+ }
+
+	cmd.Flags().StringVarP(&sidxPath, "sidx-path", "s", "", "Path to the sidx directory (required)")
+	cmd.Flags().StringVarP(&segmentPath, "segment-path", "g", "", "Path to the segment directory (required)")
+	cmd.Flags().StringVarP(&criteriaJSON, "criteria", "c", "", "Criteria filter as JSON string")
+	cmd.Flags().BoolVar(&csvOutput, "csv", false, "Output as CSV format")
+	cmd.Flags().StringVar(&timeBegin, "time-begin", "", "Begin time in RFC3339 format (e.g., 2025-11-23T05:05:00Z)")
+	cmd.Flags().StringVar(&timeEnd, "time-end", "", "End time in RFC3339 format (e.g., 2025-11-23T05:20:00Z)")
+	cmd.Flags().BoolVar(&fullScan, "full-scan", false, "Scan all series without requiring series ID discovery")
+	cmd.Flags().StringVarP(&projectionTags, "projection", "p", "", "Comma-separated list of tags to include as columns (e.g., tag1,tag2,tag3)")
+	cmd.Flags().StringVarP(&dataFilter, "data-filter", "d", "", "Filter rows by data content (substring match)")
+ _ = cmd.MarkFlagRequired("sidx-path")
+ _ = cmd.MarkFlagRequired("segment-path")
+
+ return cmd
+}
+
+func dumpSidx(sidxPath, segmentPath, criteriaJSON string, csvOutput bool, timeBegin, timeEnd string, fullScan bool, projectionTags string, dataFilter string) error {
+ // Parse time range if provided
+ var minKeyNanos, maxKeyNanos int64
+ var hasTimeRange bool
+
+ if timeBegin != "" || timeEnd != "" {
+ var err error
+		minKeyNanos, maxKeyNanos, err = parseTimeRange(timeBegin, timeEnd)
+ if err != nil {
+ return fmt.Errorf("failed to parse time range: %w", err)
+ }
+ hasTimeRange = true
+ fmt.Fprintf(os.Stderr, "Time range filter: %s to %s\n",
+ time.Unix(0, minKeyNanos).Format(time.RFC3339),
+ time.Unix(0, maxKeyNanos).Format(time.RFC3339))
+ }
+
+ // Parse projection tags
+ var tagNames []string
+ if projectionTags != "" {
+ tagNames = parseProjectionTags(projectionTags)
+ fmt.Fprintf(os.Stderr, "Projection tags: %v\n", tagNames)
+ }
+
+ // Log data filter if provided
+ if dataFilter != "" {
+ fmt.Fprintf(os.Stderr, "Data filter: %s\n", dataFilter)
+ }
+
+ // Discover available part IDs from segment directory
+ allPartIDs, err := discoverPartIDs(segmentPath)
+ if err != nil {
+ return fmt.Errorf("failed to discover part IDs: %w", err)
+ }
+
+ if len(allPartIDs) == 0 {
+ fmt.Println("No parts found in segment directory")
+ return nil
+ }
+
+ // Filter parts by time range if specified
+ var partIDs []uint64
+ if hasTimeRange {
+		partIDs = filterPartsByTimeRange(sidxPath, allPartIDs, minKeyNanos, maxKeyNanos)
+		fmt.Fprintf(os.Stderr, "Filtered to %d parts (out of %d) based on time range\n", len(partIDs), len(allPartIDs))
+ } else {
+ partIDs = allPartIDs
+ }
+
+ if len(partIDs) == 0 {
+ fmt.Println("No parts match the specified time range")
+ return nil
+ }
+
+ // Parse criteria if provided
+ var criteria *modelv1.Criteria
+ if criteriaJSON != "" {
+ criteria, err = parseCriteriaJSON(criteriaJSON)
+ if err != nil {
+ return fmt.Errorf("failed to parse criteria: %w", err)
+ }
+ }
+
+ // Use full-scan mode if requested
+ if fullScan {
+		return dumpSidxFullScan(sidxPath, segmentPath, criteria, csvOutput, hasTimeRange, minKeyNanos, maxKeyNanos, partIDs, tagNames, dataFilter)
+ }
+
+ // Open the file system
+ fileSystem := fs.NewLocalFileSystem()
+
+ // Create sidx options
+ opts, err := sidx.NewOptions(sidxPath, &protector.Nop{})
+ if err != nil {
+ return fmt.Errorf("failed to create sidx options: %w", err)
+ }
+ opts.AvailablePartIDs = partIDs
+
+ // Open SIDX instance
+ sidxInstance, err := sidx.NewSIDX(fileSystem, opts)
+ if err != nil {
+ return fmt.Errorf("failed to open sidx: %w", err)
+ }
+ defer sidxInstance.Close()
+
+ // Discover series IDs from the segment's series index
+ seriesIDs, err := discoverSeriesIDs(segmentPath)
+ if err != nil {
+		fmt.Fprintf(os.Stderr, "Warning: could not discover series IDs: %v. Using default range.\n", err)
+ seriesIDs = nil
+ }
+ if len(seriesIDs) > 0 {
+		fmt.Fprintf(os.Stderr, "Discovered %d series IDs from segment\n", len(seriesIDs))
+ }
+
+ // Create dynamic tag registry if criteria is provided
+ var tagRegistry *dynamicTagRegistry
+ if criteria != nil {
+ tagRegistry, err = newDynamicTagRegistry(sidxPath)
+ if err != nil {
+			fmt.Fprintf(os.Stderr, "Warning: Failed to create tag registry: %v. Criteria filtering may not work properly.\n", err)
+		} else {
+			fmt.Fprintf(os.Stderr, "Discovered %d tags from sidx\n", len(tagRegistry.tagSpecs))
+ }
+ }
+
+ // Build query request
+	req, err := buildQueryRequest(criteria, seriesIDs, hasTimeRange, minKeyNanos, maxKeyNanos, tagRegistry)
+ if err != nil {
+ return fmt.Errorf("failed to build query request: %w", err)
+ }
+
+ // Execute streaming query
+	// Use a longer timeout for dump operations since they can process large datasets
+ ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
+ defer cancel()
+
+ resultsCh, errCh := sidxInstance.StreamingQuery(ctx, req)
+
+ // Process and output results
+ if csvOutput {
+ return outputSidxResultsAsCSV(resultsCh, errCh, dataFilter)
+ }
+ return outputSidxResultsAsText(resultsCh, errCh, sidxPath, dataFilter)
+}
+
+func parseTimeRange(timeBegin, timeEnd string) (int64, int64, error) {
+ var minKeyNanos, maxKeyNanos int64
+
+ if timeBegin != "" {
+ t, err := time.Parse(time.RFC3339, timeBegin)
+ if err != nil {
+			return 0, 0, fmt.Errorf("invalid time-begin format: %w (expected RFC3339, e.g., 2025-11-23T05:05:00Z)", err)
+ }
+ minKeyNanos = t.UnixNano()
+ } else {
+ minKeyNanos = math.MinInt64
+ }
+
+ if timeEnd != "" {
+ t, err := time.Parse(time.RFC3339, timeEnd)
+ if err != nil {
+			return 0, 0, fmt.Errorf("invalid time-end format: %w (expected RFC3339, e.g., 2025-11-23T05:20:00Z)", err)
+ }
+ maxKeyNanos = t.UnixNano()
+ } else {
+ maxKeyNanos = math.MaxInt64
+ }
+
+ if minKeyNanos > maxKeyNanos {
+ return 0, 0, fmt.Errorf("time-begin must be before time-end")
+ }
+
+ return minKeyNanos, maxKeyNanos, nil
+}
+
+func filterPartsByTimeRange(sidxPath string, partIDs []uint64, minKey, maxKey int64) []uint64 {
+ fileSystem := fs.NewLocalFileSystem()
+ var filteredParts []uint64
+
+ for _, partID := range partIDs {
+		partPath := filepath.Join(sidxPath, fmt.Sprintf("%016x", partID))
+ manifestPath := filepath.Join(partPath, "manifest.json")
+
+ // Read manifest.json
+ manifestData, err := fileSystem.Read(manifestPath)
+ if err != nil {
+ // Skip parts that don't have a manifest
+ continue
+ }
+
+ // Parse manifest to get minKey and maxKey
+ var manifest struct {
+ MinKey int64 `json:"minKey"`
+ MaxKey int64 `json:"maxKey"`
+ }
+
+ if err := json.Unmarshal(manifestData, &manifest); err != nil {
+			fmt.Fprintf(os.Stderr, "Warning: failed to parse manifest for part %016x: %v\n", partID, err)
+ continue
+ }
+
+ // Check if part overlaps with the requested time range
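+		// Two closed key ranges intersect exactly when each one starts at or before the other ends.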
+ if manifest.MaxKey >= minKey && manifest.MinKey <= maxKey {
+ filteredParts = append(filteredParts, partID)
+ }
+ }
+
+ return filteredParts
+}
+
+func parseCriteriaJSON(criteriaJSON string) (*modelv1.Criteria, error) {
+ criteria := &modelv1.Criteria{}
+ err := protojson.Unmarshal([]byte(criteriaJSON), criteria)
+ if err != nil {
+ return nil, fmt.Errorf("invalid criteria JSON: %w", err)
+ }
+ return criteria, nil
+}
+
+func discoverPartIDs(segmentPath string) ([]uint64, error) {
+ // Look for shard directories in the segment path
+ shardDirs, err := filepath.Glob(filepath.Join(segmentPath, "shard-*"))
+ if err != nil {
+		return nil, fmt.Errorf("failed to glob shard directories: %w", err)
+ }
+
+ if len(shardDirs) == 0 {
+		return nil, fmt.Errorf("no shard directories found in segment path")
+ }
+
+ // Collect all part IDs from all shards
+ partIDMap := make(map[uint64]bool)
+
+ for _, shardDir := range shardDirs {
+ entries, err := os.ReadDir(shardDir)
+ if err != nil {
+			fmt.Fprintf(os.Stderr, "Warning: failed to read shard directory %s: %v\n", shardDir, err)
+ continue
+ }
+
+ for _, entry := range entries {
+ if !entry.IsDir() {
+ continue
+ }
+			// Part directories are named as hex IDs (e.g., "0000000000004db4")
+ name := entry.Name()
+ // Skip special directories
+ if name == "sidx" || name == "meta" {
+ continue
+ }
+ // Try to parse as hex
+ partID, err := strconv.ParseUint(name, 16, 64)
+ if err == nil {
+ partIDMap[partID] = true
+ }
+ }
+ }
+
+ // Convert map to sorted slice
+ partIDs := make([]uint64, 0, len(partIDMap))
+ for partID := range partIDMap {
+ partIDs = append(partIDs, partID)
+ }
+ sort.Slice(partIDs, func(i, j int) bool {
+ return partIDs[i] < partIDs[j]
+ })
+
+ return partIDs, nil
+}
+
+func discoverSeriesIDs(segmentPath string) ([]common.SeriesID, error) {
+ // Open the series index (sidx directory under segment)
+ seriesIndexPath := filepath.Join(segmentPath, "sidx")
+
+ l := logger.GetLogger("dump-sidx")
+
+ // Create inverted index store
+ store, err := inverted.NewStore(inverted.StoreOpts{
+ Path: seriesIndexPath,
+ Logger: l,
+ })
+ if err != nil {
+ return nil, fmt.Errorf("failed to open series index: %w", err)
+ }
+ defer store.Close()
+
+ // Get series iterator
+ ctx := context.Background()
+ iter, err := store.SeriesIterator(ctx)
+ if err != nil {
+		return nil, fmt.Errorf("failed to create series iterator: %w", err)
+ }
+ defer iter.Close()
+
+ // Collect all series IDs
+ var seriesIDs []common.SeriesID
+ for iter.Next() {
+ series := iter.Val()
+ // Compute series ID from EntityValues using hash
+ if len(series.EntityValues) > 0 {
+			seriesID := common.SeriesID(convert.Hash(series.EntityValues))
+ seriesIDs = append(seriesIDs, seriesID)
+ }
+ }
+
+ return seriesIDs, nil
+}
+
+func buildQueryRequest(
+	criteria *modelv1.Criteria, seriesIDs []common.SeriesID, hasTimeRange bool, minKeyNanos, maxKeyNanos int64, tagRegistry *dynamicTagRegistry,
+) (sidx.QueryRequest, error) {
+	// Use discovered series IDs if available, otherwise fall back to default range
+	if len(seriesIDs) == 0 {
+		// Use a range of series IDs to increase chance of finding data
+		// Since we don't know what series IDs exist, we'll use a large range
+		// Series IDs are uint64 hash values, so they can be quite large
+		// We'll sample the space to cover likely values
+ seriesIDs = make([]common.SeriesID, 0, 10000)
+
+ // Add first 10000 series IDs (for simple/test data)
+ for i := common.SeriesID(1); i <= 10000; i++ {
+ seriesIDs = append(seriesIDs, i)
+ }
+ }
+
+ req := sidx.QueryRequest{
+ SeriesIDs: seriesIDs,
+		MaxBatchSize: 100, // Smaller batch size for better responsiveness
+ }
+
+ // Set min/max key based on time range if provided, otherwise full range
+ var minKey, maxKey int64
+ if hasTimeRange {
+ minKey = minKeyNanos
+ maxKey = maxKeyNanos
+ } else {
+ minKey = math.MinInt64
+ maxKey = math.MaxInt64
+ }
+ req.MinKey = &minKey
+ req.MaxKey = &maxKey
+
+ // If criteria is provided, build tag filter
+ if criteria != nil && tagRegistry != nil {
+ tagFilter, err := logical.BuildSimpleTagFilter(criteria)
+ if err != nil {
+			return req, fmt.Errorf("failed to build tag filter from criteria: %w", err)
+ }
+
+ // Create a tag filter matcher if filter is not dummy
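+		// The dummy filter matches every row, so no matcher is needed for it.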
+ if tagFilter != nil && tagFilter != logical.DummyFilter {
+			// Create tag filter matcher with the dynamic registry and decoder
+			tagFilterMatcher := logical.NewTagFilterMatcher(tagFilter, tagRegistry, simpleTagValueDecoder)
+			req.TagFilter = tagFilterMatcher
+			fmt.Fprintf(os.Stderr, "Applied criteria filter: %v\n", tagFilter)
+ }
+ }
+
+ return req, nil
+}
+
+func outputSidxResultsAsText(resultsCh <-chan *sidx.QueryResponse, errCh <-chan error, sidxPath string, dataFilter string) error {
+	fmt.Printf("Opening sidx: %s\n", sidxPath)
+	fmt.Printf("================================================================================\n\n")
+
+ totalRows := 0
+ filteredRows := 0
+ batchNum := 0
+
+ // Process results and errors
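+	// A drained channel variable is set to nil below, which disables its
+	// select case; the loop ends once both channels are closed.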
+ for resultsCh != nil || errCh != nil {
+ select {
+ case err, ok := <-errCh:
+ if !ok {
+ errCh = nil
+ continue
+ }
+ if err != nil {
+ return fmt.Errorf("query error: %w", err)
+ }
+ case resp, ok := <-resultsCh:
+ if !ok {
+ resultsCh = nil
+ continue
+ }
+
+ if resp.Error != nil {
+				return fmt.Errorf("response error: %w", resp.Error)
+ }
+
+ if resp.Len() > 0 {
+ batchNum++
+ batchPrinted := false
+
+ for i := 0; i < resp.Len(); i++ {
+ dataStr := string(resp.Data[i])
+
+ // Apply data filter if specified
+					if dataFilter != "" && !strings.Contains(dataStr, dataFilter) {
+ filteredRows++
+ continue
+ }
+
+					// Print batch header only when we have matching rows
+					if !batchPrinted {
+						fmt.Printf("Batch %d:\n", batchNum)
+						fmt.Printf("--------------------------------------------------------------------------------\n")
+ batchPrinted = true
+ }
+
+ totalRows++
+ fmt.Printf("Row %d:\n", totalRows)
+					fmt.Printf("  PartID: %d (0x%016x)\n", resp.PartIDs[i], resp.PartIDs[i])
+					fmt.Printf("  Key: %d\n", resp.Keys[i])
+					fmt.Printf("  SeriesID: %d\n", resp.SIDs[i])
+ fmt.Printf(" Data: %s\n", dataStr)
+ fmt.Printf("\n")
+ }
+ }
+ }
+ }
+
+ if dataFilter != "" {
+		fmt.Printf("\nTotal rows: %d (filtered out: %d)\n", totalRows, filteredRows)
+ } else {
+ fmt.Printf("\nTotal rows: %d\n", totalRows)
+ }
+ return nil
+}
+
+func outputSidxResultsAsCSV(resultsCh <-chan *sidx.QueryResponse, errCh <-chan error, dataFilter string) error {
+ writer := csv.NewWriter(os.Stdout)
+ defer writer.Flush()
+
+ // Write header
+ header := []string{"PartID", "Key", "SeriesID", "Data"}
+ if err := writer.Write(header); err != nil {
+ return fmt.Errorf("failed to write CSV header: %w", err)
+ }
+
+ // Process results and errors
+ for resultsCh != nil || errCh != nil {
+ select {
+ case err, ok := <-errCh:
+ if !ok {
+ errCh = nil
+ continue
+ }
+ if err != nil {
+ return fmt.Errorf("query error: %w", err)
+ }
+ case resp, ok := <-resultsCh:
+ if !ok {
+ resultsCh = nil
+ continue
+ }
+
+ if resp.Error != nil {
+				return fmt.Errorf("response error: %w", resp.Error)
+ }
+
+ for i := 0; i < resp.Len(); i++ {
+ dataStr := string(resp.Data[i])
+
+ // Apply data filter if specified
+				if dataFilter != "" && !strings.Contains(dataStr, dataFilter) {
+ continue
+ }
+
+ row := []string{
+ fmt.Sprintf("%d", resp.PartIDs[i]),
+ fmt.Sprintf("%d", resp.Keys[i]),
+ fmt.Sprintf("%d", resp.SIDs[i]),
+ dataStr,
+ }
+ if err := writer.Write(row); err != nil {
+					return fmt.Errorf("failed to write CSV row: %w", err)
+ }
+ }
+ }
+ }
+
+ return nil
+}
+
+// simpleTagValueDecoder is a simple decoder for tag values used in filtering.
+// It handles basic tag value types without needing full schema information.
+func simpleTagValueDecoder(valueType pbv1.ValueType, value []byte, valueArr [][]byte) *modelv1.TagValue {
+ if value == nil && valueArr == nil {
+ return pbv1.NullTagValue
+ }
+
+ switch valueType {
+ case pbv1.ValueTypeStr:
+ if value == nil {
+ return pbv1.NullTagValue
+ }
+ return &modelv1.TagValue{
+ Value: &modelv1.TagValue_Str{
+ Str: &modelv1.Str{
+ Value: string(value),
+ },
+ },
+ }
+ case pbv1.ValueTypeInt64:
+ if value == nil {
+ return pbv1.NullTagValue
+ }
+ return &modelv1.TagValue{
+ Value: &modelv1.TagValue_Int{
+ Int: &modelv1.Int{
+ Value: convert.BytesToInt64(value),
+ },
+ },
+ }
+ case pbv1.ValueTypeStrArr:
+ var values []string
+ for _, v := range valueArr {
+ values = append(values, string(v))
+ }
+ return &modelv1.TagValue{
+ Value: &modelv1.TagValue_StrArray{
+ StrArray: &modelv1.StrArray{
+ Value: values,
+ },
+ },
+ }
+ case pbv1.ValueTypeBinaryData:
+ if value == nil {
+ return pbv1.NullTagValue
+ }
+ return &modelv1.TagValue{
+ Value: &modelv1.TagValue_BinaryData{
+ BinaryData: value,
+ },
+ }
+ default:
+ // For unknown types, try to return as string
+ if value != nil {
+ return &modelv1.TagValue{
+ Value: &modelv1.TagValue_Str{
+ Str: &modelv1.Str{
+ Value: string(value),
+ },
+ },
+ }
+ }
+ return pbv1.NullTagValue
+ }
+}
+
+// dynamicTagRegistry implements TagSpecRegistry by discovering tags from the sidx.
+type dynamicTagRegistry struct {
+ tagSpecs map[string]*logical.TagSpec
+}
+
+// newDynamicTagRegistry creates a registry by discovering tag names from the sidx directory.
+func newDynamicTagRegistry(sidxPath string) (*dynamicTagRegistry, error) {
+ registry := &dynamicTagRegistry{
+ tagSpecs: make(map[string]*logical.TagSpec),
+ }
+
+ fileSystem := fs.NewLocalFileSystem()
+
+ // List all directories in sidxPath to find parts
+ entries := fileSystem.ReadDir(sidxPath)
+ if len(entries) == 0 {
+		return nil, fmt.Errorf("failed to read sidx directory or directory is empty")
+ }
+
+ tagNamesMap := make(map[string]bool)
+
+ // Scan first part to discover tag names
+ for _, entry := range entries {
+ if !entry.IsDir() {
+ continue
+ }
+
+ partPath := filepath.Join(sidxPath, entry.Name())
+ partEntries := fileSystem.ReadDir(partPath)
+
+ // Look for .td (tag data) files
+ for _, partEntry := range partEntries {
+ if partEntry.IsDir() {
+ continue
+ }
+ name := partEntry.Name()
+ // Tag data files are named like "tagname.td"
+ if strings.HasSuffix(name, ".td") {
+ tagName := strings.TrimSuffix(name, ".td")
+ tagNamesMap[tagName] = true
+ }
+ }
+
+ // Only scan first part to save time
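+		// This assumes every part carries the same tag set; later parts are not inspected.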
+ if len(tagNamesMap) > 0 {
+ break
+ }
+ }
+
+ // Create tag specs for discovered tags
+ // We assume all tags are string arrays since we don't have schema info
+ tagFamilyIdx := 0
+ tagIdx := 0
+ for tagName := range tagNamesMap {
+ registry.tagSpecs[tagName] = &logical.TagSpec{
+ Spec: &databasev1.TagSpec{
+ Name: tagName,
+ Type: databasev1.TagType_TAG_TYPE_STRING_ARRAY,
+ },
+ TagFamilyIdx: tagFamilyIdx,
+ TagIdx: tagIdx,
+ }
+ tagIdx++
+ }
+
+ return registry, nil
+}
+
+func (d *dynamicTagRegistry) FindTagSpecByName(name string) *logical.TagSpec {
+ if spec, ok := d.tagSpecs[name]; ok {
+ return spec
+ }
+ // If tag not found, return a default spec for it
+ // This allows filtering on any tag name
+ return &logical.TagSpec{
+ Spec: &databasev1.TagSpec{
+ Name: name,
+ Type: databasev1.TagType_TAG_TYPE_STRING_ARRAY,
+ },
+ TagFamilyIdx: 0,
+ TagIdx: 0,
+ }
+}
+
+// IndexDefined implements IndexChecker interface (stub implementation).
+func (d *dynamicTagRegistry) IndexDefined(_ string) (bool, *databasev1.IndexRule) {
+ return false, nil
+}
+
+// IndexRuleDefined implements IndexChecker interface (stub implementation).
+func (d *dynamicTagRegistry) IndexRuleDefined(_ string) (bool, *databasev1.IndexRule) {
+ return false, nil
+}
+
+// EntityList implements Schema interface (stub implementation).
+func (d *dynamicTagRegistry) EntityList() []string {
+ return nil
+}
+
+// CreateTagRef implements Schema interface (stub implementation).
+func (d *dynamicTagRegistry) CreateTagRef(_ ...[]*logical.Tag) ([][]*logical.TagRef, error) {
+ return nil, fmt.Errorf("CreateTagRef not supported in dump tool")
+}
+
+// CreateFieldRef implements Schema interface (stub implementation).
+func (d *dynamicTagRegistry) CreateFieldRef(_ ...*logical.Field) ([]*logical.FieldRef, error) {
+ return nil, fmt.Errorf("CreateFieldRef not supported in dump tool")
+}
+
+// ProjTags implements Schema interface (stub implementation).
+func (d *dynamicTagRegistry) ProjTags(_ ...[]*logical.TagRef) logical.Schema {
+ return d
+}
+
+// ProjFields implements Schema interface (stub implementation).
+func (d *dynamicTagRegistry) ProjFields(_ ...*logical.FieldRef) logical.Schema {
+ return d
+}
+
+// Children implements Schema interface (stub implementation).
+func (d *dynamicTagRegistry) Children() []logical.Schema {
+ return nil
+}
+
+func dumpSidxFullScan(sidxPath, segmentPath string, criteria *modelv1.Criteria, csvOutput bool,
+	hasTimeRange bool, minKeyNanos, maxKeyNanos int64, partIDs []uint64, projectionTagNames []string, dataFilter string,
+) error {
+	fmt.Fprintf(os.Stderr, "Using full-scan mode (no series ID filtering)\n")
+
+ // Load series information for human-readable output
+ seriesMap, err := loadSeriesMap(segmentPath)
+ if err != nil {
+		fmt.Fprintf(os.Stderr, "Warning: Failed to load series information: %v\n", err)
+		seriesMap = nil // Continue without series names
+	} else {
+		fmt.Fprintf(os.Stderr, "Loaded %d series from segment\n", len(seriesMap))
+ }
+
+ // Build tag projection
+ var tagProjection []model.TagProjection
+ if len(projectionTagNames) > 0 {
+		// Create a single TagProjection with all the requested tag names
+ tagProjection = []model.TagProjection{
+ {
+ Family: "", // Empty family for SIDX tags
+ Names: projectionTagNames,
+ },
+ }
+ }
+
+ // Create dynamic tag registry if criteria is provided
+ var tagRegistry *dynamicTagRegistry
+ if criteria != nil {
+ var regErr error
+ tagRegistry, regErr = newDynamicTagRegistry(sidxPath)
+ if regErr != nil {
+			fmt.Fprintf(os.Stderr, "Warning: Failed to create tag registry: %v\n", regErr)
+ }
+ }
+
+ // Build scan request
+ var minKey, maxKey int64
+ if hasTimeRange {
+ minKey = minKeyNanos
+ maxKey = maxKeyNanos
+ } else {
+ minKey = math.MinInt64
+ maxKey = math.MaxInt64
+ }
+
+ // Track progress
+ startTime := time.Now()
+
+ scanReq := sidx.ScanQueryRequest{
+ MinKey: &minKey,
+ MaxKey: &maxKey,
+ MaxBatchSize: 1000,
+ TagProjection: tagProjection,
+ OnProgress: func(currentPart, totalParts int, rowsFound int) {
+ elapsed := time.Since(startTime)
+			fmt.Fprintf(os.Stderr, "\rScanning part %d/%d... Found %d rows (elapsed: %s)",
+				currentPart, totalParts, rowsFound, elapsed.Round(time.Second))
+			os.Stderr.Sync() // Flush immediately to show progress in real-time
+ },
+ }
+
+ // Apply criteria filter if provided
+ if criteria != nil && tagRegistry != nil {
+		fmt.Fprintf(os.Stderr, "Discovered %d tags from sidx\n", len(tagRegistry.tagSpecs))
+
+ tagFilter, filterErr := logical.BuildSimpleTagFilter(criteria)
+ if filterErr != nil {
+			return fmt.Errorf("failed to build tag filter: %w", filterErr)
+ }
+ if tagFilter != nil && tagFilter != logical.DummyFilter {
+			scanReq.TagFilter = logical.NewTagFilterMatcher(tagFilter, tagRegistry, simpleTagValueDecoder)
+			fmt.Fprintf(os.Stderr, "Applied criteria filter: %v\n", tagFilter)
+ }
+ }
+
+ // Open SIDX
+ fileSystem := fs.NewLocalFileSystem()
+ opts, err := sidx.NewOptions(sidxPath, &protector.Nop{})
+ if err != nil {
+ return fmt.Errorf("failed to create sidx options: %w", err)
+ }
+ opts.AvailablePartIDs = partIDs
+
+ sidxInstance, err := sidx.NewSIDX(fileSystem, opts)
+ if err != nil {
+ return fmt.Errorf("failed to open sidx: %w", err)
+ }
+ defer sidxInstance.Close()
+
+ // Execute scan query
+	// Use a longer timeout for dump operations since they can process large datasets
+ // Full-scan mode can be slower when scanning many parts with filtering
+ ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
+ defer cancel()
+
+ results, err := sidxInstance.ScanQuery(ctx, scanReq)
+ if err != nil {
+ return fmt.Errorf("scan query failed: %w", err)
+ }
+
+ // Print newline after progress output and flush to ensure clean output
+ fmt.Fprintf(os.Stderr, "\nScan complete.\n")
+ os.Stderr.Sync() // Ensure all stderr output is flushed
+
+ // Output results
+ if csvOutput {
+		return outputScanResultsAsCSV(results, seriesMap, projectionTagNames, dataFilter)
+ }
+	return outputScanResultsAsText(results, sidxPath, seriesMap, projectionTagNames, dataFilter)
+}
+
+// parseProjectionTags parses a comma-separated list of tag names.
+func parseProjectionTags(projectionStr string) []string {
+ if projectionStr == "" {
+ return nil
+ }
+
+ tags := strings.Split(projectionStr, ",")
+ result := make([]string, 0, len(tags))
+ for _, tag := range tags {
+ tag = strings.TrimSpace(tag)
+ if tag != "" {
+ result = append(result, tag)
+ }
+ }
+ return result
+}
+
+// loadSeriesMap loads all series from the segment's series index and creates a map from SeriesID to text representation.
+func loadSeriesMap(segmentPath string) (map[common.SeriesID]string, error) {
+ seriesIndexPath := filepath.Join(segmentPath, "sidx")
+
+ l := logger.GetLogger("dump-sidx")
+
+ // Create inverted index store
+ store, err := inverted.NewStore(inverted.StoreOpts{
+ Path: seriesIndexPath,
+ Logger: l,
+ })
+ if err != nil {
+ return nil, fmt.Errorf("failed to open series index: %w", err)
+ }
+ defer store.Close()
+
+ // Get series iterator
+ ctx := context.Background()
+ iter, err := store.SeriesIterator(ctx)
+ if err != nil {
+		return nil, fmt.Errorf("failed to create series iterator: %w", err)
+ }
+ defer iter.Close()
+
+ // Build map of SeriesID -> text representation
+ seriesMap := make(map[common.SeriesID]string)
+ for iter.Next() {
+ series := iter.Val()
+ if len(series.EntityValues) > 0 {
+			seriesID := common.SeriesID(convert.Hash(series.EntityValues))
+ // Convert EntityValues bytes to readable string
+ seriesText := string(series.EntityValues)
+ seriesMap[seriesID] = seriesText
+ }
+ }
+
+ return seriesMap, nil
+}
+
+func outputScanResultsAsText(results []*sidx.QueryResponse, sidxPath string, seriesMap map[common.SeriesID]string, projectionTagNames []string, dataFilter string) error {
+	fmt.Printf("Opening sidx: %s\n", sidxPath)
+	fmt.Printf("================================================================================\n\n")
+
+ totalRows := 0
+ filteredRows := 0
+ for batchNum, resp := range results {
+ if resp.Error != nil {
+			return fmt.Errorf("response error in batch %d: %w", batchNum+1, resp.Error)
+ }
+
+ batchPrinted := false
+
+ for i := 0; i < resp.Len(); i++ {
+ dataStr := string(resp.Data[i])
+
+ // Apply data filter if specified
+			if dataFilter != "" && !strings.Contains(dataStr, dataFilter) {
+ filteredRows++
+ continue
+ }
+
+ // Print batch header only when we have matching rows
+ if !batchPrinted {
+ fmt.Printf("Batch %d:\n", batchNum+1)
+				fmt.Printf("--------------------------------------------------------------------------------\n")
+ batchPrinted = true
+ }
+
+ fmt.Printf("Row %d:\n", totalRows+1)
+		fmt.Printf("  PartID: %d (0x%016x)\n", resp.PartIDs[i], resp.PartIDs[i])
+ fmt.Printf(" Key: %d\n", resp.Keys[i])
+ fmt.Printf(" SeriesID: %d\n", resp.SIDs[i])
+
+ // Add series text if available
+ if seriesMap != nil {
+			if seriesText, ok := seriesMap[resp.SIDs[i]]; ok {
+ fmt.Printf(" Series: %s\n", seriesText)
+ }
+ }
+
+ // Add projected tags if available
+ if len(projectionTagNames) > 0 && resp.Tags != nil {
+ for _, tagName := range projectionTagNames {
+				if tagValues, ok := resp.Tags[tagName]; ok && i < len(tagValues) {
+					tagValue := tagValues[i]
+					// Calculate size of the tag value
+					tagSize := len(tagValue)
+					fmt.Printf("  %s: %s (size: %d bytes)\n", tagName, tagValue, tagSize)
+ }
+ }
+ }
+
+ fmt.Printf(" Data: %s\n", dataStr)
+ fmt.Printf("\n")
+ totalRows++
+ }
+
+ // (Already added newline after each row)
+ }
+
+ if dataFilter != "" {
+		fmt.Printf("\nTotal rows: %d (filtered out: %d)\n", totalRows, filteredRows)
+ } else {
+ fmt.Printf("\nTotal rows: %d\n", totalRows)
+ }
+ return nil
+}
+
+func outputScanResultsAsCSV(results []*sidx.QueryResponse, seriesMap map[common.SeriesID]string, projectionTagNames []string, dataFilter string) error {
+ writer := csv.NewWriter(os.Stdout)
+ defer writer.Flush()
+
+ // Write header
+ header := []string{"PartID", "Key", "SeriesID", "Series"}
+ // Add projected tag columns (with size)
+ for _, tagName := range projectionTagNames {
+ header = append(header, tagName, tagName+"_size")
+ }
+ header = append(header, "Data")
+
+ if err := writer.Write(header); err != nil {
+ return fmt.Errorf("failed to write CSV header: %w", err)
+ }
+
+ for _, resp := range results {
+ if resp.Error != nil {
+ return fmt.Errorf("response error: %w", resp.Error)
+ }
+
+ for i := 0; i < resp.Len(); i++ {
+ dataStr := string(resp.Data[i])
+
+ // Apply data filter if specified
+			if dataFilter != "" && !strings.Contains(dataStr, dataFilter) {
+ continue
+ }
+
+ seriesText := ""
+ if seriesMap != nil {
+ if text, ok := seriesMap[resp.SIDs[i]]; ok {
+ seriesText = text
+ }
+ }
+
+ row := []string{
+ fmt.Sprintf("%d", resp.PartIDs[i]),
+ fmt.Sprintf("%d", resp.Keys[i]),
+ fmt.Sprintf("%d", resp.SIDs[i]),
+ seriesText,
+ }
+
+ // Add projected tag values with size
+ for _, tagName := range projectionTagNames {
+ tagValue := ""
+ tagSize := ""
+
+ if resp.Tags != nil {
+					if tagValues, ok := resp.Tags[tagName]; ok && i < len(tagValues) {
+						tagValue = tagValues[i]
+						// Calculate size of the tag value
+						tagSize = fmt.Sprintf("%d", len(tagValue))
+ }
+ }
+
+ row = append(row, tagValue, tagSize)
+ }
+
+ row = append(row, dataStr)
+
+ if err := writer.Write(row); err != nil {
+				return fmt.Errorf("failed to write CSV row: %w", err)
+ }
+ }
+ }
+
+ return nil
+}
diff --git a/banyand/cmd/dump/trace.go b/banyand/cmd/dump/trace.go
index d779a658..73ecff5a 100644
--- a/banyand/cmd/dump/trace.go
+++ b/banyand/cmd/dump/trace.go
@@ -18,6 +18,7 @@
package main
import (
+ "context"
"encoding/csv"
"encoding/json"
"fmt"
@@ -30,331 +31,112 @@ import (
"time"
"github.com/spf13/cobra"
+ "google.golang.org/protobuf/encoding/protojson"
+ "github.com/apache/skywalking-banyandb/api/common"
+	databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+	modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
	internalencoding "github.com/apache/skywalking-banyandb/banyand/internal/encoding"
"github.com/apache/skywalking-banyandb/pkg/bytes"
"github.com/apache/skywalking-banyandb/pkg/compress/zstd"
"github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/encoding"
"github.com/apache/skywalking-banyandb/pkg/fs"
+ "github.com/apache/skywalking-banyandb/pkg/index/inverted"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+ "github.com/apache/skywalking-banyandb/pkg/query/logical"
)
+type traceDumpOptions struct {
+ shardPath string
+ segmentPath string
+ criteriaJSON string
+ projectionTags string
+ verbose bool
+ csvOutput bool
+}
+
func newTraceCmd() *cobra.Command {
- var partPath string
+ var shardPath string
+ var segmentPath string
var verbose bool
var csvOutput bool
+ var criteriaJSON string
+ var projectionTags string
cmd := &cobra.Command{
Use: "trace",
- Short: "Dump trace part data",
- Long: `Dump and display contents of a trace part directory.
-Outputs trace data in human-readable format or CSV.`,
- Example: ` # Display trace data in text format
- dump trace -path /path/to/part/0000000000004db4
+ Short: "Dump trace shard data",
+		Long: `Dump and display contents of a trace shard directory (containing multiple parts).
+Outputs trace data in human-readable format or CSV.
+
+Supports filtering by criteria and projecting specific tags.`,
+ Example: ` # Display trace data from shard in text format
+ dump trace --shard-path /path/to/shard-0 --segment-path /path/to/segment
# Display with verbose hex dumps
- dump trace -path /path/to/part/0000000000004db4 -v
+ dump trace --shard-path /path/to/shard-0 --segment-path /path/to/segment -v
+
+ # Filter by criteria
+ dump trace --shard-path /path/to/shard-0 --segment-path /path/to/segment \
+    --criteria '{"condition":{"name":"query","op":"BINARY_OP_HAVING","value":{"strArray":{"value":["tag1=value1","tag2=value2"]}}}}'
+
+ # Project specific tags
+ dump trace --shard-path /path/to/shard-0 --segment-path /path/to/segment \
+ --projection "tag1,tag2,tag3"
# Output as CSV
- dump trace -path /path/to/part/0000000000004db4 -csv
+  dump trace --shard-path /path/to/shard-0 --segment-path /path/to/segment --csv
# Save CSV to file
- dump trace -path /path/to/part/0000000000004db4 -csv > output.csv`,
+  dump trace --shard-path /path/to/shard-0 --segment-path /path/to/segment --csv > output.csv`,
RunE: func(_ *cobra.Command, _ []string) error {
- if partPath == "" {
- return fmt.Errorf("-path flag is required")
+ if shardPath == "" {
+				return fmt.Errorf("--shard-path flag is required")
+			}
+			if segmentPath == "" {
+				return fmt.Errorf("--segment-path flag is required")
}
- return dumpTracePart(partPath, verbose, csvOutput)
+ return dumpTraceShard(traceDumpOptions{
+ shardPath: shardPath,
+ segmentPath: segmentPath,
+ verbose: verbose,
+ csvOutput: csvOutput,
+ criteriaJSON: criteriaJSON,
+ projectionTags: projectionTags,
+ })
},
}
-	cmd.Flags().StringVarP(&partPath, "path", "p", "", "Path to the trace part directory (required)")
+	cmd.Flags().StringVar(&shardPath, "shard-path", "", "Path to the shard directory (required)")
+	cmd.Flags().StringVarP(&segmentPath, "segment-path", "g", "", "Path to the segment directory (required)")
	cmd.Flags().BoolVarP(&verbose, "verbose", "v", false, "Verbose output (show raw data)")
-	cmd.Flags().BoolVarP(&csvOutput, "csv", "c", false, "Output as CSV format")
-	_ = cmd.MarkFlagRequired("path")
+	cmd.Flags().BoolVar(&csvOutput, "csv", false, "Output as CSV format")
+	cmd.Flags().StringVarP(&criteriaJSON, "criteria", "c", "", "Criteria filter as JSON string")
+	cmd.Flags().StringVarP(&projectionTags, "projection", "p", "", "Comma-separated list of tags to include as columns (e.g., tag1,tag2,tag3)")
+ _ = cmd.MarkFlagRequired("shard-path")
+ _ = cmd.MarkFlagRequired("segment-path")
return cmd
}
-func dumpTracePart(partPath string, verbose bool, csvOutput bool) error {
- // Get the part ID from the directory name
- partName := filepath.Base(partPath)
- partID, err := strconv.ParseUint(partName, 16, 64)
- if err != nil {
- return fmt.Errorf("invalid part name %q: %w", partName, err)
- }
-
- // Get the root path (parent directory)
- rootPath := filepath.Dir(partPath)
-
- // Open the file system
- fileSystem := fs.NewLocalFileSystem()
-
- // Open the part
- p, err := openFilePart(partID, rootPath, fileSystem)
- if err != nil {
- return fmt.Errorf("failed to open part: %w", err)
+// dumpTraceShard has high complexity due to multiple output formats and filtering options.
+func dumpTraceShard(opts traceDumpOptions) error {
+ ctx, err := newTraceDumpContext(opts)
+ if err != nil || ctx == nil {
+ return err
}
- defer closePart(p)
-
- if csvOutput {
- return dumpPartAsCSV(p)
- }
-
- // Original text output
- fmt.Printf("Opening trace part: %s (ID: %d)\n", partPath, partID)
-	fmt.Printf("================================================================================\n\n")
-
- // Print part metadata
- fmt.Printf("Part Metadata:\n")
- fmt.Printf(" ID: %d (0x%016x)\n", p.partMetadata.ID, p.partMetadata.ID)
- fmt.Printf(" Total Count: %d\n", p.partMetadata.TotalCount)
- fmt.Printf(" Blocks Count: %d\n", p.partMetadata.BlocksCount)
-	fmt.Printf("  Min Timestamp: %s (%d)\n", formatTimestamp(p.partMetadata.MinTimestamp), p.partMetadata.MinTimestamp)
-	fmt.Printf("  Max Timestamp: %s (%d)\n", formatTimestamp(p.partMetadata.MaxTimestamp), p.partMetadata.MaxTimestamp)
-	fmt.Printf("  Compressed Size: %d bytes\n", p.partMetadata.CompressedSizeBytes)
-	fmt.Printf("  Uncompressed Span Size: %d bytes\n", p.partMetadata.UncompressedSpanSizeBytes)
- fmt.Printf("\n")
-
- // Print tag types
- if len(p.tagType) > 0 {
- fmt.Printf("Tag Types:\n")
- tagNames := make([]string, 0, len(p.tagType))
- for name := range p.tagType {
- tagNames = append(tagNames, name)
- }
- sort.Strings(tagNames)
- for _, name := range tagNames {
-			fmt.Printf("  %s: %s\n", name, valueTypeName(p.tagType[name]))
- }
- fmt.Printf("\n")
- }
-
- // Print primary block metadata
-	fmt.Printf("Primary Block Metadata (Total: %d blocks):\n", len(p.primaryBlockMetadata))
-	fmt.Printf("--------------------------------------------------------------------------------\n")
- for i, pbm := range p.primaryBlockMetadata {
- fmt.Printf("Block %d:\n", i)
- fmt.Printf(" TraceID: %s\n", pbm.traceID)
- fmt.Printf(" Offset: %d\n", pbm.offset)
- fmt.Printf(" Size: %d bytes\n", pbm.size)
-
- // Read and decompress the primary data for this block
- if verbose {
- primaryData := make([]byte, pbm.size)
-			fs.MustReadData(p.primary, int64(pbm.offset), primaryData)
- decompressed, err := zstd.Decompress(nil, primaryData)
- if err == nil {
-				fmt.Printf("  Primary Data (decompressed %d bytes):\n", len(decompressed))
- printHexDump(decompressed, 4)
- }
- }
- }
- fmt.Printf("\n")
-
- // Read and display trace data
- fmt.Printf("Trace Data:\n")
-	fmt.Printf("================================================================================\n\n")
-
- decoder := &encoding.BytesBlockDecoder{}
+ defer ctx.close()
- rowNum := 0
- for _, pbm := range p.primaryBlockMetadata {
- // Read primary data block
- primaryData := make([]byte, pbm.size)
- fs.MustReadData(p.primary, int64(pbm.offset), primaryData)
-
- // Decompress
- decompressed, err := zstd.Decompress(nil, primaryData)
- if err != nil {
-			fmt.Printf("Error decompressing primary data: %v\n", err)
- continue
- }
-
- // Parse ALL block metadata entries from this primary block
-		blockMetadatas, err := parseAllBlockMetadata(decompressed, p.tagType)
- if err != nil {
- fmt.Printf("Error parsing block metadata: %v\n", err)
- continue
- }
-
- // Process each trace block within this primary block
- for _, bm := range blockMetadatas {
- // Read spans
-			spans, spanIDs, err := readSpans(decoder, bm.spans, int(bm.count), p.spans)
- if err != nil {
-				fmt.Printf("Error reading spans for trace %s: %v\n", bm.traceID, err)
- continue
- }
-
- // Read tags
- tags := make(map[string][][]byte)
- for tagName, tagBlock := range bm.tags {
-				tagValues, err := readTagValues(decoder, tagBlock, tagName, int(bm.count), p.tagMetadata[tagName], p.tags[tagName], p.tagType[tagName])
- if err != nil {
-					fmt.Printf("Error reading tag %s for trace %s: %v\n", tagName, bm.traceID, err)
- continue
- }
- tags[tagName] = tagValues
- }
-
- // Display each span as a row
- for i := 0; i < len(spans); i++ {
- rowNum++
- fmt.Printf("Row %d:\n", rowNum)
- fmt.Printf(" TraceID: %s\n", bm.traceID)
- fmt.Printf(" SpanID: %s\n", spanIDs[i])
-				fmt.Printf("  Span Data: %d bytes\n", len(spans[i]))
- if verbose {
- fmt.Printf(" Span Content:\n")
- printHexDump(spans[i], 4)
- } else {
-					// Try to print as string if it's printable
-					if isPrintable(spans[i]) {
-						fmt.Printf("  Span: %s\n", string(spans[i]))
-					} else {
-						fmt.Printf("  Span: (binary data, %d bytes)\n", len(spans[i]))
- }
- }
-
- // Print tags for this span
- if len(tags) > 0 {
- fmt.Printf(" Tags:\n")
- tagNames := make([]string, 0, len(tags))
- for name := range tags {
-						tagNames = append(tagNames, name)
- }
- sort.Strings(tagNames)
- for _, name := range tagNames {
- if i < len(tags[name]) {
-						tagValue := tags[name][i]
-						if tagValue == nil {
-							fmt.Printf("    %s: <nil>\n", name)
-						} else {
-							valueType := p.tagType[name]
-							fmt.Printf("    %s (%s): %s\n", name, valueTypeName(valueType), formatTagValueForDisplay(tagValue, valueType))
- }
- }
- }
- }
- fmt.Printf("\n")
- }
- }
+ if err := ctx.processParts(); err != nil {
+ return err
}
- fmt.Printf("Total rows: %d\n", rowNum)
+ ctx.printSummary()
return nil
}
-func dumpPartAsCSV(p *part) error {
- decoder := &encoding.BytesBlockDecoder{}
-
- // Collect all tag names in sorted order
- allTagNames := make([]string, 0, len(p.tagType))
- for name := range p.tagType {
- allTagNames = append(allTagNames, name)
- }
- sort.Strings(allTagNames)
-
- // Create CSV writer
- writer := csv.NewWriter(os.Stdout)
- defer writer.Flush()
-
- // Write header
- header := []string{"TraceID", "SpanID", "SpanDataSize"}
- header = append(header, allTagNames...)
- if err := writer.Write(header); err != nil {
- return fmt.Errorf("failed to write CSV header: %w", err)
- }
-
- // Process all blocks and write rows
- for _, pbm := range p.primaryBlockMetadata {
- // Read primary data block
- primaryData := make([]byte, pbm.size)
- fs.MustReadData(p.primary, int64(pbm.offset), primaryData)
-
- // Decompress
- decompressed, err := zstd.Decompress(nil, primaryData)
- if err != nil {
-			fmt.Fprintf(os.Stderr, "Error decompressing primary data: %v\n", err)
- continue
- }
-
- // Parse ALL block metadata entries from this primary block
-		blockMetadatas, err := parseAllBlockMetadata(decompressed, p.tagType)
- if err != nil {
-			fmt.Fprintf(os.Stderr, "Error parsing block metadata: %v\n", err)
- continue
- }
-
- // Process each block
- for _, bm := range blockMetadatas {
- // Read spans
-			spans, spanIDs, err := readSpans(decoder, bm.spans, int(bm.count), p.spans)
- if err != nil {
-				fmt.Fprintf(os.Stderr, "Error reading spans for trace %s: %v\n", bm.traceID, err)
- continue
- }
-
- // Read tags
- tags := make(map[string][][]byte)
- for tagName, tagBlock := range bm.tags {
-				tagValues, err := readTagValues(decoder, tagBlock, tagName, int(bm.count), p.tagMetadata[tagName], p.tags[tagName], p.tagType[tagName])
- if err != nil {
-					fmt.Fprintf(os.Stderr, "Error reading tag %s for trace %s: %v\n", tagName, bm.traceID, err)
- continue
- }
- tags[tagName] = tagValues
- }
-
- // Write each span as a CSV row
- for i := 0; i < len(spans); i++ {
-				row := append(make([]string, 0, len(header)), bm.traceID, spanIDs[i], strconv.Itoa(len(spans[i])))
-
- // Add tag values in the same order as header
- for _, tagName := range allTagNames {
- var value string
-					if i < len(tags[tagName]) && tags[tagName][i] != nil {
-						valueType := p.tagType[tagName]
-						value = formatTagValueForCSV(tags[tagName][i], valueType)
- }
- row = append(row, value)
- }
-
- if err := writer.Write(row); err != nil {
-					return fmt.Errorf("failed to write CSV row: %w", err)
- }
- }
- }
- }
-
- return nil
-}
-
-func valueTypeName(vt pbv1.ValueType) string {
- switch vt {
- case pbv1.ValueTypeStr:
- return "STRING"
- case pbv1.ValueTypeInt64:
- return "INT64"
- case pbv1.ValueTypeFloat64:
- return "FLOAT64"
- case pbv1.ValueTypeStrArr:
- return "STRING_ARRAY"
- case pbv1.ValueTypeInt64Arr:
- return "INT64_ARRAY"
- case pbv1.ValueTypeBinaryData:
- return "BINARY_DATA"
- case pbv1.ValueTypeTimestamp:
- return "TIMESTAMP"
- case pbv1.ValueTypeUnknown:
- return "UNKNOWN"
- default:
- return fmt.Sprintf("UNKNOWN(%d)", vt)
- }
-}
-
func formatTimestamp(nanos int64) string {
if nanos == 0 {
return "N/A"
@@ -400,61 +182,6 @@ func formatTagValueForDisplay(data []byte, vt pbv1.ValueType) string {
}
}
-func formatTagValueForCSV(data []byte, vt pbv1.ValueType) string {
- if data == nil {
- return ""
- }
- switch vt {
- case pbv1.ValueTypeStr:
- return string(data)
- case pbv1.ValueTypeInt64:
- if len(data) >= 8 {
- return strconv.FormatInt(convert.BytesToInt64(data), 10)
- }
- return ""
- case pbv1.ValueTypeFloat64:
- if len(data) >= 8 {
-			return strconv.FormatFloat(convert.BytesToFloat64(data), 'f', -1, 64)
- }
- return ""
- case pbv1.ValueTypeTimestamp:
- if len(data) >= 8 {
- nanos := convert.BytesToInt64(data)
- return formatTimestamp(nanos)
- }
- return ""
- case pbv1.ValueTypeStrArr:
-		// Decode string array - each element is separated by EntityDelimiter
- var values []string
- var err error
- remaining := data
- for len(remaining) > 0 {
- var decoded []byte
-			decoded, remaining, err = unmarshalVarArray(nil, remaining)
- if err != nil {
- break
- }
- if len(decoded) > 0 {
- values = append(values, string(decoded))
- }
- }
- if len(values) > 0 {
- return strings.Join(values, ";")
- }
- return ""
- case pbv1.ValueTypeBinaryData:
- if isPrintable(data) {
- return string(data)
- }
- return fmt.Sprintf("(binary: %d bytes)", len(data))
- default:
- if isPrintable(data) {
- return string(data)
- }
- return fmt.Sprintf("(binary: %d bytes)", len(data))
- }
-}
-
func unmarshalVarArray(dest, src []byte) ([]byte, []byte, error) {
if len(src) == 0 {
return nil, nil, fmt.Errorf("empty entity value")
@@ -561,6 +288,243 @@ type part struct {
partMetadata partMetadata
}
+type traceRowData struct {
+ tags map[string][]byte
+ traceID string
+ spanID string
+ spanData []byte
+ partID uint64
+ seriesID common.SeriesID
+}
+
+type traceDumpContext struct {
+ tagFilter logical.TagFilter
+ fileSystem fs.FileSystem
+ seriesMap map[common.SeriesID]string
+ writer *csv.Writer
+ opts traceDumpOptions
+ partIDs []uint64
+ projectionTags []string
+ tagColumns []string
+ rowNum int
+}
+
+func newTraceDumpContext(opts traceDumpOptions) (*traceDumpContext, error) {
+ ctx := &traceDumpContext{
+ opts: opts,
+ fileSystem: fs.NewLocalFileSystem(),
+ }
+
+ partIDs, err := discoverTracePartIDs(opts.shardPath)
+ if err != nil {
+ return nil, fmt.Errorf("failed to discover part IDs: %w", err)
+ }
+ if len(partIDs) == 0 {
+ fmt.Println("No parts found in shard directory")
+ return nil, nil
+ }
+ ctx.partIDs = partIDs
+ fmt.Fprintf(os.Stderr, "Found %d parts in shard\n", len(partIDs))
+
+ ctx.seriesMap, err = loadTraceSeriesMap(opts.segmentPath)
+ if err != nil {
+		fmt.Fprintf(os.Stderr, "Warning: Failed to load series information: %v\n", err)
+ ctx.seriesMap = nil
+ } else {
+		fmt.Fprintf(os.Stderr, "Loaded %d series from segment\n", len(ctx.seriesMap))
+ }
+
+ if opts.criteriaJSON != "" {
+ var criteria *modelv1.Criteria
+ criteria, err = parseTraceCriteriaJSON(opts.criteriaJSON)
+ if err != nil {
+			return nil, fmt.Errorf("failed to parse criteria: %w", err)
+ }
+ ctx.tagFilter, err = logical.BuildSimpleTagFilter(criteria)
+ if err != nil {
+			return nil, fmt.Errorf("failed to build tag filter: %w", err)
+ }
+ fmt.Fprintf(os.Stderr, "Applied criteria filter\n")
+ }
+
+ if opts.projectionTags != "" {
+		ctx.projectionTags = parseTraceProjectionTags(opts.projectionTags)
+		fmt.Fprintf(os.Stderr, "Projection tags: %v\n", ctx.projectionTags)
+ }
+
+ if opts.csvOutput {
+ if len(ctx.projectionTags) > 0 {
+ ctx.tagColumns = ctx.projectionTags
+ } else {
+			ctx.tagColumns, err = discoverTagColumns(ctx.partIDs, opts.shardPath, ctx.fileSystem)
+			if err != nil {
+				return nil, fmt.Errorf("failed to discover tag columns: %w", err)
+ }
+ }
+ }
+
+ if err := ctx.initOutput(); err != nil {
+ return nil, err
+ }
+
+ return ctx, nil
+}
+
+func (ctx *traceDumpContext) initOutput() error {
+ if !ctx.opts.csvOutput {
+		fmt.Printf("================================================================================\n")
+ fmt.Fprintf(os.Stderr, "Processing parts...\n")
+ return nil
+ }
+
+ ctx.writer = csv.NewWriter(os.Stdout)
+	header := []string{"PartID", "TraceID", "SpanID", "SeriesID", "Series", "SpanDataSize"}
+ header = append(header, ctx.tagColumns...)
+ if err := ctx.writer.Write(header); err != nil {
+ return fmt.Errorf("failed to write CSV header: %w", err)
+ }
+ return nil
+}
+
+func (ctx *traceDumpContext) close() {
+ if ctx.writer != nil {
+ ctx.writer.Flush()
+ }
+}
+
+func (ctx *traceDumpContext) processParts() error {
+ for partIdx, partID := range ctx.partIDs {
+		fmt.Fprintf(os.Stderr, "Processing part %d/%d (0x%016x)...\n", partIdx+1, len(ctx.partIDs), partID)
+
+		p, err := openFilePart(partID, ctx.opts.shardPath, ctx.fileSystem)
+		if err != nil {
+			fmt.Fprintf(os.Stderr, "Warning: failed to open part %016x: %v\n", partID, err)
+ continue
+ }
+
+ partRowCount, partErr := ctx.processPart(partID, p)
+ closePart(p)
+ if partErr != nil {
+ return partErr
+ }
+
+		fmt.Fprintf(os.Stderr, "  Part %d/%d: processed %d rows (total: %d)\n", partIdx+1, len(ctx.partIDs), partRowCount, ctx.rowNum)
+ }
+ return nil
+}
+
+func (ctx *traceDumpContext) processPart(partID uint64, p *part) (int, error) {
+ decoder := &encoding.BytesBlockDecoder{}
+ partRowCount := 0
+
+ for _, pbm := range p.primaryBlockMetadata {
+ primaryData := make([]byte, pbm.size)
+ fs.MustReadData(p.primary, int64(pbm.offset), primaryData)
+
+ decompressed, err := zstd.Decompress(nil, primaryData)
+ if err != nil {
+			fmt.Fprintf(os.Stderr, "Warning: Error decompressing primary data in part %016x: %v\n", partID, err)
+ continue
+ }
+
+		blockMetadatas, err := parseAllBlockMetadata(decompressed, p.tagType)
+ if err != nil {
+			fmt.Fprintf(os.Stderr, "Warning: Error parsing block metadata in part %016x: %v\n", partID, err)
+ continue
+ }
+
+ for _, bm := range blockMetadatas {
+ rows, err := ctx.processBlock(partID, bm, p, decoder)
+ if err != nil {
+ return partRowCount, err
+ }
+ partRowCount += rows
+ }
+ }
+
+ return partRowCount, nil
+}
+
+func (ctx *traceDumpContext) processBlock(partID uint64, bm *blockMetadata, p *part, decoder *encoding.BytesBlockDecoder) (int, error) {
+	spans, spanIDs, err := readSpans(decoder, bm.spans, int(bm.count), p.spans)
+ if err != nil {
+		fmt.Fprintf(os.Stderr, "Warning: Error reading spans for trace %s in part %016x: %v\n", bm.traceID, partID, err)
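+		// Treat an unreadable block as empty instead of aborting the whole dump.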
+ return 0, nil
+ }
+
+ tagsBySpan := ctx.readBlockTags(partID, bm, p, decoder)
+ rows := 0
+ for i := 0; i < len(spans); i++ {
+ spanTags := make(map[string][]byte)
+ for tagName, tagValues := range tagsBySpan {
+ if i < len(tagValues) {
+ spanTags[tagName] = tagValues[i]
+ }
+ }
+
+ if ctx.shouldSkip(spanTags, p.tagType) {
+ continue
+ }
+
+ row := traceRowData{
+ partID: partID,
+ traceID: bm.traceID,
+ spanID: spanIDs[i],
+ spanData: spans[i],
+ tags: spanTags,
+ seriesID: calculateSeriesIDFromTags(spanTags),
+ }
+
+ if err := ctx.writeRow(row); err != nil {
+ return rows, err
+ }
+
+ rows++
+ }
+
+ return rows, nil
+}
+
+func (ctx *traceDumpContext) readBlockTags(partID uint64, bm *blockMetadata, p *part, decoder *encoding.BytesBlockDecoder) map[string][][]byte {
+ tags := make(map[string][][]byte)
+ for tagName, tagBlock := range bm.tags {
+		tagValues, err := readTagValues(decoder, tagBlock, tagName, int(bm.count), p.tagMetadata[tagName], p.tags[tagName], p.tagType[tagName])
+ if err != nil {
+			fmt.Fprintf(os.Stderr, "Warning: Error reading tag %s for trace %s in part %016x: %v\n", tagName, bm.traceID, partID, err)
+ continue
+ }
+ tags[tagName] = tagValues
+ }
+ return tags
+}
+
+func (ctx *traceDumpContext) shouldSkip(tags map[string][]byte, tagTypes map[string]pbv1.ValueType) bool {
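+	// A nil filter or the dummy filter means no criteria were supplied, so nothing is skipped.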
+ if ctx.tagFilter == nil || ctx.tagFilter == logical.DummyFilter {
+ return false
+ }
+ return !matchesCriteria(tags, tagTypes, ctx.tagFilter)
+}
+
+func (ctx *traceDumpContext) writeRow(row traceRowData) error {
+ if ctx.opts.csvOutput {
+		if err := writeTraceRowAsCSV(ctx.writer, row, ctx.tagColumns, ctx.seriesMap); err != nil {
+ return err
+ }
+ } else {
+		writeTraceRowAsText(row, ctx.rowNum+1, ctx.opts.verbose, ctx.projectionTags, ctx.seriesMap)
+ }
+ ctx.rowNum++
+ return nil
+}
+
+func (ctx *traceDumpContext) printSummary() {
+ if ctx.opts.csvOutput {
+ fmt.Fprintf(os.Stderr, "Total rows written: %d\n", ctx.rowNum)
+ return
+ }
+ fmt.Printf("\nTotal rows: %d\n", ctx.rowNum)
+}
+
func openFilePart(id uint64, root string, fileSystem fs.FileSystem) (*part, error) {
var p part
partPath := filepath.Join(root, fmt.Sprintf("%016x", id))
@@ -877,3 +841,459 @@ func unmarshalTagMetadata(tm *tagMetadata, src []byte) error {
return nil
}
+
+// Helper functions for new shard-level dump.
+
+func discoverTracePartIDs(shardPath string) ([]uint64, error) {
+ entries, err := os.ReadDir(shardPath)
+ if err != nil {
+		return nil, fmt.Errorf("failed to read shard directory: %w", err)
+ }
+
+ var partIDs []uint64
+ for _, entry := range entries {
+ if !entry.IsDir() {
+ continue
+ }
+ // Skip special directories
+ name := entry.Name()
+ if name == "sidx" || name == "meta" {
+ continue
+ }
+ // Try to parse as hex part ID
+ partID, err := strconv.ParseUint(name, 16, 64)
+ if err == nil {
+ partIDs = append(partIDs, partID)
+ }
+ }
+
+ sort.Slice(partIDs, func(i, j int) bool {
+ return partIDs[i] < partIDs[j]
+ })
+
+ return partIDs, nil
+}
+
+func loadTraceSeriesMap(segmentPath string) (map[common.SeriesID]string, error) {
+ seriesIndexPath := filepath.Join(segmentPath, "sidx")
+
+ l := logger.GetLogger("dump-trace")
+
+ // Create inverted index store
+ store, err := inverted.NewStore(inverted.StoreOpts{
+ Path: seriesIndexPath,
+ Logger: l,
+ })
+ if err != nil {
+ return nil, fmt.Errorf("failed to open series index: %w", err)
+ }
+ defer store.Close()
+
+ // Get series iterator
+ ctx := context.Background()
+ iter, err := store.SeriesIterator(ctx)
+ if err != nil {
+ return nil, fmt.Errorf("failed to create series iterator: %w",
err)
+ }
+ defer iter.Close()
+
+ // Build map of SeriesID -> text representation
+ seriesMap := make(map[common.SeriesID]string)
+ for iter.Next() {
+ series := iter.Val()
+ if len(series.EntityValues) > 0 {
+ seriesID :=
common.SeriesID(convert.Hash(series.EntityValues))
+ // Convert EntityValues bytes to readable string
+ seriesText := string(series.EntityValues)
+ seriesMap[seriesID] = seriesText
+ }
+ }
+
+ return seriesMap, nil
+}
+
+func parseTraceCriteriaJSON(criteriaJSON string) (*modelv1.Criteria, error) {
+ criteria := &modelv1.Criteria{}
+ err := protojson.Unmarshal([]byte(criteriaJSON), criteria)
+ if err != nil {
+ return nil, fmt.Errorf("invalid criteria JSON: %w", err)
+ }
+ return criteria, nil
+}
+
+func parseTraceProjectionTags(projectionStr string) []string {
+ if projectionStr == "" {
+ return nil
+ }
+
+ tags := strings.Split(projectionStr, ",")
+ result := make([]string, 0, len(tags))
+ for _, tag := range tags {
+ tag = strings.TrimSpace(tag)
+ if tag != "" {
+ result = append(result, tag)
+ }
+ }
+ return result
+}
+
+func calculateSeriesIDFromTags(tags map[string][]byte) common.SeriesID {
+ var entityValues []byte
+
+ // Sort tag names for consistent hashing
+ tagNames := make([]string, 0, len(tags))
+ for name := range tags {
+ tagNames = append(tagNames, name)
+ }
+ sort.Strings(tagNames)
+
+ // Build entity values
+ for _, name := range tagNames {
+ value := tags[name]
+ if value != nil {
+ // Append tag name=value
+ entityValues = append(entityValues, []byte(name)...)
+ entityValues = append(entityValues, '=')
+ entityValues = append(entityValues, value...)
+ entityValues = append(entityValues,
internalencoding.EntityDelimiter)
+ }
+ }
+
+ if len(entityValues) == 0 {
+ return 0
+ }
+
+ return common.SeriesID(convert.Hash(entityValues))
+}
+
+func matchesCriteria(tags map[string][]byte, tagTypes
map[string]pbv1.ValueType, filter logical.TagFilter) bool {
+ // Convert tags to modelv1.Tag format
+ modelTags := make([]*modelv1.Tag, 0, len(tags))
+ for name, value := range tags {
+ if value == nil {
+ continue
+ }
+
+ valueType := tagTypes[name]
+ tagValue := convertTagValue(value, valueType)
+ if tagValue != nil {
+ modelTags = append(modelTags, &modelv1.Tag{
+ Key: name,
+ Value: tagValue,
+ })
+ }
+ }
+
+ // Use TagFilterMatcher to check if tags match the filter
+ // Create a simple registry for the available tags
+ registry := &traceTagRegistry{
+ tagTypes: tagTypes,
+ }
+
+ matcher := logical.NewTagFilterMatcher(filter, registry,
traceTagValueDecoder)
+ match, _ := matcher.Match(modelTags)
+ return match
+}
+
+func convertTagValue(value []byte, valueType pbv1.ValueType) *modelv1.TagValue
{
+ if value == nil {
+ return pbv1.NullTagValue
+ }
+
+ switch valueType {
+ case pbv1.ValueTypeStr:
+ return &modelv1.TagValue{
+ Value: &modelv1.TagValue_Str{
+ Str: &modelv1.Str{
+ Value: string(value),
+ },
+ },
+ }
+ case pbv1.ValueTypeInt64:
+ if len(value) >= 8 {
+ return &modelv1.TagValue{
+ Value: &modelv1.TagValue_Int{
+ Int: &modelv1.Int{
+ Value:
convert.BytesToInt64(value),
+ },
+ },
+ }
+ }
+ case pbv1.ValueTypeStrArr:
+ // Decode string array
+ var values []string
+ var err error
+ remaining := value
+ for len(remaining) > 0 {
+ var decoded []byte
+ decoded, remaining, err = unmarshalVarArray(nil,
remaining)
+ if err != nil {
+ break
+ }
+ if len(decoded) > 0 {
+ values = append(values, string(decoded))
+ }
+ }
+ return &modelv1.TagValue{
+ Value: &modelv1.TagValue_StrArray{
+ StrArray: &modelv1.StrArray{
+ Value: values,
+ },
+ },
+ }
+ case pbv1.ValueTypeBinaryData:
+ return &modelv1.TagValue{
+ Value: &modelv1.TagValue_BinaryData{
+ BinaryData: value,
+ },
+ }
+	case pbv1.ValueTypeUnknown, pbv1.ValueTypeFloat64,
+		pbv1.ValueTypeInt64Arr, pbv1.ValueTypeTimestamp:
+		// No dedicated decoding for these types; fall back to the
+		// string conversion below the switch.
+	}
+
+ // Default: try to return as string
+ return &modelv1.TagValue{
+ Value: &modelv1.TagValue_Str{
+ Str: &modelv1.Str{
+ Value: string(value),
+ },
+ },
+ }
+}
+
+// traceTagRegistry implements logical.Schema for tag filtering.
+type traceTagRegistry struct {
+ tagTypes map[string]pbv1.ValueType
+}
+
+func (r *traceTagRegistry) FindTagSpecByName(name string) *logical.TagSpec {
+ valueType := r.tagTypes[name]
+ var tagType databasev1.TagType
+ switch valueType {
+ case pbv1.ValueTypeStr:
+ tagType = databasev1.TagType_TAG_TYPE_STRING
+ case pbv1.ValueTypeInt64:
+ tagType = databasev1.TagType_TAG_TYPE_INT
+ case pbv1.ValueTypeStrArr:
+ tagType = databasev1.TagType_TAG_TYPE_STRING_ARRAY
+ default:
+ tagType = databasev1.TagType_TAG_TYPE_STRING
+ }
+
+ return &logical.TagSpec{
+ Spec: &databasev1.TagSpec{
+ Name: name,
+ Type: tagType,
+ },
+ TagFamilyIdx: 0,
+ TagIdx: 0,
+ }
+}
+
+func (r *traceTagRegistry) IndexDefined(_ string) (bool,
*databasev1.IndexRule) {
+ return false, nil
+}
+
+func (r *traceTagRegistry) IndexRuleDefined(_ string) (bool,
*databasev1.IndexRule) {
+ return false, nil
+}
+
+func (r *traceTagRegistry) EntityList() []string {
+ return nil
+}
+
+func (r *traceTagRegistry) CreateTagRef(_ ...[]*logical.Tag)
([][]*logical.TagRef, error) {
+ return nil, fmt.Errorf("CreateTagRef not supported in dump tool")
+}
+
+func (r *traceTagRegistry) CreateFieldRef(_ ...*logical.Field)
([]*logical.FieldRef, error) {
+ return nil, fmt.Errorf("CreateFieldRef not supported in dump tool")
+}
+
+func (r *traceTagRegistry) ProjTags(_ ...[]*logical.TagRef) logical.Schema {
+ return r
+}
+
+func (r *traceTagRegistry) ProjFields(_ ...*logical.FieldRef) logical.Schema {
+ return r
+}
+
+func (r *traceTagRegistry) Children() []logical.Schema {
+ return nil
+}
+
+func traceTagValueDecoder(valueType pbv1.ValueType, value []byte, valueArr
[][]byte) *modelv1.TagValue {
+ if value == nil && valueArr == nil {
+ return pbv1.NullTagValue
+ }
+
+ switch valueType {
+ case pbv1.ValueTypeStr:
+ if value == nil {
+ return pbv1.NullTagValue
+ }
+ return &modelv1.TagValue{
+ Value: &modelv1.TagValue_Str{
+ Str: &modelv1.Str{
+ Value: string(value),
+ },
+ },
+ }
+ case pbv1.ValueTypeInt64:
+ if value == nil {
+ return pbv1.NullTagValue
+ }
+ return &modelv1.TagValue{
+ Value: &modelv1.TagValue_Int{
+ Int: &modelv1.Int{
+ Value: convert.BytesToInt64(value),
+ },
+ },
+ }
+ case pbv1.ValueTypeStrArr:
+ var values []string
+ for _, v := range valueArr {
+ values = append(values, string(v))
+ }
+ return &modelv1.TagValue{
+ Value: &modelv1.TagValue_StrArray{
+ StrArray: &modelv1.StrArray{
+ Value: values,
+ },
+ },
+ }
+ case pbv1.ValueTypeBinaryData:
+ if value == nil {
+ return pbv1.NullTagValue
+ }
+ return &modelv1.TagValue{
+ Value: &modelv1.TagValue_BinaryData{
+ BinaryData: value,
+ },
+ }
+ default:
+ if value != nil {
+ return &modelv1.TagValue{
+ Value: &modelv1.TagValue_Str{
+ Str: &modelv1.Str{
+ Value: string(value),
+ },
+ },
+ }
+ }
+ return pbv1.NullTagValue
+ }
+}
+
+func discoverTagColumns(partIDs []uint64, shardPath string, fileSystem
fs.FileSystem) ([]string, error) {
+ if len(partIDs) == 0 {
+ return nil, nil
+ }
+
+ // Open first part to discover tag columns
+ p, err := openFilePart(partIDs[0], shardPath, fileSystem)
+ if err != nil {
+ return nil, fmt.Errorf("failed to open first part: %w", err)
+ }
+ defer closePart(p)
+
+ // Collect all tag names
+ tagNames := make([]string, 0, len(p.tagType))
+ for name := range p.tagType {
+ tagNames = append(tagNames, name)
+ }
+ sort.Strings(tagNames)
+
+ return tagNames, nil
+}
+
+func writeTraceRowAsText(row traceRowData, rowNum int, verbose bool,
projectionTags []string, seriesMap map[common.SeriesID]string) {
+ fmt.Printf("Row %d:\n", rowNum)
+ fmt.Printf(" PartID: %d (0x%016x)\n", row.partID, row.partID)
+ fmt.Printf(" TraceID: %s\n", row.traceID)
+ fmt.Printf(" SpanID: %s\n", row.spanID)
+ fmt.Printf(" SeriesID: %d\n", row.seriesID)
+
+ // Add series text if available
+ if seriesMap != nil {
+ if seriesText, ok := seriesMap[row.seriesID]; ok {
+ fmt.Printf(" Series: %s\n", seriesText)
+ }
+ }
+
+ fmt.Printf(" Span Data: %d bytes\n", len(row.spanData))
+ if verbose {
+ fmt.Printf(" Span Content:\n")
+ printHexDump(row.spanData, 4)
+ } else {
+ if isPrintable(row.spanData) {
+ fmt.Printf(" Span: %s\n", string(row.spanData))
+ } else {
+ fmt.Printf(" Span: (binary data, %d bytes)\n",
len(row.spanData))
+ }
+ }
+
+ // Print projected tags or all tags
+ if len(row.tags) > 0 {
+ fmt.Printf(" Tags:\n")
+
+ var tagsToShow []string
+ if len(projectionTags) > 0 {
+ tagsToShow = projectionTags
+ } else {
+ // Show all tags
+ for name := range row.tags {
+ tagsToShow = append(tagsToShow, name)
+ }
+ sort.Strings(tagsToShow)
+ }
+
+ for _, name := range tagsToShow {
+ value, exists := row.tags[name]
+ if !exists {
+ continue
+ }
+ if value == nil {
+ fmt.Printf(" %s: <nil>\n", name)
+ } else {
+ fmt.Printf(" %s: %s\n", name,
formatTagValueForDisplay(value, pbv1.ValueTypeStr))
+ }
+ }
+ }
+ fmt.Printf("\n")
+}
+
+func writeTraceRowAsCSV(writer *csv.Writer, row traceRowData, tagColumns
[]string, seriesMap map[common.SeriesID]string) error {
+ seriesText := ""
+ if seriesMap != nil {
+ if text, ok := seriesMap[row.seriesID]; ok {
+ seriesText = text
+ }
+ }
+
+ csvRow := []string{
+ fmt.Sprintf("%d", row.partID),
+ row.traceID,
+ row.spanID,
+ fmt.Sprintf("%d", row.seriesID),
+ seriesText,
+ strconv.Itoa(len(row.spanData)),
+ }
+
+ // Add tag values
+ for _, tagName := range tagColumns {
+ value := ""
+ if tagValue, exists := row.tags[tagName]; exists && tagValue !=
nil {
+ value = string(tagValue)
+ }
+ csvRow = append(csvRow, value)
+ }
+
+ return writer.Write(csvRow)
+}
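
calculateSeriesIDFromTags above derives a stable series ID by hashing the
sorted name=value pairs, so the same tag set always maps to the same ID
regardless of map iteration order. A minimal standalone sketch of that
derivation, using a 64-bit FNV hash as a stand-in for convert.Hash and '|'
as a stand-in for internalencoding.EntityDelimiter:

package main

import (
	"fmt"
	"hash/fnv"
	"sort"
)

func seriesIDFromTags(tags map[string][]byte) uint64 {
	names := make([]string, 0, len(tags))
	for n := range tags {
		names = append(names, n)
	}
	sort.Strings(names) // deterministic order is what makes the hash stable

	h := fnv.New64a()
	for _, n := range names {
		if v := tags[n]; v != nil {
			h.Write([]byte(n))
			h.Write([]byte{'='})
			h.Write(v)
			h.Write([]byte{'|'}) // entity delimiter placeholder
		}
	}
	return h.Sum64()
}

func main() {
	a := seriesIDFromTags(map[string][]byte{"svc": []byte("a"), "inst": []byte("b")})
	b := seriesIDFromTags(map[string][]byte{"inst": []byte("b"), "svc": []byte("a")})
	fmt.Println(a == b) // true: insertion order does not matter
}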
diff --git a/banyand/dquery/topn.go b/banyand/dquery/topn.go
index 80f59d34..d96b78c2 100644
--- a/banyand/dquery/topn.go
+++ b/banyand/dquery/topn.go
@@ -87,10 +87,11 @@ func (t *topNQueryProcessor) Rev(ctx context.Context,
message bus.Message) (resp
resp = bus.NewMessage(now, common.NewError("no stage found"))
return
}
+ var span *pkgquery.Span
if request.Trace {
var tracer *pkgquery.Tracer
tracer, ctx = pkgquery.NewTracer(ctx,
n.Format(time.RFC3339Nano))
- span, _ := tracer.StartSpan(ctx, "distributed-client")
+ span, _ = tracer.StartSpan(ctx, "distributed-client")
span.Tag("request",
convert.BytesToString(logger.Proto(request)))
span.Tagf("nodeSelectors", "%v", nodeSelectors)
defer func() {
@@ -118,6 +119,7 @@ func (t *topNQueryProcessor) Rev(ctx context.Context,
message bus.Message) (resp
aggregator := query.CreateTopNPostAggregator(request.GetTopN(),
agg, request.GetFieldValueSort())
var tags []string
+ var responseCount int
for _, f := range ff {
if m, getErr := f.Get(); getErr != nil {
allErr = multierr.Append(allErr, getErr)
@@ -126,6 +128,7 @@ func (t *topNQueryProcessor) Rev(ctx context.Context,
message bus.Message) (resp
if d == nil {
continue
}
+ responseCount++
topNResp := d.(*measurev1.TopNResponse)
for _, l := range topNResp.Lists {
for _, tn := range l.Items {
@@ -144,6 +147,9 @@ func (t *topNQueryProcessor) Rev(ctx context.Context,
message bus.Message) (resp
}
}
}
+ if span != nil {
+ span.Tagf("response_count", "%d", responseCount)
+ }
if allErr != nil {
resp = bus.NewMessage(now, common.NewError("execute the query
%s: %v", request.GetName(), allErr))
return
@@ -153,6 +159,9 @@ func (t *topNQueryProcessor) Rev(ctx context.Context,
message bus.Message) (resp
return
}
lists := aggregator.Val(tags)
+ if span != nil {
+ span.Tagf("list_count", "%d", len(lists))
+ }
resp = bus.NewMessage(now, &measurev1.TopNResponse{
Lists: lists,
})
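
The topn.go change hoists span out of the `if request.Trace` block so the
fan-in loop can tag it after the fact, with every later use nil-checked
because tracing may be disabled. A self-contained sketch of the pattern;
Span and startSpan here are stand-ins, not the pkg/query API:

package main

import (
	"context"
	"fmt"
)

// Span is a stand-in exposing only the methods the pattern needs.
type Span struct{}

func (s *Span) Tagf(key, format string, args ...interface{}) {
	fmt.Printf("%s=%s\n", key, fmt.Sprintf(format, args...))
}
func (s *Span) Stop() {}

func startSpan(context.Context, string) (*Span, error) { return &Span{}, nil }

func run(ctx context.Context, traceEnabled bool) {
	var span *Span // declared before the conditional so later code can see it
	if traceEnabled {
		span, _ = startSpan(ctx, "distributed-client")
		defer span.Stop()
	}
	responseCount := 3 // stand-in for counting fan-in responses
	if span != nil {   // nil-check: tracing may be off
		span.Tagf("response_count", "%d", responseCount)
	}
}

func main() { run(context.Background(), true) }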
diff --git a/banyand/internal/sidx/block.go b/banyand/internal/sidx/block.go
index e136524e..21c8f7c9 100644
--- a/banyand/internal/sidx/block.go
+++ b/banyand/internal/sidx/block.go
@@ -120,7 +120,11 @@ func (b *block) processTag(tagName string, elementTags
[][]*tag) {
if tag.name == tagName {
// Store structured tagRow instead of marshaled
bytes
if tag.valueArr != nil {
- td.values[i].valueArr = tag.valueArr
+ td.values[i].valueArr = make([][]byte,
len(tag.valueArr))
+ for j, v := range tag.valueArr {
+ td.values[i].valueArr[j] =
make([]byte, len(v))
+ copy(td.values[i].valueArr[j],
v)
+ }
} else {
td.values[i].value = tag.value
}
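
The block.go hunk replaces a direct tag.valueArr assignment with a
per-element deep copy: the source slices are pooled and reused, so retaining
aliases let later writes corrupt already-stored tag values (the empty or
garbled tag symptom this commit addresses). A minimal reproduction of the
aliasing hazard:

package main

import "fmt"

func main() {
	src := [][]byte{[]byte("value1"), []byte("value2")}

	alias := src // old behavior: shares the backing arrays
	deep := make([][]byte, len(src))
	for i, v := range src { // new behavior: per-element copy
		deep[i] = append([]byte(nil), v...)
	}

	src[0][0] = 'X' // simulate the pooled buffer being reused

	fmt.Println(string(alias[0])) // "Xalue1" - corrupted by reuse
	fmt.Println(string(deep[0]))  // "value1" - unaffected
}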
diff --git a/banyand/internal/sidx/interfaces.go
b/banyand/internal/sidx/interfaces.go
index c819b1e3..670f76f1 100644
--- a/banyand/internal/sidx/interfaces.go
+++ b/banyand/internal/sidx/interfaces.go
@@ -48,6 +48,11 @@ type SIDX interface {
// The returned QueryResponse channel contains ordered batches limited
by req.MaxBatchSize
// unique Data elements (when positive). The error channel delivers any
fatal execution error.
StreamingQuery(ctx context.Context, req QueryRequest) (<-chan
*QueryResponse, <-chan error)
+ // ScanQuery executes a synchronous full scan query without requiring
series IDs.
+ // It scans all blocks in parts sequentially and applies filters to
each row.
+ // Returns a slice of QueryResponse objects containing all matching
results.
+ // This is a synchronous operation suitable for dump/debug tools.
+ ScanQuery(ctx context.Context, req ScanQueryRequest) ([]*QueryResponse,
error)
// Stats returns current system statistics and performance metrics.
Stats(ctx context.Context) (*Stats, error)
// Close gracefully shuts down the SIDX instance, ensuring all data is
persisted.
@@ -88,6 +93,26 @@ type QueryRequest struct {
MaxBatchSize int
}
+// ScanProgressFunc is a callback for reporting scan progress.
+// It receives the current part number (1-based), total parts, and rows found
so far.
+type ScanProgressFunc func(currentPart, totalParts int, rowsFound int)
+
+// ScanQueryRequest specifies parameters for a full-scan query operation.
+// Unlike QueryRequest, this does not require SeriesIDs and does not support
Filter
+// (all blocks are scanned, none are skipped).
+//
+//nolint:govet // struct layout optimized for readability; 64 bytes is
acceptable
+type ScanQueryRequest struct {
+	TagFilter     model.TagFilterMatcher
+	TagProjection []model.TagProjection
+	// OnProgress is an optional callback for progress reporting during
+	// the scan. It is called after each part is processed.
+	OnProgress   ScanProgressFunc
+	MinKey       *int64
+	MaxKey       *int64
+	MaxBatchSize int
+}
+
// QueryResponse contains a batch of query results and execution metadata.
// This follows BanyanDB result patterns with parallel arrays for efficiency.
// Uses individual tag-based strategy (like trace module) rather than
tag-family approach (like stream module).
@@ -97,6 +122,7 @@ type QueryResponse struct {
Data [][]byte
SIDs []common.SeriesID
PartIDs []uint64
+ Tags map[string][]string // Projected tags: tag name -> values for
each row
Metadata ResponseMetadata
}
@@ -311,6 +337,17 @@ func (qr QueryRequest) Validate() error {
return nil
}
+// Validate validates a ScanQueryRequest for correctness.
+func (sqr ScanQueryRequest) Validate() error {
+ if sqr.MaxBatchSize < 0 {
+ return fmt.Errorf("maxBatchSize cannot be negative")
+ }
+ if sqr.MinKey != nil && sqr.MaxKey != nil && *sqr.MinKey > *sqr.MaxKey {
+		return fmt.Errorf("minKey cannot be greater than maxKey")
+ }
+ return nil
+}
+
// Reset resets the QueryRequest to its zero state.
func (qr *QueryRequest) Reset() {
qr.SeriesIDs = nil
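
A short sketch of constructing and validating the new ScanQueryRequest;
placing it in package sidx_test beside the internal package is an assumption
made for illustration:

package sidx_test

import (
	"fmt"
	"testing"

	"github.com/apache/skywalking-banyandb/banyand/internal/sidx"
)

func TestScanQueryRequestValidate(t *testing.T) {
	minKey, maxKey := int64(0), int64(1_000_000)
	req := sidx.ScanQueryRequest{
		MinKey:       &minKey,
		MaxKey:       &maxKey,
		MaxBatchSize: 1000, // zero falls back to the default batch size
		OnProgress: func(currentPart, totalParts, rowsFound int) {
			fmt.Printf("part %d/%d, %d rows so far\n", currentPart, totalParts, rowsFound)
		},
	}
	// A negative MaxBatchSize or MinKey > MaxKey would fail here.
	if err := req.Validate(); err != nil {
		t.Fatal(err)
	}
}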
diff --git a/banyand/internal/sidx/merge_test.go
b/banyand/internal/sidx/merge_test.go
index 0d63b5f4..adb52062 100644
--- a/banyand/internal/sidx/merge_test.go
+++ b/banyand/internal/sidx/merge_test.go
@@ -21,6 +21,7 @@ import (
"errors"
"path/filepath"
"reflect"
+ "sort"
"testing"
"github.com/google/go-cmp/cmp"
@@ -281,6 +282,36 @@ var (
})
return es
}()
+
+ esStrArr1 = func() *elements {
+ es := generateElements()
+ es.mustAppend(1, 100, make([]byte, 100), []Tag{
+ {Name: "arrTag", ValueArr: [][]byte{[]byte("value1"),
[]byte("value2")}, ValueType: pbv1.ValueTypeStrArr},
+ })
+ es.mustAppend(2, 200, make([]byte, 100), []Tag{
+ {Name: "arrTag", ValueArr: [][]byte{[]byte("value3"),
[]byte("value4")}, ValueType: pbv1.ValueTypeStrArr},
+ })
+ return es
+ }()
+
+ esStrArr2 = func() *elements {
+ es := generateElements()
+ es.mustAppend(1, 150, make([]byte, 100), []Tag{
+ {Name: "arrTag", ValueArr: [][]byte{[]byte("value5"),
[]byte("value6")}, ValueType: pbv1.ValueTypeStrArr},
+ })
+ es.mustAppend(2, 250, make([]byte, 100), []Tag{
+ {Name: "arrTag", ValueArr: [][]byte{[]byte("value7"),
[]byte("value8")}, ValueType: pbv1.ValueTypeStrArr},
+ })
+ return es
+ }()
+
+ esStrArrWithEmpty = func() *elements {
+ es := generateElements()
+ es.mustAppend(1, 300, make([]byte, 100), []Tag{
+ {Name: "arrTag", ValueArr: [][]byte{[]byte("a"),
[]byte(""), []byte("b")}, ValueType: pbv1.ValueTypeStrArr},
+ })
+ return es
+ }()
)
func Test_mergeParts(t *testing.T) {
@@ -331,10 +362,27 @@ func Test_mergeParts(t *testing.T) {
{seriesID: 4, count: 1, uncompressedSize: 77},
},
},
+ {
+ name: "Test with string array value type",
+ esList: []*elements{esStrArr1, esStrArr2},
+ want: []blockMetadata{
+ {seriesID: 1, count: 2, uncompressedSize: 264},
+ {seriesID: 2, count: 2, uncompressedSize: 264},
+ },
+ },
+ {
+ name: "Test with string array containing empty
strings",
+ esList: []*elements{esStrArrWithEmpty},
+ want: []blockMetadata{
+ {seriesID: 1, count: 1, uncompressedSize: 128},
+ },
+ },
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
+ wantBlocks := elementsToBlocks(tt.esList)
+
verify := func(t *testing.T, pp []*partWrapper,
fileSystem fs.FileSystem, root string, partID uint64) {
closeCh := make(chan struct{})
defer close(closeCh)
@@ -356,8 +404,14 @@ func Test_mergeParts(t *testing.T) {
reader := &blockReader{}
reader.init([]*partMergeIter{pmi})
var got []blockMetadata
+ var gotBlocks []block
+ decoder := generateTagValuesDecoder()
+ defer releaseTagValuesDecoder(decoder)
+
for reader.nextBlockMetadata() {
got = append(got, reader.block.bm)
+ reader.loadBlockData(decoder)
+ gotBlocks = append(gotBlocks,
deepCopyBlock(&reader.block.block))
}
require.NoError(t, reader.error())
@@ -373,6 +427,13 @@ func Test_mergeParts(t *testing.T) {
); diff != "" {
t.Errorf("Unexpected blockMetadata
(-got +want):\n%s", diff)
}
+
+ if diff := cmp.Diff(gotBlocks, wantBlocks,
+ cmpopts.IgnoreFields(tagData{},
"uniqueValues", "tmpBytes"),
+ cmp.AllowUnexported(block{}, tagData{},
tagRow{}),
+ ); diff != "" {
+ t.Errorf("Unexpected blocks (-got
+want):\n%s", diff)
+ }
}
t.Run("memory parts", func(t *testing.T) {
@@ -417,3 +478,91 @@ func Test_mergeParts(t *testing.T) {
})
}
}
+
+func elementsToBlocks(esList []*elements) []block {
+ merged := generateElements()
+ defer releaseElements(merged)
+
+ for _, es := range esList {
+ for i := 0; i < len(es.seriesIDs); i++ {
+ var tags []Tag
+ for _, t := range es.tags[i] {
+ tags = append(tags, Tag{
+ Name: t.name,
+ Value: t.value,
+ ValueArr: t.valueArr,
+ ValueType: t.valueType,
+ })
+ }
+ merged.mustAppend(es.seriesIDs[i], es.userKeys[i],
es.data[i], tags)
+ }
+ }
+
+ sort.Sort(merged)
+
+ var blocks []block
+ if merged.Len() == 0 {
+ return blocks
+ }
+
+ start := 0
+ for i := 1; i <= merged.Len(); i++ {
+ if i == merged.Len() || merged.seriesIDs[i] !=
merged.seriesIDs[start] {
+ b := block{
+ tags: make(map[string]*tagData),
+ userKeys: make([]int64, i-start),
+ data: make([][]byte, i-start),
+ }
+ copy(b.userKeys, merged.userKeys[start:i])
+ for k := 0; k < i-start; k++ {
+ b.data[k] = merged.data[start+k]
+ }
+ (&b).mustInitFromTags(merged.tags[start:i])
+ blocks = append(blocks, b)
+ start = i
+ }
+ }
+ return blocks
+}
+
+func deepCopyBlock(b *block) block {
+ newB := block{
+ tags: make(map[string]*tagData),
+ }
+ newB.userKeys = append([]int64(nil), b.userKeys...)
+ newB.data = make([][]byte, len(b.data))
+ for i, d := range b.data {
+ newB.data[i] = cloneBytes(d)
+ }
+ for k, v := range b.tags {
+ newTd := &tagData{
+ name: v.name,
+ valueType: v.valueType,
+ values: make([]tagRow, len(v.values)),
+ }
+ for i, row := range v.values {
+ newRow := tagRow{}
+ if row.value != nil {
+ newRow.value = cloneBytes(row.value)
+ }
+ if row.valueArr != nil {
+ newRow.valueArr = make([][]byte,
len(row.valueArr))
+ for j, arrVal := range row.valueArr {
+ newRow.valueArr[j] = cloneBytes(arrVal)
+ }
+ }
+ newTd.values[i] = newRow
+ }
+ newB.tags[k] = newTd
+ }
+ return newB
+}
+
+func cloneBytes(src []byte) []byte {
+ if src == nil {
+ return nil
+ }
+ dst := make([]byte, len(src))
+ copy(dst, src)
+ return dst
+}
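
cloneBytes deliberately maps nil to nil rather than to an empty non-nil
slice, which keeps the cmp.Diff comparisons above from flagging spurious
nil-vs-empty differences. A quick check of both properties:

package main

import "fmt"

func cloneBytes(src []byte) []byte {
	if src == nil {
		return nil
	}
	dst := make([]byte, len(src))
	copy(dst, src)
	return dst
}

func main() {
	fmt.Println(cloneBytes(nil) == nil) // true: nil stays nil

	src := []byte("abc")
	dst := cloneBytes(src)
	src[0] = 'X'
	fmt.Println(string(dst)) // "abc": freshly allocated, not aliased
}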
diff --git a/banyand/internal/sidx/part_iter.go
b/banyand/internal/sidx/part_iter.go
index 39f3f47c..eb530e32 100644
--- a/banyand/internal/sidx/part_iter.go
+++ b/banyand/internal/sidx/part_iter.go
@@ -21,15 +21,11 @@ import (
"errors"
"fmt"
"io"
- "sort"
- "github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/pkg/bytes"
"github.com/apache/skywalking-banyandb/pkg/compress/zstd"
"github.com/apache/skywalking-banyandb/pkg/encoding"
"github.com/apache/skywalking-banyandb/pkg/fs"
- "github.com/apache/skywalking-banyandb/pkg/index"
- "github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/pool"
)
@@ -37,13 +33,10 @@ type partIter struct {
err error
p *part
curBlock *blockMetadata
- sids []common.SeriesID
- blockFilter index.Filter
primaryBlockMetadata []primaryBlockMetadata
bms []blockMetadata
compressedPrimaryBuf []byte
primaryBuf []byte
- sidIdx int
minKey int64
maxKey int64
}
@@ -51,9 +44,6 @@ type partIter struct {
func (pi *partIter) reset() {
pi.curBlock = nil
pi.p = nil
- pi.sids = nil
- pi.blockFilter = nil
- pi.sidIdx = 0
pi.primaryBlockMetadata = nil
pi.bms = nil
pi.compressedPrimaryBuf = pi.compressedPrimaryBuf[:0]
@@ -61,20 +51,14 @@ func (pi *partIter) reset() {
pi.err = nil
}
-func (pi *partIter) init(bma *blockMetadataArray, p *part, sids
[]common.SeriesID, minKey, maxKey int64, blockFilter index.Filter) {
+func (pi *partIter) init(bma *blockMetadataArray, p *part, minKey, maxKey
int64) {
pi.reset()
pi.curBlock = &blockMetadata{}
pi.p = p
-
pi.bms = bma.arr
- pi.sids = sids
- pi.blockFilter = blockFilter
pi.minKey = minKey
pi.maxKey = maxKey
-
pi.primaryBlockMetadata = p.primaryBlockMetadata
-
- pi.nextSeriesID()
}
func (pi *partIter) nextBlock() bool {
@@ -100,52 +84,13 @@ func (pi *partIter) error() error {
return pi.err
}
-func (pi *partIter) nextSeriesID() bool {
- if pi.sidIdx >= len(pi.sids) {
- pi.err = io.EOF
- return false
- }
- pi.curBlock.seriesID = pi.sids[pi.sidIdx]
- pi.sidIdx++
- return true
-}
-
-func (pi *partIter) searchTargetSeriesID(sid common.SeriesID) bool {
- if pi.curBlock.seriesID >= sid {
- return true
- }
- if !pi.nextSeriesID() {
- return false
- }
- if pi.curBlock.seriesID >= sid {
- return true
- }
- sids := pi.sids[pi.sidIdx:]
- pi.sidIdx += sort.Search(len(sids), func(i int) bool {
- return sid <= sids[i]
- })
- if pi.sidIdx >= len(pi.sids) {
- pi.sidIdx = len(pi.sids)
- pi.err = io.EOF
- return false
- }
- pi.curBlock.seriesID = pi.sids[pi.sidIdx]
- pi.sidIdx++
- return true
-}
-
func (pi *partIter) loadNextBlockMetadata() bool {
for len(pi.primaryBlockMetadata) > 0 {
- if
!pi.searchTargetSeriesID(pi.primaryBlockMetadata[0].seriesID) {
- return false
- }
- pi.primaryBlockMetadata = searchPBM(pi.primaryBlockMetadata,
pi.curBlock.seriesID)
-
pbm := &pi.primaryBlockMetadata[0]
pi.primaryBlockMetadata = pi.primaryBlockMetadata[1:]
- if pi.curBlock.seriesID < pbm.seriesID {
- logger.Panicf("invariant violation:
pi.curBlock.seriesID cannot be smaller than pbm.seriesID; got %+v vs %+v",
&pi.curBlock.seriesID, &pbm.seriesID)
- }
+
+ // Process this block's series ID
+ pi.curBlock.seriesID = pbm.seriesID
if pbm.maxKey < pi.minKey || pbm.minKey > pi.maxKey {
continue
@@ -164,25 +109,6 @@ func (pi *partIter) loadNextBlockMetadata() bool {
return false
}
-func searchPBM(pbmIndex []primaryBlockMetadata, sid common.SeriesID)
[]primaryBlockMetadata {
- if sid < pbmIndex[0].seriesID {
- logger.Panicf("invariant violation: sid cannot be smaller than
pbmIndex[0]; got %d vs %d", sid, &pbmIndex[0].seriesID)
- }
-
- if sid == pbmIndex[0].seriesID {
- return pbmIndex
- }
-
- n := sort.Search(len(pbmIndex), func(i int) bool {
- return sid <= pbmIndex[i].seriesID
- })
- if n == 0 {
- logger.Panicf("invariant violation: sort.Search returned 0 for
sid > pbmIndex[0].seriesID; sid=%+v; pbmIndex[0].seriesID=%+v",
- sid, &pbmIndex[0].seriesID)
- }
- return pbmIndex[n-1:]
-}
-
func (pi *partIter) readPrimaryBlock(bms []blockMetadata, pbm
*primaryBlockMetadata) ([]blockMetadata, error) {
pi.compressedPrimaryBuf = bytes.ResizeOver(pi.compressedPrimaryBuf,
int(pbm.size))
fs.MustReadData(pi.p.primary, int64(pbm.offset),
pi.compressedPrimaryBuf)
@@ -203,24 +129,10 @@ func (pi *partIter) readPrimaryBlock(bms []blockMetadata,
pbm *primaryBlockMetad
func (pi *partIter) findBlock() bool {
bhs := pi.bms
for len(bhs) > 0 {
- sid := pi.curBlock.seriesID
- if bhs[0].seriesID < sid {
- n := sort.Search(len(bhs), func(i int) bool {
- return sid <= bhs[i].seriesID
- })
- if n == len(bhs) {
- break
- }
- bhs = bhs[n:]
- }
bm := &bhs[0]
- if bm.seriesID != sid {
- if !pi.searchTargetSeriesID(bm.seriesID) {
- return false
- }
- continue
- }
+ // Process all blocks sequentially
+ pi.curBlock.seriesID = bm.seriesID
if bm.maxKey < pi.minKey {
bhs = bhs[1:]
@@ -228,28 +140,10 @@ func (pi *partIter) findBlock() bool {
}
if bm.minKey > pi.maxKey {
- if !pi.nextSeriesID() {
- return false
- }
+ bhs = bhs[1:]
continue
}
- if pi.blockFilter != nil {
- shouldSkip, err := func() (bool, error) {
- tfo := generateTagFilterOp(bm, pi.p)
- defer releaseTagFilterOp(tfo)
- return pi.blockFilter.ShouldSkip(tfo)
- }()
- if err != nil {
- pi.err = err
- return false
- }
- if shouldSkip {
- bhs = bhs[1:]
- continue
- }
- }
-
pi.curBlock.copyFrom(bm)
pi.bms = bhs[1:]
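
After this change, iterating a part involves no series-ID seeking and no
block filter: init takes only a key range and nextBlock visits every
overlapping block in order. A hypothetical in-package helper (it would have
to live in package sidx, since these identifiers are unexported) showing the
resulting calling pattern:

// scanAllBlocks counts blocks overlapping [minKey, maxKey] across all series.
func scanAllBlocks(p *part, minKey, maxKey int64) (int, error) {
	bma := generateBlockMetadataArray()
	defer releaseBlockMetadataArray(bma)

	pi := &partIter{}
	pi.init(bma, p, minKey, maxKey) // no series IDs, no block filter

	blocks := 0
	for pi.nextBlock() {
		blocks++ // pi.curBlock holds the current block's metadata
	}
	return blocks, pi.error()
}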
diff --git a/banyand/internal/sidx/part_iter_test.go
b/banyand/internal/sidx/part_iter_test.go
index 4b1e2258..bba55979 100644
--- a/banyand/internal/sidx/part_iter_test.go
+++ b/banyand/internal/sidx/part_iter_test.go
@@ -37,7 +37,6 @@ func TestPartIterVerification(t *testing.T) {
tests := []struct {
name string
elements []testElement
- querySids []common.SeriesID
minKey int64
maxKey int64
expectedLen int
@@ -58,7 +57,6 @@ func TestPartIterVerification(t *testing.T) {
},
},
},
- querySids: []common.SeriesID{1},
minKey: 50,
maxKey: 150,
expectedLen: 1,
@@ -91,7 +89,6 @@ func TestPartIterVerification(t *testing.T) {
},
},
},
- querySids: []common.SeriesID{1},
minKey: 50,
maxKey: 250,
expectedLen: 1, // Elements from same series are
grouped into 1 block
@@ -136,10 +133,9 @@ func TestPartIterVerification(t *testing.T) {
},
},
},
- querySids: []common.SeriesID{1, 2, 3},
minKey: 50,
maxKey: 250,
- expectedLen: 3,
+ expectedLen: 3, // All series returned in full scan
},
{
name: "filtered by key range",
@@ -181,13 +177,12 @@ func TestPartIterVerification(t *testing.T) {
},
},
},
- querySids: []common.SeriesID{1},
minKey: 75,
maxKey: 150,
expectedLen: 1, // Block contains all elements
[50-200], overlaps query range [75-150]
},
{
- name: "filtered by series ID",
+ name: "all series returned in range",
elements: []testElement{
{
seriesID: 1,
@@ -226,10 +221,9 @@ func TestPartIterVerification(t *testing.T) {
},
},
},
- querySids: []common.SeriesID{2},
minKey: 50,
maxKey: 150,
- expectedLen: 1, // Only series 2 should match
+ expectedLen: 3, // All series in full scan mode
},
}
@@ -237,7 +231,6 @@ func TestPartIterVerification(t *testing.T) {
runTestCase := func(t *testing.T, tt struct {
name string
elements []testElement
- querySids []common.SeriesID
minKey int64
maxKey int64
expectedLen int
@@ -251,7 +244,7 @@ func TestPartIterVerification(t *testing.T) {
// Initialize partIter with clean blockMetadataArray
bma.reset() // Keep blockMetadataArray clean before passing to
init
- pi.init(bma, part, tt.querySids, tt.minKey, tt.maxKey, nil)
+ pi.init(bma, part, tt.minKey, tt.maxKey)
// Iterate through blocks and collect results
var foundElements []testElement
@@ -268,7 +261,6 @@ func TestPartIterVerification(t *testing.T) {
overlaps := curBlock.maxKey >= tt.minKey &&
curBlock.minKey <= tt.maxKey
assert.True(t, overlaps, "block should overlap with
query range [%d, %d], but got block range [%d, %d]",
tt.minKey, tt.maxKey, curBlock.minKey,
curBlock.maxKey)
- assert.Contains(t, tt.querySids, curBlock.seriesID,
"block seriesID should be in query sids")
// For verification, create a test element representing
this block
// Note: In a real scenario, you'd read the actual
block data
@@ -286,11 +278,6 @@ func TestPartIterVerification(t *testing.T) {
// Verify results
assert.Equal(t, tt.expectedLen, len(foundElements), "should
find expected number of elements")
- // Additional verification: ensure all found elements match
expected series
- for _, elem := range foundElements {
- assert.Contains(t, tt.querySids, elem.seriesID, "found
element should have expected seriesID")
- }
-
t.Logf("Test %s completed: found %d blocks, expected %d",
tt.name, blockCount, tt.expectedLen)
}
@@ -351,7 +338,7 @@ func TestPartIterEdgeCases(t *testing.T) {
testFS := fs.NewLocalFileSystem()
tempDir := t.TempDir()
- t.Run("empty series list", func(t *testing.T) {
+ t.Run("full scan with data", func(t *testing.T) {
// Create a simple part with data
elements := createTestElements([]testElement{
{
@@ -373,23 +360,23 @@ func TestPartIterEdgeCases(t *testing.T) {
defer ReleaseMemPart(mp)
mp.mustInitFromElements(elements)
- partDir := filepath.Join(tempDir, "empty_series_test")
+ partDir := filepath.Join(tempDir, "full_scan_test")
mp.mustFlush(testFS, partDir)
part := mustOpenPart(1, partDir, testFS)
defer part.close()
- // Test with empty series list
+ // Test full scan
bma := &blockMetadataArray{}
defer bma.reset()
pi := &partIter{}
bma.reset()
- pi.init(bma, part, []common.SeriesID{}, 0, 1000, nil)
+ pi.init(bma, part, 0, 1000)
- // Should not find any blocks with empty series list
+ // Should find the block in full scan mode
foundAny := pi.nextBlock()
- assert.False(t, foundAny, "should not find any blocks with
empty series list")
+ assert.True(t, foundAny, "should find blocks in full scan mode")
})
t.Run("no matching key range", func(t *testing.T) {
@@ -426,15 +413,15 @@ func TestPartIterEdgeCases(t *testing.T) {
pi := &partIter{}
bma.reset()
- pi.init(bma, part, []common.SeriesID{1}, 200, 300, nil) // No
overlap with key 100
+ pi.init(bma, part, 200, 300) // No overlap with key 100
// Should not find any blocks
foundAny := pi.nextBlock()
assert.False(t, foundAny, "should not find any blocks with
non-overlapping key range")
})
- t.Run("no matching series ID", func(t *testing.T) {
- // Create a part with seriesID 1
+ t.Run("all series returned in full scan", func(t *testing.T) {
+ // Create a part with multiple series
elements := createTestElements([]testElement{
{
seriesID: 1,
@@ -443,53 +430,19 @@ func TestPartIterEdgeCases(t *testing.T) {
tags: []tag{
{
name: "service",
- value:
[]byte("test-service"),
+ value:
[]byte("test-service-1"),
valueType: pbv1.ValueTypeStr,
},
},
},
- })
- defer releaseElements(elements)
-
- mp := GenerateMemPart()
- defer ReleaseMemPart(mp)
- mp.mustInitFromElements(elements)
-
- partDir := filepath.Join(tempDir, "no_match_series")
- mp.mustFlush(testFS, partDir)
-
- part := mustOpenPart(1, partDir, testFS)
- defer part.close()
-
- // Test with different series ID
- bma := &blockMetadataArray{}
- defer bma.reset()
- pi := &partIter{}
-
- bma.reset()
- pi.init(bma, part, []common.SeriesID{2}, 0, 200, nil) //
Different series ID
-
- // Should not find any blocks
- foundAny := pi.nextBlock()
- assert.False(t, foundAny, "should not find any blocks with
non-matching series ID")
- })
-}
-
-func TestPartIterBlockFilter(t *testing.T) {
- tempDir := t.TempDir()
- testFS := fs.NewLocalFileSystem()
-
- t.Run("blockFilter nil should not filter blocks", func(t *testing.T) {
- // Create test elements
- elements := createTestElements([]testElement{
{
- seriesID: 1,
+ seriesID: 2,
userKey: 100,
- data: []byte("data1"),
+ data: []byte("data2"),
tags: []tag{
{
name: "service",
- value:
[]byte("test-service"),
+ value:
[]byte("test-service-2"),
valueType: pbv1.ValueTypeStr,
},
},
@@ -501,72 +454,34 @@ func TestPartIterBlockFilter(t *testing.T) {
defer ReleaseMemPart(mp)
mp.mustInitFromElements(elements)
- partDir := filepath.Join(tempDir, "nil_filter")
+ partDir := filepath.Join(tempDir, "all_series")
mp.mustFlush(testFS, partDir)
part := mustOpenPart(1, partDir, testFS)
defer part.close()
- // Test with nil blockFilter
+ // Full scan returns all series
bma := &blockMetadataArray{}
defer bma.reset()
pi := &partIter{}
bma.reset()
- pi.init(bma, part, []common.SeriesID{1}, 0, 200, nil) // nil
blockFilter
+ pi.init(bma, part, 0, 200)
- // Should find the block
- foundAny := pi.nextBlock()
- assert.True(t, foundAny, "should find blocks when blockFilter
is nil")
- assert.NoError(t, pi.error())
+ // Should find blocks for all series
+ count := 0
+ for pi.nextBlock() {
+ count++
+ }
+ assert.Equal(t, 2, count, "should find blocks for all series in
full scan mode")
})
+}
- t.Run("blockFilter with mock filter that allows all", func(t
*testing.T) {
- // Create test elements
- elements := createTestElements([]testElement{
- {
- seriesID: 1,
- userKey: 100,
- data: []byte("data1"),
- tags: []tag{
- {
- name: "service",
- value:
[]byte("test-service"),
- valueType: pbv1.ValueTypeStr,
- },
- },
- },
- })
- defer releaseElements(elements)
-
- mp := GenerateMemPart()
- defer ReleaseMemPart(mp)
- mp.mustInitFromElements(elements)
-
- partDir := filepath.Join(tempDir, "allow_all_filter")
- mp.mustFlush(testFS, partDir)
-
- part := mustOpenPart(1, partDir, testFS)
- defer part.close()
-
- // Create a mock filter that allows all blocks
- mockFilter := &mockBlockFilter{shouldSkip: false}
-
- // Test with blockFilter that allows all
- bma := &blockMetadataArray{}
- defer bma.reset()
- pi := &partIter{}
-
- bma.reset()
- pi.init(bma, part, []common.SeriesID{1}, 0, 200, mockFilter)
-
- // Should find the block
- foundAny := pi.nextBlock()
- assert.True(t, foundAny, "should find blocks when blockFilter
allows all")
- assert.NoError(t, pi.error())
- })
+func TestPartIterBlockFilter(t *testing.T) {
+ tempDir := t.TempDir()
+ testFS := fs.NewLocalFileSystem()
- t.Run("blockFilter with mock filter that skips all", func(t *testing.T)
{
+ t.Run("blockFilter nil should not filter blocks", func(t *testing.T) {
// Create test elements
elements := createTestElements([]testElement{
{
@@ -588,152 +503,79 @@ func TestPartIterBlockFilter(t *testing.T) {
defer ReleaseMemPart(mp)
mp.mustInitFromElements(elements)
- partDir := filepath.Join(tempDir, "skip_all_filter")
+ partDir := filepath.Join(tempDir, "nil_filter")
mp.mustFlush(testFS, partDir)
part := mustOpenPart(1, partDir, testFS)
defer part.close()
- // Create a mock filter that skips all blocks
- mockFilter := &mockBlockFilter{shouldSkip: true}
-
- // Test with blockFilter that skips all
+ // Test with nil blockFilter (note: blockFilter removed from
init, always nil now)
bma := &blockMetadataArray{}
defer bma.reset()
pi := &partIter{}
bma.reset()
- pi.init(bma, part, []common.SeriesID{1}, 0, 200, mockFilter)
+ pi.init(bma, part, 0, 200)
- // Should not find any blocks
+ // Should find the block
foundAny := pi.nextBlock()
- assert.False(t, foundAny, "should not find blocks when
blockFilter skips all")
+ assert.True(t, foundAny, "should find blocks")
assert.NoError(t, pi.error())
})
- t.Run("blockFilter with error should propagate error", func(t
*testing.T) {
- // Create test elements
- elements := createTestElements([]testElement{
- {
- seriesID: 1,
- userKey: 100,
- data: []byte("data1"),
- tags: []tag{
- {
- name: "service",
- value:
[]byte("test-service"),
- valueType: pbv1.ValueTypeStr,
- },
- },
- },
- })
- defer releaseElements(elements)
-
- mp := GenerateMemPart()
- defer ReleaseMemPart(mp)
- mp.mustInitFromElements(elements)
-
- partDir := filepath.Join(tempDir, "error_filter")
- mp.mustFlush(testFS, partDir)
-
- part := mustOpenPart(1, partDir, testFS)
- defer part.close()
-
- // Create a mock filter that returns an error
- expectedErr := fmt.Errorf("test filter error")
- mockFilter := &mockBlockFilter{shouldSkip: false, err:
expectedErr}
-
- // Test with blockFilter that returns an error
- bma := &blockMetadataArray{}
- defer bma.reset()
- pi := &partIter{}
-
- bma.reset()
- pi.init(bma, part, []common.SeriesID{1}, 0, 200, mockFilter)
-
- // Should not find any blocks and should have error
- foundAny := pi.nextBlock()
- assert.False(t, foundAny, "should not find blocks when
blockFilter returns error")
- assert.Error(t, pi.error())
- assert.Contains(t, pi.error().Error(), "test filter error")
- })
-}
-
-// mockBlockFilter is a mock implementation of index.Filter for testing.
-type mockBlockFilter struct {
- err error
- shouldSkip bool
-}
-
-func (mbf *mockBlockFilter) ShouldSkip(_ index.FilterOp) (bool, error) {
- if mbf.err != nil {
- return false, mbf.err
- }
- return mbf.shouldSkip, nil
-}
-
-// These methods are required to satisfy the index.Filter interface.
-func (mbf *mockBlockFilter) String() string {
- return "mockBlockFilter"
-}
-
-func (mbf *mockBlockFilter) Execute(_ index.GetSearcher, _ common.SeriesID, _
*index.RangeOpts) (posting.List, posting.List, error) {
- // Not used in our tests, return empty implementation
- return nil, nil, nil
+	// Note: blockFilter tests removed because partIter no longer supports
+	// block filters: init() takes only a key range and findBlock() visits
+	// every block. For row-level filtering, use the ScanQuery API with a
+	// TagFilter instead.
}
-func TestPartIterShouldSkip(t *testing.T) {
+func TestPartIterFullScan(t *testing.T) {
tempDir := t.TempDir()
testFS := fs.NewLocalFileSystem()
- // Strategy: Create many elements to exceed maxElementsPerBlock (8192)
and force multiple blocks.
- // We'll create elements in two batches with different tag values but
same seriesID.
-
- const elementsPerBatch = 8200 // Exceed maxElementsPerBlock to force
multiple blocks
+ // Test full-scan behavior: Create elements for multiple series
var allElements []testElement
- // First batch: seriesID=1, status="pending", keys 0-8199
- for i := 0; i < elementsPerBatch; i++ {
+ // First series: seriesID=1
+ for i := 0; i < 100; i++ {
allElements = append(allElements, testElement{
seriesID: 1,
userKey: int64(i),
- data: []byte(fmt.Sprintf("pending_data_%d", i)),
+ data: []byte(fmt.Sprintf("series1_data_%d", i)),
tags: []tag{
{
- name: "status",
- value: []byte("pending"),
+ name: "service",
+ value: []byte("service-1"),
valueType: pbv1.ValueTypeStr,
},
},
})
}
- // Second batch: seriesID=1, status="success", keys 20000-28199
- // Use a large gap in keys to ensure they're in different blocks
- for i := 0; i < elementsPerBatch; i++ {
+ // Second series: seriesID=2
+ for i := 0; i < 100; i++ {
allElements = append(allElements, testElement{
- seriesID: 1,
- userKey: int64(20000 + i),
- data: []byte(fmt.Sprintf("success_data_%d", i)),
+ seriesID: 2,
+ userKey: int64(i),
+ data: []byte(fmt.Sprintf("series2_data_%d", i)),
tags: []tag{
{
- name: "status",
- value: []byte("success"),
+ name: "service",
+ value: []byte("service-2"),
valueType: pbv1.ValueTypeStr,
},
},
})
}
- // Add a third series to verify we don't incorrectly skip to this
+ // Third series: seriesID=3
allElements = append(allElements, testElement{
- seriesID: 2,
+ seriesID: 3,
userKey: 5000,
- data: []byte("series2_data"),
+ data: []byte("series3_data"),
tags: []tag{
{
- name: "status",
- value: []byte("other"),
+ name: "service",
+ value: []byte("service-3"),
valueType: pbv1.ValueTypeStr,
},
},
@@ -759,21 +601,13 @@ func TestPartIterShouldSkip(t *testing.T) {
t.Logf(" Primary block %d: seriesID=%d, keys [%d-%d]", i,
pbm.seriesID, pbm.minKey, pbm.maxKey)
}
- // Create a selective mock filter that skips blocks with
status="pending"
- // but allows blocks with status="success"
- selectiveFilter := &selectiveMockBlockFilter{
- tagName: "status",
- skipValue: "pending",
- skipCallCount: 0,
- }
-
- // Test with the selective filter, querying only seriesID=1
+ // Test full scan - should return all series
bma := &blockMetadataArray{}
defer bma.reset()
pi := &partIter{}
bma.reset()
- pi.init(bma, part, []common.SeriesID{1, 2}, 0, 30000, selectiveFilter)
+ pi.init(bma, part, 0, 10000)
// Iterate through blocks and collect results
var foundBlocks []struct {
@@ -798,25 +632,44 @@ func TestPartIterShouldSkip(t *testing.T) {
require.NoError(t, pi.error())
- // Verify the filter was called at least once
- assert.Greater(t, selectiveFilter.skipCallCount, 0, "filter should have
been called")
-
- foundSeries1Success := false
+ // Verify all series are found in full scan mode
+ foundSeries := make(map[common.SeriesID]bool)
for _, block := range foundBlocks {
- if block.seriesID == 1 && block.minKey >= 20000 {
- foundSeries1Success = true
- t.Logf("✓ Found the expected seriesID=1 success block:
keys [%d-%d]", block.minKey, block.maxKey)
- }
+ foundSeries[block.seriesID] = true
+ }
+
+ assert.True(t, foundSeries[1], "should find series 1 in full scan")
+ assert.True(t, foundSeries[2], "should find series 2 in full scan")
+ assert.True(t, foundSeries[3], "should find series 3 in full scan")
+ assert.Equal(t, 3, len(foundSeries), "should find all 3 series in full
scan mode")
+}
+
+// mockBlockFilter is a mock implementation of index.Filter for testing.
+// NOTE: This is kept for other test files that still reference it.
+type mockBlockFilter struct {
+ err error
+ shouldSkip bool
+}
+
+func (mbf *mockBlockFilter) ShouldSkip(_ index.FilterOp) (bool, error) {
+ if mbf.err != nil {
+ return false, mbf.err
}
+ return mbf.shouldSkip, nil
+}
- // This assertion will FAIL with the bug, demonstrating the issue
- assert.True(t, foundSeries1Success,
- "BUG: Should find seriesID=1 block with status='success'
(minKey >= 20000), "+
- "but the iterator incorrectly skipped to next series
after first block with status='pending' failed the filter. "+
- "Found %d total blocks", len(foundBlocks))
+// These methods are required to satisfy the index.Filter interface.
+func (mbf *mockBlockFilter) String() string {
+ return "mockBlockFilter"
+}
+
+func (mbf *mockBlockFilter) Execute(_ index.GetSearcher, _ common.SeriesID, _
*index.RangeOpts) (posting.List, posting.List, error) {
+ // Not used in our tests, return empty implementation
+ return nil, nil, nil
}
// selectiveMockBlockFilter is a mock filter that selectively skips blocks
based on tag values.
+// NOTE: This is kept for other test files that still reference it.
type selectiveMockBlockFilter struct {
tagName string
skipValue string
diff --git a/banyand/internal/sidx/scan_query.go
b/banyand/internal/sidx/scan_query.go
new file mode 100644
index 00000000..93a14592
--- /dev/null
+++ b/banyand/internal/sidx/scan_query.go
@@ -0,0 +1,198 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package sidx
+
+import (
+ "context"
+ "math"
+
+ "github.com/apache/skywalking-banyandb/api/common"
+)
+
+// ScanQuery executes a synchronous full-scan query.
+func (s *sidx) ScanQuery(ctx context.Context, req ScanQueryRequest)
([]*QueryResponse, error) {
+ if err := req.Validate(); err != nil {
+ return nil, err
+ }
+
+ snap := s.currentSnapshot()
+ if snap == nil {
+ return nil, nil
+ }
+ defer snap.decRef()
+
+ // Set default batch size
+ maxBatchSize := req.MaxBatchSize
+ if maxBatchSize <= 0 {
+ maxBatchSize = 1000
+ }
+
+ // Set key range
+ minKey := int64(math.MinInt64)
+ maxKey := int64(math.MaxInt64)
+ if req.MinKey != nil {
+ minKey = *req.MinKey
+ }
+ if req.MaxKey != nil {
+ maxKey = *req.MaxKey
+ }
+
+ var results []*QueryResponse
+
+ // Prepare Tags map if projection is specified
+ var tagsMap map[string][]string
+ if len(req.TagProjection) > 0 {
+ tagsMap = make(map[string][]string)
+ for _, proj := range req.TagProjection {
+ for _, tagName := range proj.Names {
+ tagsMap[tagName] = make([]string, 0,
maxBatchSize)
+ }
+ }
+ }
+
+ currentBatch := &QueryResponse{
+ Keys: make([]int64, 0, maxBatchSize),
+ Data: make([][]byte, 0, maxBatchSize),
+ SIDs: make([]common.SeriesID, 0, maxBatchSize),
+ PartIDs: make([]uint64, 0, maxBatchSize),
+ Tags: tagsMap,
+ }
+
+ // Scan all parts
+ totalParts := len(snap.parts)
+ for partIdx, pw := range snap.parts {
+ var err error
+ if currentBatch, err = s.scanPart(ctx, pw, req, minKey, maxKey,
&results, currentBatch, maxBatchSize); err != nil {
+ return nil, err
+ }
+
+ // Count total rows found so far
+ totalRowsFound := 0
+ for _, res := range results {
+ totalRowsFound += res.Len()
+ }
+ totalRowsFound += currentBatch.Len()
+
+ // Report progress if callback is provided
+ if req.OnProgress != nil {
+ req.OnProgress(partIdx+1, totalParts, totalRowsFound)
+ }
+ }
+
+ // Add remaining batch if not empty
+ if currentBatch.Len() > 0 {
+ results = append(results, currentBatch)
+ }
+
+ return results, nil
+}
+
+func (s *sidx) scanPart(ctx context.Context, pw *partWrapper, req
ScanQueryRequest,
+ minKey, maxKey int64, results *[]*QueryResponse, currentBatch
*QueryResponse,
+ maxBatchSize int,
+) (*QueryResponse, error) {
+ p := pw.p
+ bma := generateBlockMetadataArray()
+ defer releaseBlockMetadataArray(bma)
+
+	// Create a fresh partIter for this part
+ pi := &partIter{}
+
+ // Initialize iterator for full scan
+ pi.init(bma, p, minKey, maxKey)
+
+ // Iterate through all blocks
+ for pi.nextBlock() {
+ if err := ctx.Err(); err != nil {
+ return currentBatch, err
+ }
+
+ // Create a temporary QueryRequest to reuse existing
blockCursor infrastructure
+ tmpReq := QueryRequest{
+ TagFilter: req.TagFilter,
+ MinKey: &minKey,
+ MaxKey: &maxKey,
+ TagProjection: req.TagProjection,
+ MaxBatchSize: maxBatchSize,
+ }
+
+ bc := generateBlockCursor()
+ bc.init(p, pi.curBlock, tmpReq)
+
+ // Load block data
+ tmpBlock := generateBlock()
+
+ // When we have a TagFilter, we need to load ALL tags so the
filter can check them.
+ // Pass nil to loadBlockCursor to load all available tags.
+ var tagsToLoad map[string]struct{}
+ if req.TagFilter != nil {
+ // Load all tags when filtering (nil/empty map triggers
loading all tags)
+ tagsToLoad = nil
+ } else if len(req.TagProjection) > 0 {
+ // Optimize: only load projected tags when no filter
+ tagsToLoad = make(map[string]struct{})
+ for _, proj := range req.TagProjection {
+ for _, tagName := range proj.Names {
+ tagsToLoad[tagName] = struct{}{}
+ }
+ }
+ }
+
+ if s.loadBlockCursor(bc, tmpBlock, blockScanResult{
+ p: p,
+ bm: *pi.curBlock,
+ }, tagsToLoad, tmpReq, s.pm, nil) {
+ // Copy all rows from this block cursor to current batch
+ for idx := 0; idx < len(bc.userKeys); idx++ {
+ bc.idx = idx
+
+ // Check if batch is full before adding
+ if currentBatch.Len() >= maxBatchSize {
+ *results = append(*results,
currentBatch)
+
+ // Prepare Tags map for new batch if
projection is specified
+ var newTagsMap map[string][]string
+ if len(req.TagProjection) > 0 {
+ newTagsMap =
make(map[string][]string)
+ for _, proj := range
req.TagProjection {
+ for _, tagName := range
proj.Names {
+
newTagsMap[tagName] = make([]string, 0, maxBatchSize)
+ }
+ }
+ }
+
+ currentBatch = &QueryResponse{
+ Keys: make([]int64, 0,
maxBatchSize),
+ Data: make([][]byte, 0,
maxBatchSize),
+ SIDs:
make([]common.SeriesID, 0, maxBatchSize),
+ PartIDs: make([]uint64, 0,
maxBatchSize),
+ Tags: newTagsMap,
+ }
+ }
+
+ // Add to current batch
+ _ = bc.copyTo(currentBatch)
+ }
+ }
+
+ releaseBlock(tmpBlock)
+ releaseBlockCursor(bc)
+ }
+
+ return currentBatch, pi.error()
+}
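
A sketch of a ScanQuery caller, as the dump tool might use it; imports of
context, fmt, os, and the sidx package are assumed, and the parallel-array
shape matches the QueryResponse fields above:

func dumpAll(ctx context.Context, s sidx.SIDX) error {
	responses, err := s.ScanQuery(ctx, sidx.ScanQueryRequest{
		MaxBatchSize: 1000, // each QueryResponse batch holds at most this many rows
		OnProgress: func(part, total, rows int) {
			fmt.Fprintf(os.Stderr, "part %d/%d, %d rows\n", part, total, rows)
		},
	})
	if err != nil {
		return err
	}
	for _, resp := range responses {
		for i := range resp.Keys { // Keys, Data, SIDs, PartIDs are parallel
			fmt.Printf("key=%d sid=%d part=%d len=%d\n",
				resp.Keys[i], resp.SIDs[i], resp.PartIDs[i], len(resp.Data[i]))
		}
	}
	return nil
}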
diff --git a/banyand/internal/sidx/sidx.go b/banyand/internal/sidx/sidx.go
index 7844087b..197c1870 100644
--- a/banyand/internal/sidx/sidx.go
+++ b/banyand/internal/sidx/sidx.go
@@ -23,6 +23,7 @@ import (
"context"
"os"
"path/filepath"
+ "strings"
"sync"
"sync/atomic"
@@ -496,9 +497,39 @@ func (bc *blockCursor) copyTo(result *QueryResponse) bool {
result.SIDs = append(result.SIDs, bc.seriesID)
result.PartIDs = append(result.PartIDs, bc.p.ID())
+ // Copy projected tags if specified
+ if len(bc.request.TagProjection) > 0 && len(result.Tags) > 0 {
+ for _, proj := range bc.request.TagProjection {
+ for _, tagName := range proj.Names {
+ if tagData, exists := bc.tags[tagName]; exists
&& bc.idx < len(tagData) {
+ tagValue :=
formatTagValue(tagData[bc.idx])
+ result.Tags[tagName] =
append(result.Tags[tagName], tagValue)
+ } else {
+ // Tag not present for this row
+ result.Tags[tagName] =
append(result.Tags[tagName], "")
+ }
+ }
+ }
+ }
+
return true
}
+// formatTagValue converts a Tag to its string form: arrays are serialized as
+// bracketed, comma-separated values, while scalar values are returned as-is.
+// When both Value and ValueArr are empty, the function returns an empty
string.
+func formatTagValue(tag Tag) string {
+ if len(tag.ValueArr) > 0 {
+ // Array of values
+ values := make([]string, len(tag.ValueArr))
+ for i, v := range tag.ValueArr {
+ values[i] = string(v)
+ }
+ return "[" + strings.Join(values, ",") + "]"
+ }
+ return string(tag.Value)
+}
+
// blockCursorHeap implements heap.Interface for sorting block cursors.
type blockCursorHeap struct {
bcc []*blockCursor
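
formatTagValue's contract is easiest to see in isolation: arrays render as
bracketed comma-joined values, scalars verbatim, and an empty Tag becomes
the empty string. A standalone reproduction with a local Tag stand-in:

package main

import (
	"fmt"
	"strings"
)

type Tag struct {
	Value    []byte
	ValueArr [][]byte
}

func formatTagValue(tag Tag) string {
	if len(tag.ValueArr) > 0 {
		values := make([]string, len(tag.ValueArr))
		for i, v := range tag.ValueArr {
			values[i] = string(v)
		}
		return "[" + strings.Join(values, ",") + "]"
	}
	return string(tag.Value) // empty string when both fields are empty
}

func main() {
	fmt.Println(formatTagValue(Tag{Value: []byte("ok")}))                          // ok
	fmt.Println(formatTagValue(Tag{ValueArr: [][]byte{[]byte("a"), []byte("b")}})) // [a,b]
	fmt.Println(formatTagValue(Tag{}) == "")                                       // true
}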
diff --git a/banyand/internal/sidx/tag.go b/banyand/internal/sidx/tag.go
index b4c1d510..a342860a 100644
--- a/banyand/internal/sidx/tag.go
+++ b/banyand/internal/sidx/tag.go
@@ -60,12 +60,7 @@ type tagRow struct {
func (tr *tagRow) reset() {
tr.value = nil
- if tr.valueArr != nil {
- for i := range tr.valueArr {
- tr.valueArr[i] = nil
- }
- }
- tr.valueArr = tr.valueArr[:0]
+ tr.valueArr = nil
}
var (
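
The tag.go hunk resets valueArr to nil instead of truncating it to length
zero. Truncation keeps the pooled backing array alive, so a later append
overwrites slice headers that earlier consumers may still reference; nil
forces append to allocate fresh storage. A minimal demonstration of the
hazard the old reset allowed:

package main

import "fmt"

func main() {
	pooled := [][]byte{[]byte("old1"), []byte("old2")}
	retained := pooled[:2] // a consumer kept a view of the old rows

	pooled = pooled[:0]                     // old reset: truncate, keep capacity
	pooled = append(pooled, []byte("new1")) // writes into slot 0 in place

	fmt.Println(string(retained[0])) // "new1": the retained row silently changed

	// With the new reset (valueArr = nil), append allocates a new backing
	// array, so rows handed out earlier can never be overwritten.
}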
diff --git a/banyand/liaison/grpc/measure.go b/banyand/liaison/grpc/measure.go
index 9cb32f9d..17291452 100644
--- a/banyand/liaison/grpc/measure.go
+++ b/banyand/liaison/grpc/measure.go
@@ -297,15 +297,19 @@ func (ms *measureService) Query(ctx context.Context, req
*measurev1.QueryRequest
return nil, status.Errorf(codes.InvalidArgument, "%v is invalid
:%s", req.GetTimeRange(), err)
}
now := time.Now()
+ var tracer *query.Tracer
+ var span *query.Span
+ var responseDataPointCount int
if req.Trace {
- tracer, _ := query.NewTracer(ctx, now.Format(time.RFC3339Nano))
- span, _ := tracer.StartSpan(ctx, "measure-grpc")
+ tracer, _ = query.NewTracer(ctx, now.Format(time.RFC3339Nano))
+ span, _ = tracer.StartSpan(ctx, "measure-grpc")
span.Tag("request", convert.BytesToString(logger.Proto(req)))
defer func() {
if err != nil {
span.Error(err)
span.Stop()
} else {
+ span.Tagf("response_data_point_count", "%d",
responseDataPointCount)
span.AddSubTrace(resp.Trace)
span.Stop()
resp.Trace = tracer.ToProto()
@@ -326,6 +330,7 @@ func (ms *measureService) Query(ctx context.Context, req
*measurev1.QueryRequest
data := msg.Data()
switch d := data.(type) {
case *measurev1.QueryResponse:
+ responseDataPointCount = len(d.DataPoints)
return d, nil
case *common.Error:
return nil, errors.WithMessage(errQueryMsg, d.Error())
@@ -348,18 +353,22 @@ func (ms *measureService) TopN(ctx context.Context,
topNRequest *measurev1.TopNR
return nil, status.Errorf(codes.InvalidArgument, "%v is invalid
:%s", topNRequest.GetTimeRange(), err)
}
now := time.Now()
+ var topNTracer *query.Tracer
+ var topNSpan *query.Span
+ var responseListCount int
if topNRequest.Trace {
- tracer, _ := query.NewTracer(ctx, now.Format(time.RFC3339Nano))
- span, _ := tracer.StartSpan(ctx, "topn-grpc")
- span.Tag("request",
convert.BytesToString(logger.Proto(topNRequest)))
+ topNTracer, _ = query.NewTracer(ctx,
now.Format(time.RFC3339Nano))
+ topNSpan, _ = topNTracer.StartSpan(ctx, "topn-grpc")
+ topNSpan.Tag("request",
convert.BytesToString(logger.Proto(topNRequest)))
defer func() {
if err != nil {
- span.Error(err)
+ topNSpan.Error(err)
} else {
- span.AddSubTrace(resp.Trace)
- resp.Trace = tracer.ToProto()
+ topNSpan.Tagf("response_list_count", "%d",
responseListCount)
+ topNSpan.AddSubTrace(resp.Trace)
+ resp.Trace = topNTracer.ToProto()
}
- span.Stop()
+ topNSpan.Stop()
}()
}
message := bus.NewMessage(bus.MessageID(now.UnixNano()), topNRequest)
@@ -374,6 +383,7 @@ func (ms *measureService) TopN(ctx context.Context,
topNRequest *measurev1.TopNR
data := msg.Data()
switch d := data.(type) {
case *measurev1.TopNResponse:
+ responseListCount = len(d.Lists)
return d, nil
case *common.Error:
return nil, errors.WithMessage(errQueryMsg, d.Error())
diff --git a/banyand/liaison/grpc/stream.go b/banyand/liaison/grpc/stream.go
index c19b6038..d0da8c6f 100644
--- a/banyand/liaison/grpc/stream.go
+++ b/banyand/liaison/grpc/stream.go
@@ -279,15 +279,19 @@ func (s *streamService) Query(ctx context.Context, req
*streamv1.QueryRequest) (
return nil, status.Errorf(codes.InvalidArgument, "%v is invalid
:%s", req.GetTimeRange(), err)
}
now := time.Now()
+ var tracer *query.Tracer
+ var span *query.Span
+ var responseElementCount int
if req.Trace {
- tracer, _ := query.NewTracer(ctx, now.Format(time.RFC3339Nano))
- span, _ := tracer.StartSpan(ctx, "stream-grpc")
+ tracer, _ = query.NewTracer(ctx, now.Format(time.RFC3339Nano))
+ span, _ = tracer.StartSpan(ctx, "stream-grpc")
span.Tag("request", convert.BytesToString(logger.Proto(req)))
defer func() {
if err != nil {
span.Error(err)
span.Stop()
} else {
+ span.Tagf("response_element_count", "%d",
responseElementCount)
span.AddSubTrace(resp.Trace)
span.Stop()
resp.Trace = tracer.ToProto()
@@ -309,6 +313,7 @@ func (s *streamService) Query(ctx context.Context, req
*streamv1.QueryRequest) (
data := msg.Data()
switch d := data.(type) {
case *streamv1.QueryResponse:
+ responseElementCount = len(d.Elements)
return d, nil
case *common.Error:
return nil, errors.WithMessage(errQueryMsg, d.Error())
diff --git a/banyand/liaison/grpc/trace.go b/banyand/liaison/grpc/trace.go
index 81000319..c4989652 100644
--- a/banyand/liaison/grpc/trace.go
+++ b/banyand/liaison/grpc/trace.go
@@ -342,15 +342,19 @@ func (s *traceService) Query(ctx context.Context, req
*tracev1.QueryRequest) (re
return nil, status.Errorf(codes.InvalidArgument, "%v is invalid
:%s", req.GetTimeRange(), err)
}
now := time.Now()
+ var tracer *query.Tracer
+ var span *query.Span
+ var responseTraceCount int
if req.Trace {
- tracer, _ := query.NewTracer(ctx, now.Format(time.RFC3339Nano))
- span, _ := tracer.StartSpan(ctx, "trace-grpc")
+ tracer, _ = query.NewTracer(ctx, now.Format(time.RFC3339Nano))
+ span, _ = tracer.StartSpan(ctx, "trace-grpc")
span.Tag("request", convert.BytesToString(logger.Proto(req)))
defer func() {
if err != nil {
span.Error(err)
span.Stop()
} else if resp != nil && resp !=
emptyTraceQueryResponse {
+ span.Tagf("response_trace_count", "%d",
responseTraceCount)
span.AddSubTrace(resp.TraceQueryResult)
span.Stop()
resp.TraceQueryResult = tracer.ToProto()
@@ -381,6 +385,7 @@ func (s *traceService) Query(ctx context.Context, req
*tracev1.QueryRequest) (re
}
traces = append(traces, trace)
}
+ responseTraceCount = len(traces)
return &tracev1.QueryResponse{
Traces: traces,
TraceQueryResult: d.TraceQueryResult,
diff --git a/banyand/query/processor.go b/banyand/query/processor.go
index 65808e25..388ac0b5 100644
--- a/banyand/query/processor.go
+++ b/banyand/query/processor.go
@@ -123,6 +123,7 @@ func (p *streamQueryProcessor) Rev(ctx context.Context,
message bus.Message) (re
data := resp.Data()
switch d := data.(type) {
case *streamv1.QueryResponse:
+ span.Tag("resp_count", fmt.Sprintf("%d",
len(d.Elements)))
span.Stop()
d.Trace = tracer.ToProto()
case *common.Error:
@@ -272,6 +273,7 @@ func (p *measureQueryProcessor) executeQuery(ctx
context.Context, queryCriteria
data := resp.Data()
switch d := data.(type) {
case *measurev1.QueryResponse:
+ span.Tag("resp_count", fmt.Sprintf("%d",
len(d.DataPoints)))
d.Trace = tracer.ToProto()
span.Stop()
case *common.Error:
@@ -551,6 +553,7 @@ func (tm *traceMonitor) finishTrace(resp *bus.Message,
messageID int64) {
data := resp.Data()
switch d := data.(type) {
case *tracev1.InternalQueryResponse:
+		tm.span.Tag("resp_count", fmt.Sprintf("%d", len(d.InternalTraces)))
tm.span.Stop()
d.TraceQueryResult = tm.tracer.ToProto()
case *common.Error:
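The three processor hunks above share one shape: tag the span with the cardinality of whichever response variant the type switch matches, before attaching the trace to the response. A hedged sketch with stand-in types (only the Tag(key, value) shape is taken from the diff; nothing here is the real query.Span API):

package main

import "fmt"

// span stands in for the tracer span used by the processors.
type span struct{}

func (s *span) Tag(key, value string) { fmt.Printf("%s=%s\n", key, value) }

type streamResp struct{ Elements []string }
type measureResp struct{ DataPoints []int }

// tagRespCount tags the span with the size of whichever variant arrives.
func tagRespCount(sp *span, data any) {
	switch d := data.(type) {
	case *streamResp:
		sp.Tag("resp_count", fmt.Sprintf("%d", len(d.Elements)))
	case *measureResp:
		sp.Tag("resp_count", fmt.Sprintf("%d", len(d.DataPoints)))
	}
}

func main() {
	tagRespCount(&span{}, &streamResp{Elements: []string{"e1", "e2"}}) // resp_count=2
}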
diff --git a/banyand/trace/query.go b/banyand/trace/query.go
index 8aecc80f..3102f8a3 100644
--- a/banyand/trace/query.go
+++ b/banyand/trace/query.go
@@ -101,6 +101,7 @@ func (t *trace) Query(ctx context.Context, tqo model.TraceQueryOptions) (model.T
pipelineCtx, cancel := context.WithTimeout(ctx, queryTimeout)
result.ctx = pipelineCtx
result.cancel = cancel
+	result.recordCursor, result.recordResult, result.finishResultSpan = startQueryResultSpan(pipelineCtx)
if result.keys == nil {
result.keys = make(map[string]int64)
@@ -244,17 +245,20 @@ func (t *trace) prepareSIDXStreaming(
type queryResult struct {
ctx context.Context
err error
- currentBatch *scanBatch
- tagProjection *model.TagProjection
+ streamDone <-chan struct{}
+ recordCursor func(*blockCursor)
keys map[string]int64
cursorBatchCh <-chan *scanBatch
cancel context.CancelFunc
currentCursorGroups map[string][]*blockCursor
- streamDone <-chan struct{}
+ currentBatch *scanBatch
+ tagProjection *model.TagProjection
+ finishResultSpan func(int, error)
+ recordResult func(*model.TraceResult)
currentTraceIDs []string
segments []storage.Segment[*tsTable, option]
- currentIndex int
hit int
+ currentIndex int
}
func (qr *queryResult) Pull() *model.TraceResult {
@@ -313,6 +317,10 @@ func (qr *queryResult) Pull() *model.TraceResult {
qr.currentIndex++
delete(qr.currentCursorGroups, traceID)
+ if qr.recordResult != nil {
+ qr.recordResult(result)
+ }
+
return result
}
}
@@ -367,6 +375,9 @@ func (qr *queryResult) ensureCurrentBatch() bool {
return true
}
if result.cursor != nil {
+				if qr.recordCursor != nil {
+					qr.recordCursor(result.cursor)
+				}
				traceID := result.cursor.bm.traceID
				qr.currentCursorGroups[traceID] = append(qr.currentCursorGroups[traceID], result.cursor)
}
@@ -498,6 +509,16 @@ func (qr *queryResult) Release() {
qr.segments[i].DecRef()
}
qr.segments = qr.segments[:0]
+
+ qr.finishTracing(qr.err)
+}
+
+func (qr *queryResult) finishTracing(err error) {
+ if qr.finishResultSpan == nil {
+ return
+ }
+ qr.finishResultSpan(qr.hit, err)
+ qr.finishResultSpan = nil
}
func mustDecodeTagValue(valueType pbv1.ValueType, value []byte) *modelv1.TagValue {
@@ -508,6 +529,11 @@ func mustDecodeTagValueAndArray(valueType pbv1.ValueType, value []byte, valueArr
if value == nil && valueArr == nil {
return pbv1.NullTagValue
}
+ if value == nil &&
+ valueType != pbv1.ValueTypeInt64Arr &&
+ valueType != pbv1.ValueTypeStrArr {
+ return pbv1.NullTagValue
+ }
switch valueType {
case pbv1.ValueTypeInt64:
return int64TagValue(convert.BytesToInt64(value))
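The mustDecodeTagValueAndArray guard above is the core of the empty-tag fix: a nil value with a scalar value type previously fell through to a converter such as convert.BytesToInt64, which cannot handle nil bytes; it now short-circuits to pbv1.NullTagValue. Array types are exempt because a nil value with a non-nil valueArr is still a decodable array. A standalone sketch of the same guard under assumed, simplified types:

package main

import (
	"encoding/binary"
	"fmt"
)

type valueType int

const (
	valueTypeInt64 valueType = iota
	valueTypeStr
	valueTypeInt64Arr
	valueTypeStrArr
)

// decodeTag mirrors the new guard: nil bytes for a scalar type short-circuit
// to "null" instead of reaching a converter that cannot handle nil input.
func decodeTag(vt valueType, value []byte) string {
	if value == nil && vt != valueTypeInt64Arr && vt != valueTypeStrArr {
		return "null" // an empty SIDX tag: nothing was written for this element
	}
	switch vt {
	case valueTypeInt64:
		return fmt.Sprintf("%d", int64(binary.BigEndian.Uint64(value)))
	case valueTypeStr:
		return string(value)
	default:
		return "array"
	}
}

func main() {
	fmt.Println(decodeTag(valueTypeInt64, nil)) // "null", not a panic
	buf := make([]byte, 8)
	binary.BigEndian.PutUint64(buf, 42)
	fmt.Println(decodeTag(valueTypeInt64, buf)) // "42"
}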
diff --git a/banyand/trace/streaming_pipeline_test.go b/banyand/trace/streaming_pipeline_test.go
index 9e313f70..339208d2 100644
--- a/banyand/trace/streaming_pipeline_test.go
+++ b/banyand/trace/streaming_pipeline_test.go
@@ -82,6 +82,9 @@ func (f *fakeSIDX) StreamingParts(map[uint64]struct{}, string, uint32, string) (
func (f *fakeSIDX) PartPaths(map[uint64]struct{}) map[uint64]string { return map[uint64]string{} }
func (f *fakeSIDX) IntroduceSynced(map[uint64]struct{}) func() { return func() {} }
func (f *fakeSIDX) TakeFileSnapshot(_ string) error { return nil }
+func (f *fakeSIDX) ScanQuery(context.Context, sidx.ScanQueryRequest) ([]*sidx.QueryResponse, error) {
+ return nil, nil
+}
type fakeSIDXWithErr struct {
*fakeSIDX
@@ -566,6 +569,10 @@ type fakeSIDXInfinite struct {
keyStart int64
}
+func (f *fakeSIDXInfinite) ScanQuery(context.Context, sidx.ScanQueryRequest) ([]*sidx.QueryResponse, error) {
+ return nil, nil
+}
+
func (f *fakeSIDXInfinite) StreamingQuery(ctx context.Context, _ sidx.QueryRequest) (<-chan *sidx.QueryResponse, <-chan error) {
results := make(chan *sidx.QueryResponse)
errCh := make(chan error, 1)
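For the test fakes above: adding ScanQuery to the sidx interface (see banyand/internal/sidx/interfaces.go in the diffstat) forces every fake to grow a stub, or the test package stops compiling. A minimal illustration of the idiom with hypothetical names:

package main

import (
	"context"
	"fmt"
)

// scanner models an interface that just gained a ScanQuery-style method.
type scanner interface {
	ScanQuery(ctx context.Context, req string) ([]string, error)
}

type fakeScanner struct{}

// No-op stub: returns empty results so unrelated tests keep passing.
func (f *fakeScanner) ScanQuery(context.Context, string) ([]string, error) {
	return nil, nil
}

func main() {
	var s scanner = &fakeScanner{}
	res, err := s.ScanQuery(context.Background(), "req")
	fmt.Println(res, err) // [] <nil>
}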
diff --git a/banyand/trace/tracing.go b/banyand/trace/tracing.go
index f3ed8ac2..69d2a252 100644
--- a/banyand/trace/tracing.go
+++ b/banyand/trace/tracing.go
@@ -28,6 +28,7 @@ import (
"github.com/dustin/go-humanize"
"github.com/apache/skywalking-banyandb/pkg/query"
+ "github.com/apache/skywalking-banyandb/pkg/query/model"
)
const (
@@ -354,3 +355,81 @@ func startAggregatedBlockScanSpan(ctx context.Context, groupedIDs [][]string, pa
span.Stop()
}
}
+
+// startQueryResultSpan records aggregated metrics for cursor consumption and trace results.
+// It returns recorders for cursor/result events and a finish function to complete the span.
+func startQueryResultSpan(ctx context.Context) (func(*blockCursor), func(*model.TraceResult), func(int, error)) {
+ tracer := query.GetTracer(ctx)
+ if tracer == nil {
+ return nil, nil, nil
+ }
+
+ span, _ := tracer.StartSpan(ctx, "query-result")
+
+ var (
+ cursorCount int
+ cursorBytes uint64
+ cursorTraceStat = make(map[string]int)
+ cursorSamples []string
+
+ resultRecorded int
+ resultTraceStat = make(map[string]int)
+ resultSamples []string
+ )
+
+ recordCursor := func(bc *blockCursor) {
+ if bc == nil {
+ return
+ }
+ cursorCount++
+ cursorBytes += bc.bm.uncompressedSpanSizeBytes
+ cursorTraceStat[bc.bm.traceID]++
+ if len(cursorSamples) < traceIDSampleLimit {
+			cursorSamples = append(cursorSamples, fmt.Sprintf("%s:%d", bc.bm.traceID, bc.bm.count))
+ }
+ }
+
+ recordResult := func(res *model.TraceResult) {
+ if res == nil || res.Error != nil || res.TID == "" {
+ return
+ }
+ resultRecorded++
+ resultTraceStat[res.TID]++
+ if len(resultSamples) < traceIDSampleLimit {
+			resultSamples = append(resultSamples, fmt.Sprintf("%s:%d", res.TID, len(res.Spans)))
+ }
+ }
+
+ finish := func(hit int, err error) {
+ span.Tag("cursor_total", strconv.Itoa(cursorCount))
+ if cursorBytes > 0 {
+ span.Tag("cursor_bytes", humanize.Bytes(cursorBytes))
+ }
+		span.Tag("cursor_trace_total", strconv.Itoa(len(cursorTraceStat)))
+		if len(cursorSamples) > 0 {
+			span.Tag("cursor_sample", strings.Join(cursorSamples, ","))
+ }
+
+ span.Tag("result_recorded", strconv.Itoa(resultRecorded))
+		span.Tag("result_trace_total", strconv.Itoa(len(resultTraceStat)))
+		if len(resultSamples) > 0 {
+			span.Tag("result_sample", strings.Join(resultSamples, ","))
+ }
+
+ span.Tag("result_hit_count", strconv.Itoa(hit))
+ if resultRecorded != hit {
+			span.Tag("result_hit_mismatch", strconv.Itoa(resultRecorded-hit))
+ }
+ traceGap := len(cursorTraceStat) - len(resultTraceStat)
+ if traceGap > 0 {
+ span.Tag("result_trace_gap", strconv.Itoa(traceGap))
+ }
+
+ if err != nil {
+ span.Error(err)
+ }
+ span.Stop()
+ }
+
+ return recordCursor, recordResult, finish
+}
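startQueryResultSpan packages its per-query state into closures: the two recorders mutate counters captured from the enclosing function, and finish flushes them as span tags exactly once (the caller's finishTracing nils the callback after use, so a repeated Release cannot double-stop the span). A reduced sketch of the constructor-returning-closures pattern, with hypothetical names:

package main

import (
	"fmt"
	"strconv"
)

// newCounters returns a recorder that mutates captured state and a finisher
// that flushes the aggregates once, mirroring recordCursor/recordResult/finish.
func newCounters() (record func(id string), finish func()) {
	perID := make(map[string]int)
	total := 0
	record = func(id string) {
		total++
		perID[id]++
	}
	finish = func() {
		fmt.Println("total=" + strconv.Itoa(total))
		fmt.Println("distinct=" + strconv.Itoa(len(perID)))
	}
	return record, finish
}

func main() {
	record, finish := newCounters()
	record("trace-1")
	record("trace-1")
	record("trace-2")
	finish() // total=3, distinct=2
}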
diff --git a/pkg/query/logical/measure/measure_plan_distributed.go b/pkg/query/logical/measure/measure_plan_distributed.go
index 10d25248..bd3ebfcc 100644
--- a/pkg/query/logical/measure/measure_plan_distributed.go
+++ b/pkg/query/logical/measure/measure_plan_distributed.go
@@ -262,6 +262,8 @@ func (t *distributedPlan) Execute(ctx context.Context) (mi executor.MIterator, e
}
var see []sort.Iterator[*comparableDataPoint]
var pushedDownAggDps []*measurev1.DataPoint
+ var responseCount int
+ var dataPointCount int
for _, f := range ff {
if m, getErr := f.Get(); getErr != nil {
err = multierr.Append(err, getErr)
@@ -271,18 +273,25 @@ func (t *distributedPlan) Execute(ctx context.Context) (mi executor.MIterator, e
continue
}
resp := d.(*measurev1.QueryResponse)
+ responseCount++
if span != nil {
span.AddSubTrace(resp.Trace)
}
if t.needCompletePushDownAgg {
			pushedDownAggDps = append(pushedDownAggDps, resp.DataPoints...)
+ dataPointCount += len(resp.DataPoints)
continue
}
+ dataPointCount += len(resp.DataPoints)
		see = append(see, newSortableElements(resp.DataPoints, t.sortByTime, t.sortTagSpec))
}
}
+ if span != nil {
+ span.Tagf("response_count", "%d", responseCount)
+ span.Tagf("data_point_count", "%d", dataPointCount)
+ }
if t.needCompletePushDownAgg {
		return &pushedDownAggregatedIterator{dataPoints: pushedDownAggDps}, err
}
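All three distributed plans in this commit follow the same accounting discipline: count while draining the per-node futures, then emit the totals as span tags once after the loop, so the trace carries aggregate response_count-style tags instead of one tag per node. A compact sketch with stand-in types:

package main

import "fmt"

type span struct{}

// Tagf mirrors the formatted tagging shape used by the plans above.
func (s *span) Tagf(key, format string, args ...any) {
	fmt.Printf("%s=%s\n", key, fmt.Sprintf(format, args...))
}

type response struct{ DataPoints []int }

// execute counts while draining responses, then tags the totals once.
func execute(sp *span, responses []*response) {
	var responseCount, dataPointCount int
	for _, resp := range responses {
		responseCount++
		dataPointCount += len(resp.DataPoints)
	}
	if sp != nil {
		sp.Tagf("response_count", "%d", responseCount)
		sp.Tagf("data_point_count", "%d", dataPointCount)
	}
}

func main() {
	execute(&span{}, []*response{{DataPoints: []int{1, 2}}, {DataPoints: []int{3}}})
}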
diff --git a/pkg/query/logical/stream/stream_plan_distributed.go b/pkg/query/logical/stream/stream_plan_distributed.go
index 7827f56f..b9937765 100644
--- a/pkg/query/logical/stream/stream_plan_distributed.go
+++ b/pkg/query/logical/stream/stream_plan_distributed.go
@@ -159,6 +159,7 @@ func (t *distributedPlan) Execute(ctx context.Context) (ee []*streamv1.Element,
}
var allErr error
var see []sort.Iterator[*comparableElement]
+ var responseCount int
for _, f := range ff {
if m, getErr := f.Get(); getErr != nil {
allErr = multierr.Append(allErr, getErr)
@@ -168,6 +169,7 @@ func (t *distributedPlan) Execute(ctx context.Context) (ee []*streamv1.Element,
continue
}
resp := d.(*streamv1.QueryResponse)
+ responseCount++
if span != nil {
span.AddSubTrace(resp.Trace)
}
@@ -185,6 +187,10 @@ func (t *distributedPlan) Execute(ctx context.Context) (ee []*streamv1.Element,
result = append(result, element)
}
}
+ if span != nil {
+ span.Tagf("response_count", "%d", responseCount)
+ span.Tagf("element_id_count", "%d", len(seen))
+ }
return result, allErr
}
diff --git a/pkg/query/logical/trace/trace_plan_distributed.go b/pkg/query/logical/trace/trace_plan_distributed.go
index f9e8e960..bedc26ae 100644
--- a/pkg/query/logical/trace/trace_plan_distributed.go
+++ b/pkg/query/logical/trace/trace_plan_distributed.go
@@ -155,6 +155,7 @@ func (p *distributedPlan) Execute(ctx context.Context) (iter.Iterator[model.Trac
return iter.Empty[model.TraceResult](), err
}
var allErr error
+ var responseCount int
var st []sort.Iterator[*comparableTrace]
for _, f := range ff {
if m, getErr := f.Get(); getErr != nil {
@@ -165,6 +166,7 @@ func (p *distributedPlan) Execute(ctx context.Context) (iter.Iterator[model.Trac
continue
}
resp := d.(*tracev1.InternalQueryResponse)
+ responseCount++
if span != nil {
span.AddSubTrace(resp.TraceQueryResult)
}
@@ -172,6 +174,9 @@ func (p *distributedPlan) Execute(ctx context.Context) (iter.Iterator[model.Trac
				newSortableTraces(resp.InternalTraces, p.sortByTraceID))
}
}
+ if span != nil {
+ span.Tagf("response_count", "%d", responseCount)
+ }
sortIter := sort.NewItemIter(st, p.desc)
var result []*tracev1.InternalTrace
seen := make(map[string]*tracev1.InternalTrace)
@@ -195,6 +200,9 @@ func (p *distributedPlan) Execute(ctx context.Context) (iter.Iterator[model.Trac
}
}
}
+ if span != nil {
+ span.Tagf("trace_id_count", "%d", len(seen))
+ }
return &distributedTraceResultIterator{
traces: result,