This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch bug/empty-trace
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 788ee13a4e59716e864b8fd106e2ed272330bcb1
Author: Gao Hongtao <[email protected]>
AuthorDate: Wed Nov 19 22:45:31 2025 +0000

    Add sidx command for dumping secondary index data

    - Introduced a new command `sidx` in the dump tool to facilitate the dumping of secondary index data with various filtering options.
    - Implemented functionality to handle time range filtering, criteria-based filtering, and projection of specific tags.
    - Enhanced the command with CSV output support for better data representation.
    - Updated the main command to include the new `sidx` command alongside existing commands like `trace`.
---
 banyand/cmd/dump/main.go                 |    1 +
 banyand/cmd/dump/sidx.go                 | 1084 ++++++++++++++++++++++++++++++
 banyand/cmd/dump/trace.go                |  784 ++++++++++++++++++++-
 banyand/internal/sidx/block.go           |    6 +-
 banyand/internal/sidx/interfaces.go      |   35 +
 banyand/internal/sidx/merge_test.go      |  140 ++++
 banyand/internal/sidx/part_iter.go       |  120 +---
 banyand/internal/sidx/part_iter_test.go  |  332 +++------
 banyand/internal/sidx/scan_query.go      |  205 ++++++
 banyand/internal/sidx/sidx.go            |   29 +
 banyand/internal/sidx/tag.go             |   10 +-
 banyand/trace/streaming_pipeline_test.go |    7 +
 12 files changed, 2376 insertions(+), 377 deletions(-)

diff --git a/banyand/cmd/dump/main.go b/banyand/cmd/dump/main.go
index 5bb17eb5..29da173b 100644
--- a/banyand/cmd/dump/main.go
+++ b/banyand/cmd/dump/main.go
@@ -37,6 +37,7 @@ It provides subcommands for different data types (trace, stream, measure, etc.).
 	}
 
 	rootCmd.AddCommand(newTraceCmd())
+	rootCmd.AddCommand(newSidxCmd())
 
 	if err := rootCmd.Execute(); err != nil {
 		fmt.Fprintln(os.Stderr, err)
diff --git a/banyand/cmd/dump/sidx.go b/banyand/cmd/dump/sidx.go
new file mode 100644
index 00000000..433a6096
--- /dev/null
+++ b/banyand/cmd/dump/sidx.go
@@ -0,0 +1,1084 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+ +package main + +import ( + "context" + "encoding/csv" + "encoding/json" + "fmt" + "math" + "os" + "path/filepath" + "sort" + "strconv" + "strings" + "time" + + "github.com/spf13/cobra" + "google.golang.org/protobuf/encoding/protojson" + + "github.com/apache/skywalking-banyandb/api/common" + databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" + modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" + "github.com/apache/skywalking-banyandb/banyand/internal/sidx" + "github.com/apache/skywalking-banyandb/banyand/protector" + "github.com/apache/skywalking-banyandb/pkg/convert" + "github.com/apache/skywalking-banyandb/pkg/fs" + "github.com/apache/skywalking-banyandb/pkg/index/inverted" + "github.com/apache/skywalking-banyandb/pkg/logger" + pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" + "github.com/apache/skywalking-banyandb/pkg/query/logical" + "github.com/apache/skywalking-banyandb/pkg/query/model" +) + +func newSidxCmd() *cobra.Command { + var sidxPath string + var segmentPath string + var criteriaJSON string + var csvOutput bool + var timeBegin string + var timeEnd string + var fullScan bool + var projectionTags string + var dataFilter string + + cmd := &cobra.Command{ + Use: "sidx", + Short: "Dump sidx (secondary index) data", + Long: `Dump and display contents of a sidx directory with filtering. +Query results include which part each row belongs to. + +The tool automatically discovers series IDs from the segment's series index.`, + Example: ` # Display sidx data in text format + dump sidx -sidx-path /path/to/sidx/timestamp_millis -segment-path /path/to/segment + + # Query with time range filter + dump sidx -sidx-path /path/to/sidx/timestamp_millis -segment-path /path/to/segment \ + --time-begin "2025-11-23T05:05:00Z" --time-end "2025-11-23T05:20:00Z" + + # Query with criteria filter + dump sidx -sidx-path /path/to/sidx/timestamp_millis -segment-path /path/to/segment \ + -criteria '{"condition":{"name":"query","op":"BINARY_OP_HAVING","value":{"strArray":{"value":["val1","val2"]}}}}' + + # Full scan mode (scans all series without requiring series ID discovery) + dump sidx -sidx-path /path/to/sidx/timestamp_millis -segment-path /path/to/segment --full-scan + + # Project specific tags as columns + dump sidx -sidx-path /path/to/sidx/timestamp_millis -segment-path /path/to/segment \ + --projection "tag1,tag2,tag3" + + # Filter by data content + dump sidx -sidx-path /path/to/sidx/timestamp_millis -segment-path /path/to/segment \ + --data-filter "2dd1624d5719fdfedc526f3f24d00342-4423100" + + # Output as CSV + dump sidx -sidx-path /path/to/sidx/timestamp_millis -segment-path /path/to/segment -csv`, + RunE: func(_ *cobra.Command, _ []string) error { + if sidxPath == "" { + return fmt.Errorf("-sidx-path flag is required") + } + if segmentPath == "" { + return fmt.Errorf("-segment-path flag is required") + } + return dumpSidx(sidxPath, segmentPath, criteriaJSON, csvOutput, timeBegin, timeEnd, fullScan, projectionTags, dataFilter) + }, + } + + cmd.Flags().StringVarP(&sidxPath, "sidx-path", "s", "", "Path to the sidx directory (required)") + cmd.Flags().StringVarP(&segmentPath, "segment-path", "g", "", "Path to the segment directory (required)") + cmd.Flags().StringVarP(&criteriaJSON, "criteria", "c", "", "Criteria filter as JSON string") + cmd.Flags().BoolVar(&csvOutput, "csv", false, "Output as CSV format") + cmd.Flags().StringVar(&timeBegin, "time-begin", "", "Begin time in RFC3339 format (e.g., 2025-11-23T05:05:00Z)") + 
cmd.Flags().StringVar(&timeEnd, "time-end", "", "End time in RFC3339 format (e.g., 2025-11-23T05:20:00Z)") + cmd.Flags().BoolVar(&fullScan, "full-scan", false, "Scan all series without requiring series ID discovery") + cmd.Flags().StringVarP(&projectionTags, "projection", "p", "", "Comma-separated list of tags to include as columns (e.g., tag1,tag2,tag3)") + cmd.Flags().StringVarP(&dataFilter, "data-filter", "d", "", "Filter rows by data content (substring match)") + _ = cmd.MarkFlagRequired("sidx-path") + _ = cmd.MarkFlagRequired("segment-path") + + return cmd +} + +func dumpSidx(sidxPath, segmentPath, criteriaJSON string, csvOutput bool, timeBegin, timeEnd string, fullScan bool, projectionTags string, dataFilter string) error { + // Parse time range if provided + var minKeyNanos, maxKeyNanos int64 + var hasTimeRange bool + + if timeBegin != "" || timeEnd != "" { + var err error + minKeyNanos, maxKeyNanos, err = parseTimeRange(timeBegin, timeEnd) + if err != nil { + return fmt.Errorf("failed to parse time range: %w", err) + } + hasTimeRange = true + fmt.Fprintf(os.Stderr, "Time range filter: %s to %s\n", + time.Unix(0, minKeyNanos).Format(time.RFC3339), + time.Unix(0, maxKeyNanos).Format(time.RFC3339)) + } + + // Parse projection tags + var tagNames []string + if projectionTags != "" { + tagNames = parseProjectionTags(projectionTags) + fmt.Fprintf(os.Stderr, "Projection tags: %v\n", tagNames) + } + + // Log data filter if provided + if dataFilter != "" { + fmt.Fprintf(os.Stderr, "Data filter: %s\n", dataFilter) + } + + // Discover available part IDs from segment directory + allPartIDs, err := discoverPartIDs(segmentPath) + if err != nil { + return fmt.Errorf("failed to discover part IDs: %w", err) + } + + if len(allPartIDs) == 0 { + fmt.Println("No parts found in segment directory") + return nil + } + + // Filter parts by time range if specified + var partIDs []uint64 + if hasTimeRange { + partIDs, err = filterPartsByTimeRange(sidxPath, allPartIDs, minKeyNanos, maxKeyNanos) + if err != nil { + return fmt.Errorf("failed to filter parts by time range: %w", err) + } + fmt.Fprintf(os.Stderr, "Filtered to %d parts (out of %d) based on time range\n", len(partIDs), len(allPartIDs)) + } else { + partIDs = allPartIDs + } + + if len(partIDs) == 0 { + fmt.Println("No parts match the specified time range") + return nil + } + + // Parse criteria if provided + var criteria *modelv1.Criteria + if criteriaJSON != "" { + criteria, err = parseCriteriaJSON(criteriaJSON) + if err != nil { + return fmt.Errorf("failed to parse criteria: %w", err) + } + } + + // Use full-scan mode if requested + if fullScan { + return dumpSidxFullScan(sidxPath, segmentPath, criteria, csvOutput, hasTimeRange, minKeyNanos, maxKeyNanos, allPartIDs, partIDs, tagNames, dataFilter) + } + + // Open the file system + fileSystem := fs.NewLocalFileSystem() + + // Create sidx options + opts, err := sidx.NewOptions(sidxPath, &protector.Nop{}) + if err != nil { + return fmt.Errorf("failed to create sidx options: %w", err) + } + opts.AvailablePartIDs = partIDs + + // Open SIDX instance + sidxInstance, err := sidx.NewSIDX(fileSystem, opts) + if err != nil { + return fmt.Errorf("failed to open sidx: %w", err) + } + defer sidxInstance.Close() + + // Discover series IDs from the segment's series index + seriesIDs, err := discoverSeriesIDs(segmentPath) + if err != nil { + fmt.Fprintf(os.Stderr, "Warning: could not discover series IDs: %v. 
Using default range.\n", err) + seriesIDs = nil + } + if len(seriesIDs) > 0 { + fmt.Fprintf(os.Stderr, "Discovered %d series IDs from segment\n", len(seriesIDs)) + } + + // Create dynamic tag registry if criteria is provided + var tagRegistry *dynamicTagRegistry + if criteria != nil { + tagRegistry, err = newDynamicTagRegistry(sidxPath) + if err != nil { + fmt.Fprintf(os.Stderr, "Warning: Failed to create tag registry: %v. Criteria filtering may not work properly.\n", err) + } else { + fmt.Fprintf(os.Stderr, "Discovered %d tags from sidx\n", len(tagRegistry.tagSpecs)) + } + } + + // Build query request + req, err := buildQueryRequest(criteria, seriesIDs, hasTimeRange, minKeyNanos, maxKeyNanos, tagRegistry) + if err != nil { + return fmt.Errorf("failed to build query request: %w", err) + } + + // Execute streaming query + // Use a longer timeout for dump operations since they can process large datasets + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute) + defer cancel() + + resultsCh, errCh := sidxInstance.StreamingQuery(ctx, req) + + // Process and output results + if csvOutput { + return outputSidxResultsAsCSV(resultsCh, errCh, dataFilter) + } + return outputSidxResultsAsText(resultsCh, errCh, sidxPath, dataFilter) +} + +func parseTimeRange(timeBegin, timeEnd string) (int64, int64, error) { + var minKeyNanos, maxKeyNanos int64 + + if timeBegin != "" { + t, err := time.Parse(time.RFC3339, timeBegin) + if err != nil { + return 0, 0, fmt.Errorf("invalid time-begin format: %w (expected RFC3339, e.g., 2025-11-23T05:05:00Z)", err) + } + minKeyNanos = t.UnixNano() + } else { + minKeyNanos = math.MinInt64 + } + + if timeEnd != "" { + t, err := time.Parse(time.RFC3339, timeEnd) + if err != nil { + return 0, 0, fmt.Errorf("invalid time-end format: %w (expected RFC3339, e.g., 2025-11-23T05:20:00Z)", err) + } + maxKeyNanos = t.UnixNano() + } else { + maxKeyNanos = math.MaxInt64 + } + + if minKeyNanos > maxKeyNanos { + return 0, 0, fmt.Errorf("time-begin must be before time-end") + } + + return minKeyNanos, maxKeyNanos, nil +} + +func filterPartsByTimeRange(sidxPath string, partIDs []uint64, minKey, maxKey int64) ([]uint64, error) { + fileSystem := fs.NewLocalFileSystem() + var filteredParts []uint64 + + for _, partID := range partIDs { + partPath := filepath.Join(sidxPath, fmt.Sprintf("%016x", partID)) + manifestPath := filepath.Join(partPath, "manifest.json") + + // Read manifest.json + manifestData, err := fileSystem.Read(manifestPath) + if err != nil { + // Skip parts that don't have a manifest + continue + } + + // Parse manifest to get minKey and maxKey + var manifest struct { + MinKey int64 `json:"minKey"` + MaxKey int64 `json:"maxKey"` + } + + if err := json.Unmarshal(manifestData, &manifest); err != nil { + fmt.Fprintf(os.Stderr, "Warning: failed to parse manifest for part %016x: %v\n", partID, err) + continue + } + + // Check if part overlaps with the requested time range + if manifest.MaxKey >= minKey && manifest.MinKey <= maxKey { + filteredParts = append(filteredParts, partID) + } + } + + return filteredParts, nil +} + +func parseCriteriaJSON(criteriaJSON string) (*modelv1.Criteria, error) { + criteria := &modelv1.Criteria{} + err := protojson.Unmarshal([]byte(criteriaJSON), criteria) + if err != nil { + return nil, fmt.Errorf("invalid criteria JSON: %w", err) + } + return criteria, nil +} + +func discoverPartIDs(segmentPath string) ([]uint64, error) { + // Look for shard directories in the segment path + shardDirs, err := filepath.Glob(filepath.Join(segmentPath, 
"shard-*")) + if err != nil { + return nil, fmt.Errorf("failed to glob shard directories: %w", err) + } + + if len(shardDirs) == 0 { + return nil, fmt.Errorf("no shard directories found in segment path") + } + + // Collect all part IDs from all shards + partIDMap := make(map[uint64]bool) + + for _, shardDir := range shardDirs { + entries, err := os.ReadDir(shardDir) + if err != nil { + fmt.Fprintf(os.Stderr, "Warning: failed to read shard directory %s: %v\n", shardDir, err) + continue + } + + for _, entry := range entries { + if !entry.IsDir() { + continue + } + // Part directories are named as hex IDs (e.g., "0000000000004db4") + name := entry.Name() + // Skip special directories + if name == "sidx" || name == "meta" { + continue + } + // Try to parse as hex + partID, err := strconv.ParseUint(name, 16, 64) + if err == nil { + partIDMap[partID] = true + } + } + } + + // Convert map to sorted slice + partIDs := make([]uint64, 0, len(partIDMap)) + for partID := range partIDMap { + partIDs = append(partIDs, partID) + } + sort.Slice(partIDs, func(i, j int) bool { + return partIDs[i] < partIDs[j] + }) + + return partIDs, nil +} + +func discoverSeriesIDs(segmentPath string) ([]common.SeriesID, error) { + // Open the series index (sidx directory under segment) + seriesIndexPath := filepath.Join(segmentPath, "sidx") + + l := logger.GetLogger("dump-sidx") + + // Create inverted index store + store, err := inverted.NewStore(inverted.StoreOpts{ + Path: seriesIndexPath, + Logger: l, + }) + if err != nil { + return nil, fmt.Errorf("failed to open series index: %w", err) + } + defer store.Close() + + // Get series iterator + ctx := context.Background() + iter, err := store.SeriesIterator(ctx) + if err != nil { + return nil, fmt.Errorf("failed to create series iterator: %w", err) + } + defer iter.Close() + + // Collect all series IDs + var seriesIDs []common.SeriesID + for iter.Next() { + series := iter.Val() + // Compute series ID from EntityValues using hash + if len(series.EntityValues) > 0 { + seriesID := common.SeriesID(convert.Hash(series.EntityValues)) + seriesIDs = append(seriesIDs, seriesID) + } + } + + return seriesIDs, nil +} + +func buildQueryRequest(criteria *modelv1.Criteria, seriesIDs []common.SeriesID, hasTimeRange bool, minKeyNanos, maxKeyNanos int64, tagRegistry *dynamicTagRegistry) (sidx.QueryRequest, error) { + // Use discovered series IDs if available, otherwise fall back to default range + if len(seriesIDs) == 0 { + // Use a range of series IDs to increase chance of finding data + // Since we don't know what series IDs exist, we'll use a large range + // Series IDs are uint64 hash values, so they can be quite large + // We'll sample the space to cover likely values + seriesIDs = make([]common.SeriesID, 0, 10000) + + // Add first 10000 series IDs (for simple/test data) + for i := common.SeriesID(1); i <= 10000; i++ { + seriesIDs = append(seriesIDs, i) + } + } + + req := sidx.QueryRequest{ + SeriesIDs: seriesIDs, + MaxBatchSize: 100, // Smaller batch size for better responsiveness + } + + // Set min/max key based on time range if provided, otherwise full range + var minKey, maxKey int64 + if hasTimeRange { + minKey = minKeyNanos + maxKey = maxKeyNanos + } else { + minKey = math.MinInt64 + maxKey = math.MaxInt64 + } + req.MinKey = &minKey + req.MaxKey = &maxKey + + // If criteria is provided, build tag filter + if criteria != nil && tagRegistry != nil { + tagFilter, err := logical.BuildSimpleTagFilter(criteria) + if err != nil { + return req, fmt.Errorf("failed to build tag filter from 
criteria: %w", err) + } + + // Create a tag filter matcher if filter is not dummy + if tagFilter != nil && tagFilter != logical.DummyFilter { + // Create tag filter matcher with the dynamic registry and decoder + tagFilterMatcher := logical.NewTagFilterMatcher(tagFilter, tagRegistry, simpleTagValueDecoder) + req.TagFilter = tagFilterMatcher + fmt.Fprintf(os.Stderr, "Applied criteria filter: %v\n", tagFilter) + } + } + + return req, nil +} + +func outputSidxResultsAsText(resultsCh <-chan *sidx.QueryResponse, errCh <-chan error, sidxPath string, dataFilter string) error { + fmt.Printf("Opening sidx: %s\n", sidxPath) + fmt.Printf("================================================================================\n\n") + + totalRows := 0 + filteredRows := 0 + batchNum := 0 + + // Process results and errors + for resultsCh != nil || errCh != nil { + select { + case err, ok := <-errCh: + if !ok { + errCh = nil + continue + } + if err != nil { + return fmt.Errorf("query error: %w", err) + } + case resp, ok := <-resultsCh: + if !ok { + resultsCh = nil + continue + } + + if resp.Error != nil { + return fmt.Errorf("response error: %w", resp.Error) + } + + if resp.Len() > 0 { + batchNum++ + batchPrinted := false + + for i := 0; i < resp.Len(); i++ { + dataStr := string(resp.Data[i]) + + // Apply data filter if specified + if dataFilter != "" && !strings.Contains(dataStr, dataFilter) { + filteredRows++ + continue + } + + // Print batch header only when we have matching rows + if !batchPrinted { + fmt.Printf("Batch %d:\n", batchNum) + fmt.Printf("--------------------------------------------------------------------------------\n") + batchPrinted = true + } + + totalRows++ + fmt.Printf("Row %d:\n", totalRows) + fmt.Printf(" PartID: %d (0x%016x)\n", resp.PartIDs[i], resp.PartIDs[i]) + fmt.Printf(" Key: %d\n", resp.Keys[i]) + fmt.Printf(" SeriesID: %d\n", resp.SIDs[i]) + fmt.Printf(" Data: %s\n", dataStr) + fmt.Printf("\n") + } + } + } + } + + if dataFilter != "" { + fmt.Printf("\nTotal rows: %d (filtered out: %d)\n", totalRows, filteredRows) + } else { + fmt.Printf("\nTotal rows: %d\n", totalRows) + } + return nil +} + +func outputSidxResultsAsCSV(resultsCh <-chan *sidx.QueryResponse, errCh <-chan error, dataFilter string) error { + writer := csv.NewWriter(os.Stdout) + defer writer.Flush() + + // Write header + header := []string{"PartID", "Key", "SeriesID", "Data"} + if err := writer.Write(header); err != nil { + return fmt.Errorf("failed to write CSV header: %w", err) + } + + // Process results and errors + for resultsCh != nil || errCh != nil { + select { + case err, ok := <-errCh: + if !ok { + errCh = nil + continue + } + if err != nil { + return fmt.Errorf("query error: %w", err) + } + case resp, ok := <-resultsCh: + if !ok { + resultsCh = nil + continue + } + + if resp.Error != nil { + return fmt.Errorf("response error: %w", resp.Error) + } + + for i := 0; i < resp.Len(); i++ { + dataStr := string(resp.Data[i]) + + // Apply data filter if specified + if dataFilter != "" && !strings.Contains(dataStr, dataFilter) { + continue + } + + row := []string{ + fmt.Sprintf("%d", resp.PartIDs[i]), + fmt.Sprintf("%d", resp.Keys[i]), + fmt.Sprintf("%d", resp.SIDs[i]), + dataStr, + } + if err := writer.Write(row); err != nil { + return fmt.Errorf("failed to write CSV row: %w", err) + } + } + } + } + + return nil +} + +// simpleTagValueDecoder is a simple decoder for tag values used in filtering +// It handles basic tag value types without needing full schema information +func simpleTagValueDecoder(valueType 
pbv1.ValueType, value []byte, valueArr [][]byte) *modelv1.TagValue { + if value == nil && valueArr == nil { + return pbv1.NullTagValue + } + + switch valueType { + case pbv1.ValueTypeStr: + if value == nil { + return pbv1.NullTagValue + } + return &modelv1.TagValue{ + Value: &modelv1.TagValue_Str{ + Str: &modelv1.Str{ + Value: string(value), + }, + }, + } + case pbv1.ValueTypeInt64: + if value == nil { + return pbv1.NullTagValue + } + return &modelv1.TagValue{ + Value: &modelv1.TagValue_Int{ + Int: &modelv1.Int{ + Value: convert.BytesToInt64(value), + }, + }, + } + case pbv1.ValueTypeStrArr: + var values []string + for _, v := range valueArr { + values = append(values, string(v)) + } + return &modelv1.TagValue{ + Value: &modelv1.TagValue_StrArray{ + StrArray: &modelv1.StrArray{ + Value: values, + }, + }, + } + case pbv1.ValueTypeBinaryData: + if value == nil { + return pbv1.NullTagValue + } + return &modelv1.TagValue{ + Value: &modelv1.TagValue_BinaryData{ + BinaryData: value, + }, + } + default: + // For unknown types, try to return as string + if value != nil { + return &modelv1.TagValue{ + Value: &modelv1.TagValue_Str{ + Str: &modelv1.Str{ + Value: string(value), + }, + }, + } + } + return pbv1.NullTagValue + } +} + +// dynamicTagRegistry implements TagSpecRegistry by discovering tags from the sidx +type dynamicTagRegistry struct { + tagSpecs map[string]*logical.TagSpec +} + +// newDynamicTagRegistry creates a registry by discovering tag names from the sidx directory +func newDynamicTagRegistry(sidxPath string) (*dynamicTagRegistry, error) { + registry := &dynamicTagRegistry{ + tagSpecs: make(map[string]*logical.TagSpec), + } + + fileSystem := fs.NewLocalFileSystem() + + // List all directories in sidxPath to find parts + entries := fileSystem.ReadDir(sidxPath) + if len(entries) == 0 { + return nil, fmt.Errorf("failed to read sidx directory or directory is empty") + } + + tagNamesMap := make(map[string]bool) + + // Scan first part to discover tag names + for _, entry := range entries { + if !entry.IsDir() { + continue + } + + partPath := filepath.Join(sidxPath, entry.Name()) + partEntries := fileSystem.ReadDir(partPath) + + // Look for .td (tag data) files + for _, partEntry := range partEntries { + if partEntry.IsDir() { + continue + } + name := partEntry.Name() + // Tag data files are named like "tagname.td" + if strings.HasSuffix(name, ".td") { + tagName := strings.TrimSuffix(name, ".td") + tagNamesMap[tagName] = true + } + } + + // Only scan first part to save time + if len(tagNamesMap) > 0 { + break + } + } + + // Create tag specs for discovered tags + // We assume all tags are string arrays since we don't have schema info + tagFamilyIdx := 0 + tagIdx := 0 + for tagName := range tagNamesMap { + registry.tagSpecs[tagName] = &logical.TagSpec{ + Spec: &databasev1.TagSpec{ + Name: tagName, + Type: databasev1.TagType_TAG_TYPE_STRING_ARRAY, + }, + TagFamilyIdx: tagFamilyIdx, + TagIdx: tagIdx, + } + tagIdx++ + } + + return registry, nil +} + +func (d *dynamicTagRegistry) FindTagSpecByName(name string) *logical.TagSpec { + if spec, ok := d.tagSpecs[name]; ok { + return spec + } + // If tag not found, return a default spec for it + // This allows filtering on any tag name + return &logical.TagSpec{ + Spec: &databasev1.TagSpec{ + Name: name, + Type: databasev1.TagType_TAG_TYPE_STRING_ARRAY, + }, + TagFamilyIdx: 0, + TagIdx: 0, + } +} + +// IndexDefined implements IndexChecker interface (stub implementation) +func (d *dynamicTagRegistry) IndexDefined(_ string) (bool, *databasev1.IndexRule) { 
+ return false, nil +} + +// IndexRuleDefined implements IndexChecker interface (stub implementation) +func (d *dynamicTagRegistry) IndexRuleDefined(_ string) (bool, *databasev1.IndexRule) { + return false, nil +} + +// EntityList implements Schema interface (stub implementation) +func (d *dynamicTagRegistry) EntityList() []string { + return nil +} + +// CreateTagRef implements Schema interface (stub implementation) +func (d *dynamicTagRegistry) CreateTagRef(tags ...[]*logical.Tag) ([][]*logical.TagRef, error) { + return nil, fmt.Errorf("CreateTagRef not supported in dump tool") +} + +// CreateFieldRef implements Schema interface (stub implementation) +func (d *dynamicTagRegistry) CreateFieldRef(fields ...*logical.Field) ([]*logical.FieldRef, error) { + return nil, fmt.Errorf("CreateFieldRef not supported in dump tool") +} + +// ProjTags implements Schema interface (stub implementation) +func (d *dynamicTagRegistry) ProjTags(refs ...[]*logical.TagRef) logical.Schema { + return d +} + +// ProjFields implements Schema interface (stub implementation) +func (d *dynamicTagRegistry) ProjFields(refs ...*logical.FieldRef) logical.Schema { + return d +} + +// Children implements Schema interface (stub implementation) +func (d *dynamicTagRegistry) Children() []logical.Schema { + return nil +} + +func dumpSidxFullScan(sidxPath, segmentPath string, criteria *modelv1.Criteria, csvOutput bool, + hasTimeRange bool, minKeyNanos, maxKeyNanos int64, allPartIDs, partIDs []uint64, projectionTagNames []string, dataFilter string) error { + + fmt.Fprintf(os.Stderr, "Using full-scan mode (no series ID filtering)\n") + + // Load series information for human-readable output + seriesMap, err := loadSeriesMap(segmentPath) + if err != nil { + fmt.Fprintf(os.Stderr, "Warning: Failed to load series information: %v\n", err) + seriesMap = nil // Continue without series names + } else { + fmt.Fprintf(os.Stderr, "Loaded %d series from segment\n", len(seriesMap)) + } + + // Build tag projection + var tagProjection []model.TagProjection + if len(projectionTagNames) > 0 { + // Create a single TagProjection with all the requested tag names + tagProjection = []model.TagProjection{ + { + Family: "", // Empty family for SIDX tags + Names: projectionTagNames, + }, + } + } + + // Create dynamic tag registry if criteria is provided + var tagRegistry *dynamicTagRegistry + if criteria != nil { + var err error + tagRegistry, err = newDynamicTagRegistry(sidxPath) + if err != nil { + fmt.Fprintf(os.Stderr, "Warning: Failed to create tag registry: %v\n", err) + } + } + + // Build scan request + var minKey, maxKey int64 + if hasTimeRange { + minKey = minKeyNanos + maxKey = maxKeyNanos + } else { + minKey = math.MinInt64 + maxKey = math.MaxInt64 + } + + // Track progress + startTime := time.Now() + + scanReq := sidx.ScanQueryRequest{ + MinKey: &minKey, + MaxKey: &maxKey, + MaxBatchSize: 1000, + TagProjection: tagProjection, + OnProgress: func(currentPart, totalParts int, rowsFound int) { + elapsed := time.Since(startTime) + fmt.Fprintf(os.Stderr, "\rScanning part %d/%d... 
Found %d rows (elapsed: %s)", + currentPart, totalParts, rowsFound, elapsed.Round(time.Second)) + os.Stderr.Sync() // Flush immediately to show progress in real-time + }, + } + + // Apply criteria filter if provided + if criteria != nil && tagRegistry != nil { + fmt.Fprintf(os.Stderr, "Discovered %d tags from sidx\n", len(tagRegistry.tagSpecs)) + + tagFilter, err := logical.BuildSimpleTagFilter(criteria) + if err != nil { + return fmt.Errorf("failed to build tag filter: %w", err) + } + if tagFilter != nil && tagFilter != logical.DummyFilter { + scanReq.TagFilter = logical.NewTagFilterMatcher(tagFilter, tagRegistry, simpleTagValueDecoder) + fmt.Fprintf(os.Stderr, "Applied criteria filter: %v\n", tagFilter) + } + } + + // Open SIDX + fileSystem := fs.NewLocalFileSystem() + opts, err := sidx.NewOptions(sidxPath, &protector.Nop{}) + if err != nil { + return fmt.Errorf("failed to create sidx options: %w", err) + } + opts.AvailablePartIDs = partIDs + + sidxInstance, err := sidx.NewSIDX(fileSystem, opts) + if err != nil { + return fmt.Errorf("failed to open sidx: %w", err) + } + defer sidxInstance.Close() + + // Execute scan query + // Use a longer timeout for dump operations since they can process large datasets + // Full-scan mode can be slower when scanning many parts with filtering + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute) + defer cancel() + + results, err := sidxInstance.ScanQuery(ctx, scanReq) + if err != nil { + return fmt.Errorf("scan query failed: %w", err) + } + + // Print newline after progress output and flush to ensure clean output + fmt.Fprintf(os.Stderr, "\nScan complete.\n") + os.Stderr.Sync() // Ensure all stderr output is flushed + + // Output results + if csvOutput { + return outputScanResultsAsCSV(results, seriesMap, projectionTagNames, dataFilter) + } + return outputScanResultsAsText(results, sidxPath, seriesMap, projectionTagNames, dataFilter) +} + +// parseProjectionTags parses a comma-separated list of tag names +func parseProjectionTags(projectionStr string) []string { + if projectionStr == "" { + return nil + } + + tags := strings.Split(projectionStr, ",") + result := make([]string, 0, len(tags)) + for _, tag := range tags { + tag = strings.TrimSpace(tag) + if tag != "" { + result = append(result, tag) + } + } + return result +} + +// loadSeriesMap loads all series from the segment's series index and creates a map from SeriesID to text representation +func loadSeriesMap(segmentPath string) (map[common.SeriesID]string, error) { + seriesIndexPath := filepath.Join(segmentPath, "sidx") + + l := logger.GetLogger("dump-sidx") + + // Create inverted index store + store, err := inverted.NewStore(inverted.StoreOpts{ + Path: seriesIndexPath, + Logger: l, + }) + if err != nil { + return nil, fmt.Errorf("failed to open series index: %w", err) + } + defer store.Close() + + // Get series iterator + ctx := context.Background() + iter, err := store.SeriesIterator(ctx) + if err != nil { + return nil, fmt.Errorf("failed to create series iterator: %w", err) + } + defer iter.Close() + + // Build map of SeriesID -> text representation + seriesMap := make(map[common.SeriesID]string) + for iter.Next() { + series := iter.Val() + if len(series.EntityValues) > 0 { + seriesID := common.SeriesID(convert.Hash(series.EntityValues)) + // Convert EntityValues bytes to readable string + seriesText := string(series.EntityValues) + seriesMap[seriesID] = seriesText + } + } + + return seriesMap, nil +} + +func outputScanResultsAsText(results []*sidx.QueryResponse, sidxPath 
string, seriesMap map[common.SeriesID]string, projectionTagNames []string, dataFilter string) error { + fmt.Printf("Opening sidx: %s\n", sidxPath) + fmt.Printf("================================================================================\n\n") + + totalRows := 0 + filteredRows := 0 + for batchNum, resp := range results { + if resp.Error != nil { + return fmt.Errorf("response error in batch %d: %w", batchNum+1, resp.Error) + } + + batchPrinted := false + batchStartRow := totalRows + + for i := 0; i < resp.Len(); i++ { + dataStr := string(resp.Data[i]) + + // Apply data filter if specified + if dataFilter != "" && !strings.Contains(dataStr, dataFilter) { + filteredRows++ + continue + } + + // Print batch header only when we have matching rows + if !batchPrinted { + fmt.Printf("Batch %d:\n", batchNum+1) + fmt.Printf("--------------------------------------------------------------------------------\n") + batchPrinted = true + } + + fmt.Printf("Row %d:\n", totalRows+1) + fmt.Printf(" PartID: %d (0x%016x)\n", resp.PartIDs[i], resp.PartIDs[i]) + fmt.Printf(" Key: %d\n", resp.Keys[i]) + fmt.Printf(" SeriesID: %d\n", resp.SIDs[i]) + + // Add series text if available + if seriesMap != nil { + if seriesText, ok := seriesMap[resp.SIDs[i]]; ok { + fmt.Printf(" Series: %s\n", seriesText) + } + } + + // Add projected tags if available + if len(projectionTagNames) > 0 && resp.Tags != nil { + for _, tagName := range projectionTagNames { + if tagValues, ok := resp.Tags[tagName]; ok && i < len(tagValues) { + tagValue := tagValues[i] + // Calculate size of the tag value + tagSize := len(tagValue) + fmt.Printf(" %s: %s (size: %d bytes)\n", tagName, tagValue, tagSize) + } + } + } + + fmt.Printf(" Data: %s\n", dataStr) + fmt.Printf("\n") + totalRows++ + } + + // Add newline after batch if we printed anything + if batchPrinted && totalRows > batchStartRow { + // Already added newline after each row + } + } + + if dataFilter != "" { + fmt.Printf("\nTotal rows: %d (filtered out: %d)\n", totalRows, filteredRows) + } else { + fmt.Printf("\nTotal rows: %d\n", totalRows) + } + return nil +} + +func outputScanResultsAsCSV(results []*sidx.QueryResponse, seriesMap map[common.SeriesID]string, projectionTagNames []string, dataFilter string) error { + writer := csv.NewWriter(os.Stdout) + defer writer.Flush() + + // Write header + header := []string{"PartID", "Key", "SeriesID", "Series"} + // Add projected tag columns (with size) + for _, tagName := range projectionTagNames { + header = append(header, tagName, tagName+"_size") + } + header = append(header, "Data") + + if err := writer.Write(header); err != nil { + return fmt.Errorf("failed to write CSV header: %w", err) + } + + for _, resp := range results { + if resp.Error != nil { + return fmt.Errorf("response error: %w", resp.Error) + } + + for i := 0; i < resp.Len(); i++ { + dataStr := string(resp.Data[i]) + + // Apply data filter if specified + if dataFilter != "" && !strings.Contains(dataStr, dataFilter) { + continue + } + + seriesText := "" + if seriesMap != nil { + if text, ok := seriesMap[resp.SIDs[i]]; ok { + seriesText = text + } + } + + row := []string{ + fmt.Sprintf("%d", resp.PartIDs[i]), + fmt.Sprintf("%d", resp.Keys[i]), + fmt.Sprintf("%d", resp.SIDs[i]), + seriesText, + } + + // Add projected tag values with size + for _, tagName := range projectionTagNames { + tagValue := "" + tagSize := "" + + if resp.Tags != nil { + if tagValues, ok := resp.Tags[tagName]; ok && i < len(tagValues) { + tagValue = tagValues[i] + // Calculate size of the tag value + tagSize 
= fmt.Sprintf("%d", len(tagValue)) + } + } + + row = append(row, tagValue, tagSize) + } + + row = append(row, dataStr) + + if err := writer.Write(row); err != nil { + return fmt.Errorf("failed to write CSV row: %w", err) + } + } + } + + return nil +} diff --git a/banyand/cmd/dump/trace.go b/banyand/cmd/dump/trace.go index d779a658..02422e1a 100644 --- a/banyand/cmd/dump/trace.go +++ b/banyand/cmd/dump/trace.go @@ -18,6 +18,7 @@ package main import ( + "context" "encoding/csv" "encoding/json" "fmt" @@ -30,49 +31,76 @@ import ( "time" "github.com/spf13/cobra" + "google.golang.org/protobuf/encoding/protojson" + "github.com/apache/skywalking-banyandb/api/common" + databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" + modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" internalencoding "github.com/apache/skywalking-banyandb/banyand/internal/encoding" "github.com/apache/skywalking-banyandb/pkg/bytes" "github.com/apache/skywalking-banyandb/pkg/compress/zstd" "github.com/apache/skywalking-banyandb/pkg/convert" "github.com/apache/skywalking-banyandb/pkg/encoding" "github.com/apache/skywalking-banyandb/pkg/fs" + "github.com/apache/skywalking-banyandb/pkg/index/inverted" + "github.com/apache/skywalking-banyandb/pkg/logger" pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" + "github.com/apache/skywalking-banyandb/pkg/query/logical" ) func newTraceCmd() *cobra.Command { - var partPath string + var shardPath string + var segmentPath string var verbose bool var csvOutput bool + var criteriaJSON string + var projectionTags string cmd := &cobra.Command{ Use: "trace", - Short: "Dump trace part data", - Long: `Dump and display contents of a trace part directory. -Outputs trace data in human-readable format or CSV.`, - Example: ` # Display trace data in text format - dump trace -path /path/to/part/0000000000004db4 + Short: "Dump trace shard data", + Long: `Dump and display contents of a trace shard directory (containing multiple parts). +Outputs trace data in human-readable format or CSV. 
+ +Supports filtering by criteria and projecting specific tags.`, + Example: ` # Display trace data from shard in text format + dump trace --shard-path /path/to/shard-0 --segment-path /path/to/segment # Display with verbose hex dumps - dump trace -path /path/to/part/0000000000004db4 -v + dump trace --shard-path /path/to/shard-0 --segment-path /path/to/segment -v + + # Filter by criteria + dump trace --shard-path /path/to/shard-0 --segment-path /path/to/segment \ + --criteria '{"condition":{"name":"query","op":"BINARY_OP_HAVING","value":{"strArray":{"value":["tag1=value1","tag2=value2"]}}}}' + + # Project specific tags + dump trace --shard-path /path/to/shard-0 --segment-path /path/to/segment \ + --projection "tag1,tag2,tag3" # Output as CSV - dump trace -path /path/to/part/0000000000004db4 -csv + dump trace --shard-path /path/to/shard-0 --segment-path /path/to/segment --csv # Save CSV to file - dump trace -path /path/to/part/0000000000004db4 -csv > output.csv`, + dump trace --shard-path /path/to/shard-0 --segment-path /path/to/segment --csv > output.csv`, RunE: func(_ *cobra.Command, _ []string) error { - if partPath == "" { - return fmt.Errorf("-path flag is required") + if shardPath == "" { + return fmt.Errorf("--shard-path flag is required") } - return dumpTracePart(partPath, verbose, csvOutput) + if segmentPath == "" { + return fmt.Errorf("--segment-path flag is required") + } + return dumpTraceShard(shardPath, segmentPath, verbose, csvOutput, criteriaJSON, projectionTags) }, } - cmd.Flags().StringVarP(&partPath, "path", "p", "", "Path to the trace part directory (required)") + cmd.Flags().StringVar(&shardPath, "shard-path", "", "Path to the shard directory (required)") + cmd.Flags().StringVarP(&segmentPath, "segment-path", "g", "", "Path to the segment directory (required)") cmd.Flags().BoolVarP(&verbose, "verbose", "v", false, "Verbose output (show raw data)") - cmd.Flags().BoolVarP(&csvOutput, "csv", "c", false, "Output as CSV format") - _ = cmd.MarkFlagRequired("path") + cmd.Flags().BoolVar(&csvOutput, "csv", false, "Output as CSV format") + cmd.Flags().StringVarP(&criteriaJSON, "criteria", "c", "", "Criteria filter as JSON string") + cmd.Flags().StringVarP(&projectionTags, "projection", "p", "", "Comma-separated list of tags to include as columns (e.g., tag1,tag2,tag3)") + _ = cmd.MarkFlagRequired("shard-path") + _ = cmd.MarkFlagRequired("segment-path") return cmd } @@ -247,6 +275,196 @@ func dumpTracePart(partPath string, verbose bool, csvOutput bool) error { return nil } +func dumpTraceShard(shardPath, segmentPath string, verbose bool, csvOutput bool, criteriaJSON, projectionTagsStr string) error { + // Discover all part directories in the shard + partIDs, err := discoverTracePartIDs(shardPath) + if err != nil { + return fmt.Errorf("failed to discover part IDs: %w", err) + } + + if len(partIDs) == 0 { + fmt.Println("No parts found in shard directory") + return nil + } + + fmt.Fprintf(os.Stderr, "Found %d parts in shard\n", len(partIDs)) + + // Load series information for human-readable output + seriesMap, err := loadTraceSeriesMap(segmentPath) + if err != nil { + fmt.Fprintf(os.Stderr, "Warning: Failed to load series information: %v\n", err) + seriesMap = nil // Continue without series names + } else { + fmt.Fprintf(os.Stderr, "Loaded %d series from segment\n", len(seriesMap)) + } + + // Parse criteria if provided + var criteria *modelv1.Criteria + var tagFilter logical.TagFilter + if criteriaJSON != "" { + criteria, err = parseTraceCriteriaJSON(criteriaJSON) + if err != nil { + 
return fmt.Errorf("failed to parse criteria: %w", err) + } + tagFilter, err = logical.BuildSimpleTagFilter(criteria) + if err != nil { + return fmt.Errorf("failed to build tag filter: %w", err) + } + fmt.Fprintf(os.Stderr, "Applied criteria filter\n") + } + + // Parse projection tags + var projectionTags []string + if projectionTagsStr != "" { + projectionTags = parseTraceProjectionTags(projectionTagsStr) + fmt.Fprintf(os.Stderr, "Projection tags: %v\n", projectionTags) + } + + // Open the file system + fileSystem := fs.NewLocalFileSystem() + + // Determine tag columns for CSV output + var tagColumns []string + if csvOutput { + if len(projectionTags) > 0 { + tagColumns = projectionTags + } else { + // Scan first part to determine all available tags + tagColumns, err = discoverTagColumns(partIDs, shardPath, fileSystem) + if err != nil { + return fmt.Errorf("failed to discover tag columns: %w", err) + } + } + } + + // Initialize output + var writer *csv.Writer + var rowNum int + if csvOutput { + writer = csv.NewWriter(os.Stdout) + defer writer.Flush() + + // Write CSV header + header := []string{"PartID", "TraceID", "SpanID", "SeriesID", "Series", "SpanDataSize"} + header = append(header, tagColumns...) + if err := writer.Write(header); err != nil { + return fmt.Errorf("failed to write CSV header: %w", err) + } + } else { + fmt.Printf("================================================================================\n") + fmt.Fprintf(os.Stderr, "Processing parts...\n") + } + + // Process each part and stream output + for partIdx, partID := range partIDs { + fmt.Fprintf(os.Stderr, "Processing part %d/%d (0x%016x)...\n", partIdx+1, len(partIDs), partID) + + p, err := openFilePart(partID, shardPath, fileSystem) + if err != nil { + fmt.Fprintf(os.Stderr, "Warning: failed to open part %016x: %v\n", partID, err) + continue + } + + decoder := &encoding.BytesBlockDecoder{} + partRowCount := 0 + + // Process all blocks in this part + for _, pbm := range p.primaryBlockMetadata { + // Read primary data block + primaryData := make([]byte, pbm.size) + fs.MustReadData(p.primary, int64(pbm.offset), primaryData) + + // Decompress + decompressed, err := zstd.Decompress(nil, primaryData) + if err != nil { + fmt.Fprintf(os.Stderr, "Warning: Error decompressing primary data in part %016x: %v\n", partID, err) + continue + } + + // Parse ALL block metadata entries from this primary block + blockMetadatas, err := parseAllBlockMetadata(decompressed, p.tagType) + if err != nil { + fmt.Fprintf(os.Stderr, "Warning: Error parsing block metadata in part %016x: %v\n", partID, err) + continue + } + + // Process each trace block within this primary block + for _, bm := range blockMetadatas { + // Read spans + spans, spanIDs, err := readSpans(decoder, bm.spans, int(bm.count), p.spans) + if err != nil { + fmt.Fprintf(os.Stderr, "Warning: Error reading spans for trace %s in part %016x: %v\n", bm.traceID, partID, err) + continue + } + + // Read tags + tags := make(map[string][][]byte) + for tagName, tagBlock := range bm.tags { + tagValues, err := readTagValues(decoder, tagBlock, tagName, int(bm.count), p.tagMetadata[tagName], p.tags[tagName], p.tagType[tagName]) + if err != nil { + fmt.Fprintf(os.Stderr, "Warning: Error reading tag %s for trace %s in part %016x: %v\n", tagName, bm.traceID, partID, err) + continue + } + tags[tagName] = tagValues + } + + // Process each span as a row + for i := 0; i < len(spans); i++ { + // Build tag map for this span + spanTags := make(map[string][]byte) + for tagName, tagValues := range tags { 
+ if i < len(tagValues) { + spanTags[tagName] = tagValues[i] + } + } + + // Calculate series ID from entity tags + seriesID := calculateSeriesIDFromTags(spanTags) + + // Apply criteria filter if specified + if tagFilter != nil && tagFilter != logical.DummyFilter { + if !matchesCriteria(spanTags, p.tagType, tagFilter) { + continue + } + } + + row := traceRowData{ + partID: partID, + traceID: bm.traceID, + spanID: spanIDs[i], + spanData: spans[i], + tags: spanTags, + seriesID: seriesID, + } + + // Stream output immediately + if csvOutput { + if err := writeTraceRowAsCSV(writer, row, tagColumns, seriesMap); err != nil { + return err + } + } else { + writeTraceRowAsText(row, rowNum+1, verbose, projectionTags, seriesMap) + } + + rowNum++ + partRowCount++ + } + } + } + + closePart(p) + fmt.Fprintf(os.Stderr, " Part %d/%d: processed %d rows (total: %d)\n", partIdx+1, len(partIDs), partRowCount, rowNum) + } + + if !csvOutput { + fmt.Printf("\nTotal rows: %d\n", rowNum) + } else { + fmt.Fprintf(os.Stderr, "Total rows written: %d\n", rowNum) + } + + return nil +} + func dumpPartAsCSV(p *part) error { decoder := &encoding.BytesBlockDecoder{} @@ -561,6 +779,15 @@ type part struct { partMetadata partMetadata } +type traceRowData struct { + partID uint64 + traceID string + spanID string + spanData []byte + tags map[string][]byte + seriesID common.SeriesID +} + func openFilePart(id uint64, root string, fileSystem fs.FileSystem) (*part, error) { var p part partPath := filepath.Join(root, fmt.Sprintf("%016x", id)) @@ -877,3 +1104,530 @@ func unmarshalTagMetadata(tm *tagMetadata, src []byte) error { return nil } + +// Helper functions for new shard-level dump + +func discoverTracePartIDs(shardPath string) ([]uint64, error) { + entries, err := os.ReadDir(shardPath) + if err != nil { + return nil, fmt.Errorf("failed to read shard directory: %w", err) + } + + var partIDs []uint64 + for _, entry := range entries { + if !entry.IsDir() { + continue + } + // Skip special directories + name := entry.Name() + if name == "sidx" || name == "meta" { + continue + } + // Try to parse as hex part ID + partID, err := strconv.ParseUint(name, 16, 64) + if err == nil { + partIDs = append(partIDs, partID) + } + } + + sort.Slice(partIDs, func(i, j int) bool { + return partIDs[i] < partIDs[j] + }) + + return partIDs, nil +} + +func loadTraceSeriesMap(segmentPath string) (map[common.SeriesID]string, error) { + seriesIndexPath := filepath.Join(segmentPath, "sidx") + + l := logger.GetLogger("dump-trace") + + // Create inverted index store + store, err := inverted.NewStore(inverted.StoreOpts{ + Path: seriesIndexPath, + Logger: l, + }) + if err != nil { + return nil, fmt.Errorf("failed to open series index: %w", err) + } + defer store.Close() + + // Get series iterator + ctx := context.Background() + iter, err := store.SeriesIterator(ctx) + if err != nil { + return nil, fmt.Errorf("failed to create series iterator: %w", err) + } + defer iter.Close() + + // Build map of SeriesID -> text representation + seriesMap := make(map[common.SeriesID]string) + for iter.Next() { + series := iter.Val() + if len(series.EntityValues) > 0 { + seriesID := common.SeriesID(convert.Hash(series.EntityValues)) + // Convert EntityValues bytes to readable string + seriesText := string(series.EntityValues) + seriesMap[seriesID] = seriesText + } + } + + return seriesMap, nil +} + +func parseTraceCriteriaJSON(criteriaJSON string) (*modelv1.Criteria, error) { + criteria := &modelv1.Criteria{} + err := protojson.Unmarshal([]byte(criteriaJSON), criteria) + if err 
!= nil { + return nil, fmt.Errorf("invalid criteria JSON: %w", err) + } + return criteria, nil +} + +func parseTraceProjectionTags(projectionStr string) []string { + if projectionStr == "" { + return nil + } + + tags := strings.Split(projectionStr, ",") + result := make([]string, 0, len(tags)) + for _, tag := range tags { + tag = strings.TrimSpace(tag) + if tag != "" { + result = append(result, tag) + } + } + return result +} + +func calculateSeriesIDFromTags(tags map[string][]byte) common.SeriesID { + // Extract entity values from tags to calculate series ID + // Entity tags typically follow naming conventions like "service_id", "instance_id", etc. + // We'll collect all tag key-value pairs and hash them + var entityValues []byte + + // Sort tag names for consistent hashing + tagNames := make([]string, 0, len(tags)) + for name := range tags { + tagNames = append(tagNames, name) + } + sort.Strings(tagNames) + + // Build entity values + for _, name := range tagNames { + value := tags[name] + if value != nil { + // Append tag name=value + entityValues = append(entityValues, []byte(name)...) + entityValues = append(entityValues, '=') + entityValues = append(entityValues, value...) + entityValues = append(entityValues, internalencoding.EntityDelimiter) + } + } + + if len(entityValues) == 0 { + return 0 + } + + return common.SeriesID(convert.Hash(entityValues)) +} + +func matchesCriteria(tags map[string][]byte, tagTypes map[string]pbv1.ValueType, filter logical.TagFilter) bool { + // Convert tags to modelv1.Tag format + modelTags := make([]*modelv1.Tag, 0, len(tags)) + for name, value := range tags { + if value == nil { + continue + } + + valueType := tagTypes[name] + tagValue := convertTagValue(value, valueType) + if tagValue != nil { + modelTags = append(modelTags, &modelv1.Tag{ + Key: name, + Value: tagValue, + }) + } + } + + // Use TagFilterMatcher to check if tags match the filter + // Create a simple registry for the available tags + registry := &traceTagRegistry{ + tagTypes: tagTypes, + } + + matcher := logical.NewTagFilterMatcher(filter, registry, traceTagValueDecoder) + match, _ := matcher.Match(modelTags) + return match +} + +func convertTagValue(value []byte, valueType pbv1.ValueType) *modelv1.TagValue { + if value == nil { + return pbv1.NullTagValue + } + + switch valueType { + case pbv1.ValueTypeStr: + return &modelv1.TagValue{ + Value: &modelv1.TagValue_Str{ + Str: &modelv1.Str{ + Value: string(value), + }, + }, + } + case pbv1.ValueTypeInt64: + if len(value) >= 8 { + return &modelv1.TagValue{ + Value: &modelv1.TagValue_Int{ + Int: &modelv1.Int{ + Value: convert.BytesToInt64(value), + }, + }, + } + } + case pbv1.ValueTypeStrArr: + // Decode string array + var values []string + var err error + remaining := value + for len(remaining) > 0 { + var decoded []byte + decoded, remaining, err = unmarshalVarArray(nil, remaining) + if err != nil { + break + } + if len(decoded) > 0 { + values = append(values, string(decoded)) + } + } + return &modelv1.TagValue{ + Value: &modelv1.TagValue_StrArray{ + StrArray: &modelv1.StrArray{ + Value: values, + }, + }, + } + case pbv1.ValueTypeBinaryData: + return &modelv1.TagValue{ + Value: &modelv1.TagValue_BinaryData{ + BinaryData: value, + }, + } + } + + // Default: try to return as string + return &modelv1.TagValue{ + Value: &modelv1.TagValue_Str{ + Str: &modelv1.Str{ + Value: string(value), + }, + }, + } +} + +// traceTagRegistry implements logical.Schema for tag filtering +type traceTagRegistry struct { + tagTypes map[string]pbv1.ValueType +} + +func 
(r *traceTagRegistry) FindTagSpecByName(name string) *logical.TagSpec { + valueType := r.tagTypes[name] + var tagType databasev1.TagType + switch valueType { + case pbv1.ValueTypeStr: + tagType = databasev1.TagType_TAG_TYPE_STRING + case pbv1.ValueTypeInt64: + tagType = databasev1.TagType_TAG_TYPE_INT + case pbv1.ValueTypeStrArr: + tagType = databasev1.TagType_TAG_TYPE_STRING_ARRAY + default: + tagType = databasev1.TagType_TAG_TYPE_STRING + } + + return &logical.TagSpec{ + Spec: &databasev1.TagSpec{ + Name: name, + Type: tagType, + }, + TagFamilyIdx: 0, + TagIdx: 0, + } +} + +func (r *traceTagRegistry) IndexDefined(_ string) (bool, *databasev1.IndexRule) { + return false, nil +} + +func (r *traceTagRegistry) IndexRuleDefined(_ string) (bool, *databasev1.IndexRule) { + return false, nil +} + +func (r *traceTagRegistry) EntityList() []string { + return nil +} + +func (r *traceTagRegistry) CreateTagRef(tags ...[]*logical.Tag) ([][]*logical.TagRef, error) { + return nil, fmt.Errorf("CreateTagRef not supported in dump tool") +} + +func (r *traceTagRegistry) CreateFieldRef(fields ...*logical.Field) ([]*logical.FieldRef, error) { + return nil, fmt.Errorf("CreateFieldRef not supported in dump tool") +} + +func (r *traceTagRegistry) ProjTags(refs ...[]*logical.TagRef) logical.Schema { + return r +} + +func (r *traceTagRegistry) ProjFields(refs ...*logical.FieldRef) logical.Schema { + return r +} + +func (r *traceTagRegistry) Children() []logical.Schema { + return nil +} + +func traceTagValueDecoder(valueType pbv1.ValueType, value []byte, valueArr [][]byte) *modelv1.TagValue { + if value == nil && valueArr == nil { + return pbv1.NullTagValue + } + + switch valueType { + case pbv1.ValueTypeStr: + if value == nil { + return pbv1.NullTagValue + } + return &modelv1.TagValue{ + Value: &modelv1.TagValue_Str{ + Str: &modelv1.Str{ + Value: string(value), + }, + }, + } + case pbv1.ValueTypeInt64: + if value == nil { + return pbv1.NullTagValue + } + return &modelv1.TagValue{ + Value: &modelv1.TagValue_Int{ + Int: &modelv1.Int{ + Value: convert.BytesToInt64(value), + }, + }, + } + case pbv1.ValueTypeStrArr: + var values []string + for _, v := range valueArr { + values = append(values, string(v)) + } + return &modelv1.TagValue{ + Value: &modelv1.TagValue_StrArray{ + StrArray: &modelv1.StrArray{ + Value: values, + }, + }, + } + case pbv1.ValueTypeBinaryData: + if value == nil { + return pbv1.NullTagValue + } + return &modelv1.TagValue{ + Value: &modelv1.TagValue_BinaryData{ + BinaryData: value, + }, + } + default: + if value != nil { + return &modelv1.TagValue{ + Value: &modelv1.TagValue_Str{ + Str: &modelv1.Str{ + Value: string(value), + }, + }, + } + } + return pbv1.NullTagValue + } +} + +func discoverTagColumns(partIDs []uint64, shardPath string, fileSystem fs.FileSystem) ([]string, error) { + if len(partIDs) == 0 { + return nil, nil + } + + // Open first part to discover tag columns + p, err := openFilePart(partIDs[0], shardPath, fileSystem) + if err != nil { + return nil, fmt.Errorf("failed to open first part: %w", err) + } + defer closePart(p) + + // Collect all tag names + tagNames := make([]string, 0, len(p.tagType)) + for name := range p.tagType { + tagNames = append(tagNames, name) + } + sort.Strings(tagNames) + + return tagNames, nil +} + +func writeTraceRowAsText(row traceRowData, rowNum int, verbose bool, projectionTags []string, seriesMap map[common.SeriesID]string) { + fmt.Printf("Row %d:\n", rowNum) + fmt.Printf(" PartID: %d (0x%016x)\n", row.partID, row.partID) + fmt.Printf(" TraceID: %s\n", 
row.traceID) + fmt.Printf(" SpanID: %s\n", row.spanID) + fmt.Printf(" SeriesID: %d\n", row.seriesID) + + // Add series text if available + if seriesMap != nil { + if seriesText, ok := seriesMap[row.seriesID]; ok { + fmt.Printf(" Series: %s\n", seriesText) + } + } + + fmt.Printf(" Span Data: %d bytes\n", len(row.spanData)) + if verbose { + fmt.Printf(" Span Content:\n") + printHexDump(row.spanData, 4) + } else { + if isPrintable(row.spanData) { + fmt.Printf(" Span: %s\n", string(row.spanData)) + } else { + fmt.Printf(" Span: (binary data, %d bytes)\n", len(row.spanData)) + } + } + + // Print projected tags or all tags + if len(row.tags) > 0 { + fmt.Printf(" Tags:\n") + + var tagsToShow []string + if len(projectionTags) > 0 { + tagsToShow = projectionTags + } else { + // Show all tags + for name := range row.tags { + tagsToShow = append(tagsToShow, name) + } + sort.Strings(tagsToShow) + } + + for _, name := range tagsToShow { + value, exists := row.tags[name] + if !exists { + continue + } + if value == nil { + fmt.Printf(" %s: <nil>\n", name) + } else { + fmt.Printf(" %s: %s\n", name, formatTagValueForDisplay(value, pbv1.ValueTypeStr)) + } + } + } + fmt.Printf("\n") +} + +func writeTraceRowAsCSV(writer *csv.Writer, row traceRowData, tagColumns []string, seriesMap map[common.SeriesID]string) error { + seriesText := "" + if seriesMap != nil { + if text, ok := seriesMap[row.seriesID]; ok { + seriesText = text + } + } + + csvRow := []string{ + fmt.Sprintf("%d", row.partID), + row.traceID, + row.spanID, + fmt.Sprintf("%d", row.seriesID), + seriesText, + strconv.Itoa(len(row.spanData)), + } + + // Add tag values + for _, tagName := range tagColumns { + value := "" + if tagValue, exists := row.tags[tagName]; exists && tagValue != nil { + value = string(tagValue) + } + csvRow = append(csvRow, value) + } + + return writer.Write(csvRow) +} + +func dumpTraceRowsAsText(rows []traceRowData, verbose bool, projectionTags []string, seriesMap map[common.SeriesID]string) error { + fmt.Printf("================================================================================\n") + fmt.Printf("Total rows: %d\n\n", len(rows)) + + for idx, row := range rows { + writeTraceRowAsText(row, idx+1, verbose, projectionTags, seriesMap) + } + + return nil +} + +func dumpTraceRowsAsCSV(rows []traceRowData, projectionTags []string, seriesMap map[common.SeriesID]string) error { + writer := csv.NewWriter(os.Stdout) + defer writer.Flush() + + // Determine which tags to include in CSV + var tagColumns []string + if len(projectionTags) > 0 { + tagColumns = projectionTags + } else { + // Collect all unique tag names + tagSet := make(map[string]bool) + for _, row := range rows { + for name := range row.tags { + tagSet[name] = true + } + } + for name := range tagSet { + tagColumns = append(tagColumns, name) + } + sort.Strings(tagColumns) + } + + // Write header + header := []string{"PartID", "TraceID", "SpanID", "SeriesID", "Series", "SpanDataSize"} + header = append(header, tagColumns...) 
+ + if err := writer.Write(header); err != nil { + return fmt.Errorf("failed to write CSV header: %w", err) + } + + // Write rows + for _, row := range rows { + seriesText := "" + if seriesMap != nil { + if text, ok := seriesMap[row.seriesID]; ok { + seriesText = text + } + } + + csvRow := []string{ + fmt.Sprintf("%d", row.partID), + row.traceID, + row.spanID, + fmt.Sprintf("%d", row.seriesID), + seriesText, + strconv.Itoa(len(row.spanData)), + } + + // Add tag values + for _, tagName := range tagColumns { + value := "" + if tagValue, exists := row.tags[tagName]; exists && tagValue != nil { + value = string(tagValue) + } + csvRow = append(csvRow, value) + } + + if err := writer.Write(csvRow); err != nil { + return fmt.Errorf("failed to write CSV row: %w", err) + } + } + + return nil +} diff --git a/banyand/internal/sidx/block.go b/banyand/internal/sidx/block.go index e136524e..21c8f7c9 100644 --- a/banyand/internal/sidx/block.go +++ b/banyand/internal/sidx/block.go @@ -120,7 +120,11 @@ func (b *block) processTag(tagName string, elementTags [][]*tag) { if tag.name == tagName { // Store structured tagRow instead of marshaled bytes if tag.valueArr != nil { - td.values[i].valueArr = tag.valueArr + td.values[i].valueArr = make([][]byte, len(tag.valueArr)) + for j, v := range tag.valueArr { + td.values[i].valueArr[j] = make([]byte, len(v)) + copy(td.values[i].valueArr[j], v) + } } else { td.values[i].value = tag.value } diff --git a/banyand/internal/sidx/interfaces.go b/banyand/internal/sidx/interfaces.go index cbe1e41b..7dc54bef 100644 --- a/banyand/internal/sidx/interfaces.go +++ b/banyand/internal/sidx/interfaces.go @@ -48,6 +48,11 @@ type SIDX interface { // The returned QueryResponse channel contains ordered batches limited by req.MaxBatchSize // unique Data elements (when positive). The error channel delivers any fatal execution error. StreamingQuery(ctx context.Context, req QueryRequest) (<-chan *QueryResponse, <-chan error) + // ScanQuery executes a synchronous full scan query without requiring series IDs. + // It scans all blocks in parts sequentially and applies filters to each row. + // Returns a slice of QueryResponse objects containing all matching results. + // This is a synchronous operation suitable for dump/debug tools. + ScanQuery(ctx context.Context, req ScanQueryRequest) ([]*QueryResponse, error) // Stats returns current system statistics and performance metrics. Stats(ctx context.Context) (*Stats, error) // Close gracefully shuts down the SIDX instance, ensuring all data is persisted. @@ -86,6 +91,24 @@ type QueryRequest struct { MaxBatchSize int } +// ScanQueryRequest specifies parameters for a full-scan query operation. +// Unlike QueryRequest, this does not require SeriesIDs and does not support Filter +// (all blocks are scanned, none are skipped). +// ScanProgressFunc is a callback for reporting scan progress. +// It receives the current part number (1-based), total parts, and rows found so far. +type ScanProgressFunc func(currentPart, totalParts int, rowsFound int) + +type ScanQueryRequest struct { + TagFilter model.TagFilterMatcher + MinKey *int64 + MaxKey *int64 + TagProjection []model.TagProjection + MaxBatchSize int // Max results per response batch + // OnProgress is an optional callback for progress reporting during scan. + // Called after processing each part with the current progress. + OnProgress ScanProgressFunc +} + // QueryResponse contains a batch of query results and execution metadata. 
// This follows BanyanDB result patterns with parallel arrays for efficiency. // Uses individual tag-based strategy (like trace module) rather than tag-family approach (like stream module). @@ -95,6 +118,7 @@ type QueryResponse struct { Data [][]byte SIDs []common.SeriesID PartIDs []uint64 + Tags map[string][]string // Projected tags: tag name -> values for each row Metadata ResponseMetadata } @@ -309,6 +333,17 @@ func (qr QueryRequest) Validate() error { return nil } +// Validate validates a ScanQueryRequest for correctness. +func (sqr ScanQueryRequest) Validate() error { + if sqr.MaxBatchSize < 0 { + return fmt.Errorf("maxBatchSize cannot be negative") + } + if sqr.MinKey != nil && sqr.MaxKey != nil && *sqr.MinKey > *sqr.MaxKey { + return fmt.Errorf("MinKey cannot be greater than MaxKey") + } + return nil +} + // Reset resets the QueryRequest to its zero state. func (qr *QueryRequest) Reset() { qr.SeriesIDs = nil diff --git a/banyand/internal/sidx/merge_test.go b/banyand/internal/sidx/merge_test.go index 0d63b5f4..92f8ac23 100644 --- a/banyand/internal/sidx/merge_test.go +++ b/banyand/internal/sidx/merge_test.go @@ -21,6 +21,7 @@ import ( "errors" "path/filepath" "reflect" + "sort" "testing" "github.com/google/go-cmp/cmp" @@ -281,6 +282,36 @@ var ( }) return es }() + + esStrArr1 = func() *elements { + es := generateElements() + es.mustAppend(1, 100, make([]byte, 100), []Tag{ + {Name: "arrTag", ValueArr: [][]byte{[]byte("value1"), []byte("value2")}, ValueType: pbv1.ValueTypeStrArr}, + }) + es.mustAppend(2, 200, make([]byte, 100), []Tag{ + {Name: "arrTag", ValueArr: [][]byte{[]byte("value3"), []byte("value4")}, ValueType: pbv1.ValueTypeStrArr}, + }) + return es + }() + + esStrArr2 = func() *elements { + es := generateElements() + es.mustAppend(1, 150, make([]byte, 100), []Tag{ + {Name: "arrTag", ValueArr: [][]byte{[]byte("value5"), []byte("value6")}, ValueType: pbv1.ValueTypeStrArr}, + }) + es.mustAppend(2, 250, make([]byte, 100), []Tag{ + {Name: "arrTag", ValueArr: [][]byte{[]byte("value7"), []byte("value8")}, ValueType: pbv1.ValueTypeStrArr}, + }) + return es + }() + + esStrArrWithEmpty = func() *elements { + es := generateElements() + es.mustAppend(1, 300, make([]byte, 100), []Tag{ + {Name: "arrTag", ValueArr: [][]byte{[]byte("a"), []byte(""), []byte("b")}, ValueType: pbv1.ValueTypeStrArr}, + }) + return es + }() ) func Test_mergeParts(t *testing.T) { @@ -331,10 +362,27 @@ func Test_mergeParts(t *testing.T) { {seriesID: 4, count: 1, uncompressedSize: 77}, }, }, + { + name: "Test with string array value type", + esList: []*elements{esStrArr1, esStrArr2}, + want: []blockMetadata{ + {seriesID: 1, count: 2, uncompressedSize: 264}, + {seriesID: 2, count: 2, uncompressedSize: 264}, + }, + }, + { + name: "Test with string array containing empty strings", + esList: []*elements{esStrArrWithEmpty}, + want: []blockMetadata{ + {seriesID: 1, count: 1, uncompressedSize: 128}, + }, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { + wantBlocks := elementsToBlocks(tt.esList) + verify := func(t *testing.T, pp []*partWrapper, fileSystem fs.FileSystem, root string, partID uint64) { closeCh := make(chan struct{}) defer close(closeCh) @@ -356,8 +404,14 @@ func Test_mergeParts(t *testing.T) { reader := &blockReader{} reader.init([]*partMergeIter{pmi}) var got []blockMetadata + var gotBlocks []block + decoder := generateTagValuesDecoder() + defer releaseTagValuesDecoder(decoder) + for reader.nextBlockMetadata() { got = append(got, reader.block.bm) + reader.loadBlockData(decoder) + 
gotBlocks = append(gotBlocks, deepCopyBlock(&reader.block.block)) } require.NoError(t, reader.error()) @@ -373,6 +427,13 @@ func Test_mergeParts(t *testing.T) { ); diff != "" { t.Errorf("Unexpected blockMetadata (-got +want):\n%s", diff) } + + if diff := cmp.Diff(gotBlocks, wantBlocks, + cmpopts.IgnoreFields(tagData{}, "uniqueValues", "tmpBytes"), + cmp.AllowUnexported(block{}, tagData{}, tagRow{}), + ); diff != "" { + t.Errorf("Unexpected blocks (-got +want):\n%s", diff) + } } t.Run("memory parts", func(t *testing.T) { @@ -417,3 +478,82 @@ func Test_mergeParts(t *testing.T) { }) } } + +func elementsToBlocks(esList []*elements) []block { + merged := generateElements() + defer releaseElements(merged) + + for _, es := range esList { + for i := 0; i < len(es.seriesIDs); i++ { + var tags []Tag + for _, t := range es.tags[i] { + tags = append(tags, Tag{ + Name: t.name, + Value: t.value, + ValueArr: t.valueArr, + ValueType: t.valueType, + }) + } + merged.mustAppend(es.seriesIDs[i], es.userKeys[i], es.data[i], tags) + } + } + + sort.Sort(merged) + + var blocks []block + if merged.Len() == 0 { + return blocks + } + + start := 0 + for i := 1; i <= merged.Len(); i++ { + if i == merged.Len() || merged.seriesIDs[i] != merged.seriesIDs[start] { + b := block{ + tags: make(map[string]*tagData), + userKeys: make([]int64, i-start), + data: make([][]byte, i-start), + } + copy(b.userKeys, merged.userKeys[start:i]) + for k := 0; k < i-start; k++ { + b.data[k] = merged.data[start+k] + } + (&b).mustInitFromTags(merged.tags[start:i]) + blocks = append(blocks, b) + start = i + } + } + return blocks +} + +func deepCopyBlock(b *block) block { + newB := block{ + tags: make(map[string]*tagData), + } + newB.userKeys = append([]int64(nil), b.userKeys...) + newB.data = make([][]byte, len(b.data)) + for i, d := range b.data { + newB.data[i] = append([]byte(nil), d...) + } + for k, v := range b.tags { + newTd := &tagData{ + name: v.name, + valueType: v.valueType, + values: make([]tagRow, len(v.values)), + } + for i, row := range v.values { + newRow := tagRow{} + if row.value != nil { + newRow.value = append([]byte(nil), row.value...) + } + if row.valueArr != nil { + newRow.valueArr = make([][]byte, len(row.valueArr)) + for j, arrVal := range row.valueArr { + newRow.valueArr[j] = append([]byte(nil), arrVal...) 
+ } + } + newTd.values[i] = newRow + } + newB.tags[k] = newTd + } + return newB +} diff --git a/banyand/internal/sidx/part_iter.go b/banyand/internal/sidx/part_iter.go index 39f3f47c..eb530e32 100644 --- a/banyand/internal/sidx/part_iter.go +++ b/banyand/internal/sidx/part_iter.go @@ -21,15 +21,11 @@ import ( "errors" "fmt" "io" - "sort" - "github.com/apache/skywalking-banyandb/api/common" "github.com/apache/skywalking-banyandb/pkg/bytes" "github.com/apache/skywalking-banyandb/pkg/compress/zstd" "github.com/apache/skywalking-banyandb/pkg/encoding" "github.com/apache/skywalking-banyandb/pkg/fs" - "github.com/apache/skywalking-banyandb/pkg/index" - "github.com/apache/skywalking-banyandb/pkg/logger" "github.com/apache/skywalking-banyandb/pkg/pool" ) @@ -37,13 +33,10 @@ type partIter struct { err error p *part curBlock *blockMetadata - sids []common.SeriesID - blockFilter index.Filter primaryBlockMetadata []primaryBlockMetadata bms []blockMetadata compressedPrimaryBuf []byte primaryBuf []byte - sidIdx int minKey int64 maxKey int64 } @@ -51,9 +44,6 @@ type partIter struct { func (pi *partIter) reset() { pi.curBlock = nil pi.p = nil - pi.sids = nil - pi.blockFilter = nil - pi.sidIdx = 0 pi.primaryBlockMetadata = nil pi.bms = nil pi.compressedPrimaryBuf = pi.compressedPrimaryBuf[:0] @@ -61,20 +51,14 @@ func (pi *partIter) reset() { pi.err = nil } -func (pi *partIter) init(bma *blockMetadataArray, p *part, sids []common.SeriesID, minKey, maxKey int64, blockFilter index.Filter) { +func (pi *partIter) init(bma *blockMetadataArray, p *part, minKey, maxKey int64) { pi.reset() pi.curBlock = &blockMetadata{} pi.p = p - pi.bms = bma.arr - pi.sids = sids - pi.blockFilter = blockFilter pi.minKey = minKey pi.maxKey = maxKey - pi.primaryBlockMetadata = p.primaryBlockMetadata - - pi.nextSeriesID() } func (pi *partIter) nextBlock() bool { @@ -100,52 +84,13 @@ func (pi *partIter) error() error { return pi.err } -func (pi *partIter) nextSeriesID() bool { - if pi.sidIdx >= len(pi.sids) { - pi.err = io.EOF - return false - } - pi.curBlock.seriesID = pi.sids[pi.sidIdx] - pi.sidIdx++ - return true -} - -func (pi *partIter) searchTargetSeriesID(sid common.SeriesID) bool { - if pi.curBlock.seriesID >= sid { - return true - } - if !pi.nextSeriesID() { - return false - } - if pi.curBlock.seriesID >= sid { - return true - } - sids := pi.sids[pi.sidIdx:] - pi.sidIdx += sort.Search(len(sids), func(i int) bool { - return sid <= sids[i] - }) - if pi.sidIdx >= len(pi.sids) { - pi.sidIdx = len(pi.sids) - pi.err = io.EOF - return false - } - pi.curBlock.seriesID = pi.sids[pi.sidIdx] - pi.sidIdx++ - return true -} - func (pi *partIter) loadNextBlockMetadata() bool { for len(pi.primaryBlockMetadata) > 0 { - if !pi.searchTargetSeriesID(pi.primaryBlockMetadata[0].seriesID) { - return false - } - pi.primaryBlockMetadata = searchPBM(pi.primaryBlockMetadata, pi.curBlock.seriesID) - pbm := &pi.primaryBlockMetadata[0] pi.primaryBlockMetadata = pi.primaryBlockMetadata[1:] - if pi.curBlock.seriesID < pbm.seriesID { - logger.Panicf("invariant violation: pi.curBlock.seriesID cannot be smaller than pbm.seriesID; got %+v vs %+v", &pi.curBlock.seriesID, &pbm.seriesID) - } + + // Process this block's series ID + pi.curBlock.seriesID = pbm.seriesID if pbm.maxKey < pi.minKey || pbm.minKey > pi.maxKey { continue @@ -164,25 +109,6 @@ func (pi *partIter) loadNextBlockMetadata() bool { return false } -func searchPBM(pbmIndex []primaryBlockMetadata, sid common.SeriesID) []primaryBlockMetadata { - if sid < pbmIndex[0].seriesID { - 
logger.Panicf("invariant violation: sid cannot be smaller than pbmIndex[0]; got %d vs %d", sid, &pbmIndex[0].seriesID) - } - - if sid == pbmIndex[0].seriesID { - return pbmIndex - } - - n := sort.Search(len(pbmIndex), func(i int) bool { - return sid <= pbmIndex[i].seriesID - }) - if n == 0 { - logger.Panicf("invariant violation: sort.Search returned 0 for sid > pbmIndex[0].seriesID; sid=%+v; pbmIndex[0].seriesID=%+v", - sid, &pbmIndex[0].seriesID) - } - return pbmIndex[n-1:] -} - func (pi *partIter) readPrimaryBlock(bms []blockMetadata, pbm *primaryBlockMetadata) ([]blockMetadata, error) { pi.compressedPrimaryBuf = bytes.ResizeOver(pi.compressedPrimaryBuf, int(pbm.size)) fs.MustReadData(pi.p.primary, int64(pbm.offset), pi.compressedPrimaryBuf) @@ -203,24 +129,10 @@ func (pi *partIter) readPrimaryBlock(bms []blockMetadata, pbm *primaryBlockMetad func (pi *partIter) findBlock() bool { bhs := pi.bms for len(bhs) > 0 { - sid := pi.curBlock.seriesID - if bhs[0].seriesID < sid { - n := sort.Search(len(bhs), func(i int) bool { - return sid <= bhs[i].seriesID - }) - if n == len(bhs) { - break - } - bhs = bhs[n:] - } bm := &bhs[0] - if bm.seriesID != sid { - if !pi.searchTargetSeriesID(bm.seriesID) { - return false - } - continue - } + // Process all blocks sequentially + pi.curBlock.seriesID = bm.seriesID if bm.maxKey < pi.minKey { bhs = bhs[1:] @@ -228,28 +140,10 @@ func (pi *partIter) findBlock() bool { } if bm.minKey > pi.maxKey { - if !pi.nextSeriesID() { - return false - } + bhs = bhs[1:] continue } - if pi.blockFilter != nil { - shouldSkip, err := func() (bool, error) { - tfo := generateTagFilterOp(bm, pi.p) - defer releaseTagFilterOp(tfo) - return pi.blockFilter.ShouldSkip(tfo) - }() - if err != nil { - pi.err = err - return false - } - if shouldSkip { - bhs = bhs[1:] - continue - } - } - pi.curBlock.copyFrom(bm) pi.bms = bhs[1:] diff --git a/banyand/internal/sidx/part_iter_test.go b/banyand/internal/sidx/part_iter_test.go index 4b1e2258..1cb00f65 100644 --- a/banyand/internal/sidx/part_iter_test.go +++ b/banyand/internal/sidx/part_iter_test.go @@ -37,7 +37,6 @@ func TestPartIterVerification(t *testing.T) { tests := []struct { name string elements []testElement - querySids []common.SeriesID minKey int64 maxKey int64 expectedLen int @@ -58,7 +57,6 @@ func TestPartIterVerification(t *testing.T) { }, }, }, - querySids: []common.SeriesID{1}, minKey: 50, maxKey: 150, expectedLen: 1, @@ -91,7 +89,6 @@ func TestPartIterVerification(t *testing.T) { }, }, }, - querySids: []common.SeriesID{1}, minKey: 50, maxKey: 250, expectedLen: 1, // Elements from same series are grouped into 1 block @@ -136,10 +133,9 @@ func TestPartIterVerification(t *testing.T) { }, }, }, - querySids: []common.SeriesID{1, 2, 3}, minKey: 50, maxKey: 250, - expectedLen: 3, + expectedLen: 3, // All series returned in full scan }, { name: "filtered by key range", @@ -181,13 +177,12 @@ func TestPartIterVerification(t *testing.T) { }, }, }, - querySids: []common.SeriesID{1}, minKey: 75, maxKey: 150, expectedLen: 1, // Block contains all elements [50-200], overlaps query range [75-150] }, { - name: "filtered by series ID", + name: "all series returned in range", elements: []testElement{ { seriesID: 1, @@ -226,10 +221,9 @@ func TestPartIterVerification(t *testing.T) { }, }, }, - querySids: []common.SeriesID{2}, minKey: 50, maxKey: 150, - expectedLen: 1, // Only series 2 should match + expectedLen: 3, // All series in full scan mode }, } @@ -237,7 +231,6 @@ func TestPartIterVerification(t *testing.T) { runTestCase := func(t *testing.T, 
tt struct { name string elements []testElement - querySids []common.SeriesID minKey int64 maxKey int64 expectedLen int @@ -251,7 +244,7 @@ func TestPartIterVerification(t *testing.T) { // Initialize partIter with clean blockMetadataArray bma.reset() // Keep blockMetadataArray clean before passing to init - pi.init(bma, part, tt.querySids, tt.minKey, tt.maxKey, nil) + pi.init(bma, part, tt.minKey, tt.maxKey) // Iterate through blocks and collect results var foundElements []testElement @@ -268,7 +261,6 @@ func TestPartIterVerification(t *testing.T) { overlaps := curBlock.maxKey >= tt.minKey && curBlock.minKey <= tt.maxKey assert.True(t, overlaps, "block should overlap with query range [%d, %d], but got block range [%d, %d]", tt.minKey, tt.maxKey, curBlock.minKey, curBlock.maxKey) - assert.Contains(t, tt.querySids, curBlock.seriesID, "block seriesID should be in query sids") // For verification, create a test element representing this block // Note: In a real scenario, you'd read the actual block data @@ -286,11 +278,6 @@ func TestPartIterVerification(t *testing.T) { // Verify results assert.Equal(t, tt.expectedLen, len(foundElements), "should find expected number of elements") - // Additional verification: ensure all found elements match expected series - for _, elem := range foundElements { - assert.Contains(t, tt.querySids, elem.seriesID, "found element should have expected seriesID") - } - t.Logf("Test %s completed: found %d blocks, expected %d", tt.name, blockCount, tt.expectedLen) } @@ -351,7 +338,7 @@ func TestPartIterEdgeCases(t *testing.T) { testFS := fs.NewLocalFileSystem() tempDir := t.TempDir() - t.Run("empty series list", func(t *testing.T) { + t.Run("full scan with data", func(t *testing.T) { // Create a simple part with data elements := createTestElements([]testElement{ { @@ -373,23 +360,23 @@ func TestPartIterEdgeCases(t *testing.T) { defer ReleaseMemPart(mp) mp.mustInitFromElements(elements) - partDir := filepath.Join(tempDir, "empty_series_test") + partDir := filepath.Join(tempDir, "full_scan_test") mp.mustFlush(testFS, partDir) part := mustOpenPart(1, partDir, testFS) defer part.close() - // Test with empty series list + // Test full scan bma := &blockMetadataArray{} defer bma.reset() pi := &partIter{} bma.reset() - pi.init(bma, part, []common.SeriesID{}, 0, 1000, nil) + pi.init(bma, part, 0, 1000) - // Should not find any blocks with empty series list + // Should find the block in full scan mode foundAny := pi.nextBlock() - assert.False(t, foundAny, "should not find any blocks with empty series list") + assert.True(t, foundAny, "should find blocks in full scan mode") }) t.Run("no matching key range", func(t *testing.T) { @@ -426,15 +413,15 @@ func TestPartIterEdgeCases(t *testing.T) { pi := &partIter{} bma.reset() - pi.init(bma, part, []common.SeriesID{1}, 200, 300, nil) // No overlap with key 100 + pi.init(bma, part, 200, 300) // No overlap with key 100 // Should not find any blocks foundAny := pi.nextBlock() assert.False(t, foundAny, "should not find any blocks with non-overlapping key range") }) - t.Run("no matching series ID", func(t *testing.T) { - // Create a part with seriesID 1 + t.Run("all series returned in full scan", func(t *testing.T) { + // Create a part with multiple series elements := createTestElements([]testElement{ { seriesID: 1, @@ -443,53 +430,19 @@ func TestPartIterEdgeCases(t *testing.T) { tags: []tag{ { name: "service", - value: []byte("test-service"), + value: []byte("test-service-1"), valueType: pbv1.ValueTypeStr, }, }, }, - }) - defer 
releaseElements(elements) - - mp := GenerateMemPart() - defer ReleaseMemPart(mp) - mp.mustInitFromElements(elements) - - partDir := filepath.Join(tempDir, "no_match_series") - mp.mustFlush(testFS, partDir) - - part := mustOpenPart(1, partDir, testFS) - defer part.close() - - // Test with different series ID - bma := &blockMetadataArray{} - defer bma.reset() - pi := &partIter{} - - bma.reset() - pi.init(bma, part, []common.SeriesID{2}, 0, 200, nil) // Different series ID - - // Should not find any blocks - foundAny := pi.nextBlock() - assert.False(t, foundAny, "should not find any blocks with non-matching series ID") - }) -} - -func TestPartIterBlockFilter(t *testing.T) { - tempDir := t.TempDir() - testFS := fs.NewLocalFileSystem() - - t.Run("blockFilter nil should not filter blocks", func(t *testing.T) { - // Create test elements - elements := createTestElements([]testElement{ { - seriesID: 1, + seriesID: 2, userKey: 100, - data: []byte("data1"), + data: []byte("data2"), tags: []tag{ { name: "service", - value: []byte("test-service"), + value: []byte("test-service-2"), valueType: pbv1.ValueTypeStr, }, }, @@ -501,72 +454,34 @@ func TestPartIterBlockFilter(t *testing.T) { defer ReleaseMemPart(mp) mp.mustInitFromElements(elements) - partDir := filepath.Join(tempDir, "nil_filter") + partDir := filepath.Join(tempDir, "all_series") mp.mustFlush(testFS, partDir) part := mustOpenPart(1, partDir, testFS) defer part.close() - // Test with nil blockFilter + // Full scan returns all series bma := &blockMetadataArray{} defer bma.reset() pi := &partIter{} bma.reset() - pi.init(bma, part, []common.SeriesID{1}, 0, 200, nil) // nil blockFilter + pi.init(bma, part, 0, 200) - // Should find the block - foundAny := pi.nextBlock() - assert.True(t, foundAny, "should find blocks when blockFilter is nil") - assert.NoError(t, pi.error()) + // Should find blocks for all series + count := 0 + for pi.nextBlock() { + count++ + } + assert.Equal(t, 2, count, "should find blocks for all series in full scan mode") }) +} - t.Run("blockFilter with mock filter that allows all", func(t *testing.T) { - // Create test elements - elements := createTestElements([]testElement{ - { - seriesID: 1, - userKey: 100, - data: []byte("data1"), - tags: []tag{ - { - name: "service", - value: []byte("test-service"), - valueType: pbv1.ValueTypeStr, - }, - }, - }, - }) - defer releaseElements(elements) - - mp := GenerateMemPart() - defer ReleaseMemPart(mp) - mp.mustInitFromElements(elements) - - partDir := filepath.Join(tempDir, "allow_all_filter") - mp.mustFlush(testFS, partDir) - - part := mustOpenPart(1, partDir, testFS) - defer part.close() - - // Create a mock filter that allows all blocks - mockFilter := &mockBlockFilter{shouldSkip: false} - - // Test with blockFilter that allows all - bma := &blockMetadataArray{} - defer bma.reset() - pi := &partIter{} - - bma.reset() - pi.init(bma, part, []common.SeriesID{1}, 0, 200, mockFilter) - - // Should find the block - foundAny := pi.nextBlock() - assert.True(t, foundAny, "should find blocks when blockFilter allows all") - assert.NoError(t, pi.error()) - }) +func TestPartIterBlockFilter(t *testing.T) { + tempDir := t.TempDir() + testFS := fs.NewLocalFileSystem() - t.Run("blockFilter with mock filter that skips all", func(t *testing.T) { + t.Run("blockFilter nil should not filter blocks", func(t *testing.T) { // Create test elements elements := createTestElements([]testElement{ { @@ -588,152 +503,79 @@ func TestPartIterBlockFilter(t *testing.T) { defer ReleaseMemPart(mp) 
mp.mustInitFromElements(elements) - partDir := filepath.Join(tempDir, "skip_all_filter") + partDir := filepath.Join(tempDir, "nil_filter") mp.mustFlush(testFS, partDir) part := mustOpenPart(1, partDir, testFS) defer part.close() - // Create a mock filter that skips all blocks - mockFilter := &mockBlockFilter{shouldSkip: true} - - // Test with blockFilter that skips all + // Test with nil blockFilter (note: blockFilter removed from init, always nil now) bma := &blockMetadataArray{} defer bma.reset() pi := &partIter{} bma.reset() - pi.init(bma, part, []common.SeriesID{1}, 0, 200, mockFilter) + pi.init(bma, part, 0, 200) - // Should not find any blocks + // Should find the block foundAny := pi.nextBlock() - assert.False(t, foundAny, "should not find blocks when blockFilter skips all") + assert.True(t, foundAny, "should find blocks") assert.NoError(t, pi.error()) }) - t.Run("blockFilter with error should propagate error", func(t *testing.T) { - // Create test elements - elements := createTestElements([]testElement{ - { - seriesID: 1, - userKey: 100, - data: []byte("data1"), - tags: []tag{ - { - name: "service", - value: []byte("test-service"), - valueType: pbv1.ValueTypeStr, - }, - }, - }, - }) - defer releaseElements(elements) - - mp := GenerateMemPart() - defer ReleaseMemPart(mp) - mp.mustInitFromElements(elements) - - partDir := filepath.Join(tempDir, "error_filter") - mp.mustFlush(testFS, partDir) - - part := mustOpenPart(1, partDir, testFS) - defer part.close() - - // Create a mock filter that returns an error - expectedErr := fmt.Errorf("test filter error") - mockFilter := &mockBlockFilter{shouldSkip: false, err: expectedErr} - - // Test with blockFilter that returns an error - bma := &blockMetadataArray{} - defer bma.reset() - pi := &partIter{} - - bma.reset() - pi.init(bma, part, []common.SeriesID{1}, 0, 200, mockFilter) - - // Should not find any blocks and should have error - foundAny := pi.nextBlock() - assert.False(t, foundAny, "should not find blocks when blockFilter returns error") - assert.Error(t, pi.error()) - assert.Contains(t, pi.error().Error(), "test filter error") - }) -} - -// mockBlockFilter is a mock implementation of index.Filter for testing. -type mockBlockFilter struct { - err error - shouldSkip bool -} - -func (mbf *mockBlockFilter) ShouldSkip(_ index.FilterOp) (bool, error) { - if mbf.err != nil { - return false, mbf.err - } - return mbf.shouldSkip, nil -} - -// These methods are required to satisfy the index.Filter interface. -func (mbf *mockBlockFilter) String() string { - return "mockBlockFilter" -} - -func (mbf *mockBlockFilter) Execute(_ index.GetSearcher, _ common.SeriesID, _ *index.RangeOpts) (posting.List, posting.List, error) { - // Not used in our tests, return empty implementation - return nil, nil, nil + // Note: blockFilter tests removed as blockFilter is no longer supported in partIter.init() + // The blockFilter functionality is still available in partIter.findBlock() but is always nil + // when initialized through init(). For block filtering, use the ScanQuery API with TagFilter instead. } -func TestPartIterShouldSkip(t *testing.T) { +func TestPartIterFullScan(t *testing.T) { tempDir := t.TempDir() testFS := fs.NewLocalFileSystem() - // Strategy: Create many elements to exceed maxElementsPerBlock (8192) and force multiple blocks. - // We'll create elements in two batches with different tag values but same seriesID. 
- - const elementsPerBatch = 8200 // Exceed maxElementsPerBlock to force multiple blocks + // Test full-scan behavior: Create elements for multiple series var allElements []testElement - // First batch: seriesID=1, status="pending", keys 0-8199 - for i := 0; i < elementsPerBatch; i++ { + // First series: seriesID=1 + for i := 0; i < 100; i++ { allElements = append(allElements, testElement{ seriesID: 1, userKey: int64(i), - data: []byte(fmt.Sprintf("pending_data_%d", i)), + data: []byte(fmt.Sprintf("series1_data_%d", i)), tags: []tag{ { - name: "status", - value: []byte("pending"), + name: "service", + value: []byte("service-1"), valueType: pbv1.ValueTypeStr, }, }, }) } - // Second batch: seriesID=1, status="success", keys 20000-28199 - // Use a large gap in keys to ensure they're in different blocks - for i := 0; i < elementsPerBatch; i++ { + // Second series: seriesID=2 + for i := 0; i < 100; i++ { allElements = append(allElements, testElement{ - seriesID: 1, - userKey: int64(20000 + i), - data: []byte(fmt.Sprintf("success_data_%d", i)), + seriesID: 2, + userKey: int64(i), + data: []byte(fmt.Sprintf("series2_data_%d", i)), tags: []tag{ { - name: "status", - value: []byte("success"), + name: "service", + value: []byte("service-2"), valueType: pbv1.ValueTypeStr, }, }, }) } - // Add a third series to verify we don't incorrectly skip to this + // Third series: seriesID=3 allElements = append(allElements, testElement{ - seriesID: 2, + seriesID: 3, userKey: 5000, - data: []byte("series2_data"), + data: []byte("series3_data"), tags: []tag{ { - name: "status", - value: []byte("other"), + name: "service", + value: []byte("service-3"), valueType: pbv1.ValueTypeStr, }, }, @@ -759,21 +601,13 @@ func TestPartIterShouldSkip(t *testing.T) { t.Logf(" Primary block %d: seriesID=%d, keys [%d-%d]", i, pbm.seriesID, pbm.minKey, pbm.maxKey) } - // Create a selective mock filter that skips blocks with status="pending" - // but allows blocks with status="success" - selectiveFilter := &selectiveMockBlockFilter{ - tagName: "status", - skipValue: "pending", - skipCallCount: 0, - } - - // Test with the selective filter, querying only seriesID=1 + // Test full scan - should return all series bma := &blockMetadataArray{} defer bma.reset() pi := &partIter{} bma.reset() - pi.init(bma, part, []common.SeriesID{1, 2}, 0, 30000, selectiveFilter) + pi.init(bma, part, 0, 10000) // Iterate through blocks and collect results var foundBlocks []struct { @@ -798,25 +632,44 @@ func TestPartIterShouldSkip(t *testing.T) { require.NoError(t, pi.error()) - // Verify the filter was called at least once - assert.Greater(t, selectiveFilter.skipCallCount, 0, "filter should have been called") - - foundSeries1Success := false + // Verify all series are found in full scan mode + foundSeries := make(map[common.SeriesID]bool) for _, block := range foundBlocks { - if block.seriesID == 1 && block.minKey >= 20000 { - foundSeries1Success = true - t.Logf("✓ Found the expected seriesID=1 success block: keys [%d-%d]", block.minKey, block.maxKey) - } + foundSeries[block.seriesID] = true + } + + assert.True(t, foundSeries[1], "should find series 1 in full scan") + assert.True(t, foundSeries[2], "should find series 2 in full scan") + assert.True(t, foundSeries[3], "should find series 3 in full scan") + assert.Equal(t, 3, len(foundSeries), "should find all 3 series in full scan mode") +} + +// mockBlockFilter is a mock implementation of index.Filter for testing. +// NOTE: This is kept for other test files that still reference it. 
+type mockBlockFilter struct { + err error + shouldSkip bool +} + +func (mbf *mockBlockFilter) ShouldSkip(_ index.FilterOp) (bool, error) { + if mbf.err != nil { + return false, mbf.err } + return mbf.shouldSkip, nil +} - // This assertion will FAIL with the bug, demonstrating the issue - assert.True(t, foundSeries1Success, - "BUG: Should find seriesID=1 block with status='success' (minKey >= 20000), "+ - "but the iterator incorrectly skipped to next series after first block with status='pending' failed the filter. "+ - "Found %d total blocks", len(foundBlocks)) +// These methods are required to satisfy the index.Filter interface. +func (mbf *mockBlockFilter) String() string { + return "mockBlockFilter" +} + +func (mbf *mockBlockFilter) Execute(_ index.GetSearcher, _ common.SeriesID, _ *index.RangeOpts) (posting.List, posting.List, error) { + // Not used in our tests, return empty implementation + return nil, nil, nil } // selectiveMockBlockFilter is a mock filter that selectively skips blocks based on tag values. +// NOTE: This is kept for other test files that still reference it. type selectiveMockBlockFilter struct { tagName string skipValue string @@ -837,3 +690,4 @@ func (smf *selectiveMockBlockFilter) String() string { func (smf *selectiveMockBlockFilter) Execute(_ index.GetSearcher, _ common.SeriesID, _ *index.RangeOpts) (posting.List, posting.List, error) { return nil, nil, nil } + diff --git a/banyand/internal/sidx/scan_query.go b/banyand/internal/sidx/scan_query.go new file mode 100644 index 00000000..26bd17f2 --- /dev/null +++ b/banyand/internal/sidx/scan_query.go @@ -0,0 +1,205 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package sidx + +import ( + "context" + "math" + + "github.com/apache/skywalking-banyandb/api/common" +) + +// ScanQuery executes a synchronous full-scan query. 
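+//
+// A minimal caller-side sketch (identifiers such as idx and ctx are illustrative,
+// not part of this change):
+//
+//	minKey, maxKey := int64(0), int64(math.MaxInt64)
+//	resps, err := idx.ScanQuery(ctx, ScanQueryRequest{
+//		MinKey:        &minKey,
+//		MaxKey:        &maxKey,
+//		TagProjection: []model.TagProjection{{Names: []string{"service"}}},
+//		MaxBatchSize:  1000,
+//	})
+//	// Each QueryResponse batch carries parallel Keys, Data, SIDs and PartIDs slices;
+//	// projected tag values are exposed through Tags when TagProjection is set.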
+func (s *sidx) ScanQuery(ctx context.Context, req ScanQueryRequest) ([]*QueryResponse, error) { + if err := req.Validate(); err != nil { + return nil, err + } + + snap := s.currentSnapshot() + if snap == nil { + return nil, nil + } + defer snap.decRef() + + // Set default batch size + maxBatchSize := req.MaxBatchSize + if maxBatchSize <= 0 { + maxBatchSize = 1000 + } + + // Set key range + minKey := int64(math.MinInt64) + maxKey := int64(math.MaxInt64) + if req.MinKey != nil { + minKey = *req.MinKey + } + if req.MaxKey != nil { + maxKey = *req.MaxKey + } + + var results []*QueryResponse + + // Prepare Tags map if projection is specified + var tagsMap map[string][]string + if len(req.TagProjection) > 0 { + tagsMap = make(map[string][]string) + for _, proj := range req.TagProjection { + for _, tagName := range proj.Names { + tagsMap[tagName] = make([]string, 0, maxBatchSize) + } + } + } + + currentBatch := &QueryResponse{ + Keys: make([]int64, 0, maxBatchSize), + Data: make([][]byte, 0, maxBatchSize), + SIDs: make([]common.SeriesID, 0, maxBatchSize), + PartIDs: make([]uint64, 0, maxBatchSize), + Tags: tagsMap, + } + + // Scan all parts + totalParts := len(snap.parts) + for partIdx, pw := range snap.parts { + rowsBefore := 0 + for _, res := range results { + rowsBefore += res.Len() + } + rowsBefore += currentBatch.Len() + + if err := s.scanPart(ctx, pw, req, minKey, maxKey, &results, ¤tBatch, maxBatchSize); err != nil { + return nil, err + } + + // Count total rows found so far + totalRowsFound := 0 + for _, res := range results { + totalRowsFound += res.Len() + } + totalRowsFound += currentBatch.Len() + + // Report progress if callback is provided + if req.OnProgress != nil { + req.OnProgress(partIdx+1, totalParts, totalRowsFound) + } + } + + // Add remaining batch if not empty + if currentBatch.Len() > 0 { + results = append(results, currentBatch) + } + + return results, nil +} + +func (s *sidx) scanPart(ctx context.Context, pw *partWrapper, req ScanQueryRequest, + minKey, maxKey int64, results *[]*QueryResponse, currentBatch **QueryResponse, + maxBatchSize int) error { + + p := pw.p + bma := generateBlockMetadataArray() + defer releaseBlockMetadataArray(bma) + + // Create partIter on stack + pi := &partIter{} + + // Initialize iterator for full scan + pi.init(bma, p, minKey, maxKey) + + // Iterate through all blocks + for pi.nextBlock() { + if err := ctx.Err(); err != nil { + return err + } + + // Create a temporary QueryRequest to reuse existing blockCursor infrastructure + tmpReq := QueryRequest{ + TagFilter: req.TagFilter, + MinKey: &minKey, + MaxKey: &maxKey, + TagProjection: req.TagProjection, + MaxBatchSize: maxBatchSize, + } + + bc := generateBlockCursor() + bc.init(p, pi.curBlock, tmpReq) + + // Load block data + tmpBlock := generateBlock() + + // When we have a TagFilter, we need to load ALL tags so the filter can check them. + // Pass nil to loadBlockCursor to load all available tags. 
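+		// Loading every tag trades extra I/O for correctness: the TagFilter may reference
+		// tags outside the projection, so a projection-only load could drop matching rows.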
+ var tagsToLoad map[string]struct{} + if req.TagFilter != nil { + // Load all tags when filtering (nil/empty map triggers loading all tags) + tagsToLoad = nil + } else if len(req.TagProjection) > 0 { + // Optimize: only load projected tags when no filter + tagsToLoad = make(map[string]struct{}) + for _, proj := range req.TagProjection { + for _, tagName := range proj.Names { + tagsToLoad[tagName] = struct{}{} + } + } + } + + if s.loadBlockCursor(bc, tmpBlock, blockScanResult{ + p: p, + bm: *pi.curBlock, + }, tagsToLoad, tmpReq, s.pm, nil) { + // Copy all rows from this block cursor to current batch + for idx := 0; idx < len(bc.userKeys); idx++ { + bc.idx = idx + + // Check if batch is full before adding + if (*currentBatch).Len() >= maxBatchSize { + *results = append(*results, *currentBatch) + + // Prepare Tags map for new batch if projection is specified + var newTagsMap map[string][]string + if len(req.TagProjection) > 0 { + newTagsMap = make(map[string][]string) + for _, proj := range req.TagProjection { + for _, tagName := range proj.Names { + newTagsMap[tagName] = make([]string, 0, maxBatchSize) + } + } + } + + *currentBatch = &QueryResponse{ + Keys: make([]int64, 0, maxBatchSize), + Data: make([][]byte, 0, maxBatchSize), + SIDs: make([]common.SeriesID, 0, maxBatchSize), + PartIDs: make([]uint64, 0, maxBatchSize), + Tags: newTagsMap, + } + } + + // Add to current batch + if bc.copyTo(*currentBatch) { + // copyTo already appends to the batch + } + } + } + + releaseBlock(tmpBlock) + releaseBlockCursor(bc) + } + + return pi.error() +} diff --git a/banyand/internal/sidx/sidx.go b/banyand/internal/sidx/sidx.go index 7844087b..d9979108 100644 --- a/banyand/internal/sidx/sidx.go +++ b/banyand/internal/sidx/sidx.go @@ -23,6 +23,7 @@ import ( "context" "os" "path/filepath" + "strings" "sync" "sync/atomic" @@ -496,9 +497,37 @@ func (bc *blockCursor) copyTo(result *QueryResponse) bool { result.SIDs = append(result.SIDs, bc.seriesID) result.PartIDs = append(result.PartIDs, bc.p.ID()) + // Copy projected tags if specified + if len(bc.request.TagProjection) > 0 && len(result.Tags) > 0 { + for _, proj := range bc.request.TagProjection { + for _, tagName := range proj.Names { + if tagData, exists := bc.tags[tagName]; exists && bc.idx < len(tagData) { + tagValue := formatTagValue(tagData[bc.idx]) + result.Tags[tagName] = append(result.Tags[tagName], tagValue) + } else { + // Tag not present for this row + result.Tags[tagName] = append(result.Tags[tagName], "") + } + } + } + } + return true } +// formatTagValue converts a Tag to a string representation +func formatTagValue(tag Tag) string { + if len(tag.ValueArr) > 0 { + // Array of values + values := make([]string, len(tag.ValueArr)) + for i, v := range tag.ValueArr { + values[i] = string(v) + } + return "[" + strings.Join(values, ",") + "]" + } + return string(tag.Value) +} + // blockCursorHeap implements heap.Interface for sorting block cursors. type blockCursorHeap struct { bcc []*blockCursor diff --git a/banyand/internal/sidx/tag.go b/banyand/internal/sidx/tag.go index 8de4f4d3..a342860a 100644 --- a/banyand/internal/sidx/tag.go +++ b/banyand/internal/sidx/tag.go @@ -60,12 +60,7 @@ type tagRow struct { func (tr *tagRow) reset() { tr.value = nil - if tr.valueArr != nil { - for i := range tr.valueArr { - tr.valueArr[i] = nil - } - } - tr.valueArr = tr.valueArr[:0] + tr.valueArr = nil } var ( @@ -202,9 +197,6 @@ func marshalTagRow(dst []byte, tr *tagRow, valueType pbv1.ValueType) []byte { dst = append(dst, tr.valueArr[i]...) 
continue } - if len(tr.valueArr[i]) == 0 { - continue - } dst = internalencoding.MarshalVarArray(dst, tr.valueArr[i]) } return dst diff --git a/banyand/trace/streaming_pipeline_test.go b/banyand/trace/streaming_pipeline_test.go index a36ae9e5..ff74b09a 100644 --- a/banyand/trace/streaming_pipeline_test.go +++ b/banyand/trace/streaming_pipeline_test.go @@ -81,6 +81,9 @@ func (f *fakeSIDX) StreamingParts(map[uint64]struct{}, string, uint32, string) ( } func (f *fakeSIDX) PartPaths(map[uint64]struct{}) map[uint64]string { return map[uint64]string{} } func (f *fakeSIDX) IntroduceSynced(map[uint64]struct{}) func() { return func() {} } +func (f *fakeSIDX) ScanQuery(context.Context, sidx.ScanQueryRequest) ([]*sidx.QueryResponse, error) { + return nil, nil +} type fakeSIDXWithErr struct { *fakeSIDX @@ -565,6 +568,10 @@ type fakeSIDXInfinite struct { keyStart int64 } +func (f *fakeSIDXInfinite) ScanQuery(context.Context, sidx.ScanQueryRequest) ([]*sidx.QueryResponse, error) { + return nil, nil +} + func (f *fakeSIDXInfinite) StreamingQuery(ctx context.Context, _ sidx.QueryRequest) (<-chan *sidx.QueryResponse, <-chan error) { results := make(chan *sidx.QueryResponse) errCh := make(chan error, 1)
