This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch bug/empty-trace
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 0c891e37432ff351bb84999337d64491a598f04d
Author: Gao Hongtao <[email protected]>
AuthorDate: Thu Nov 20 05:25:38 2025 +0000

    Refactor sidx dump functionality and improve trace handling

    - Simplified `filterPartsByTimeRange` by removing its error return;
      it now returns only the filtered part IDs.
    - Updated `dumpSidxFullScan` to use the simplified
      `filterPartsByTimeRange` and dropped its now-unused `allPartIDs`
      parameter.
    - Converted doc comments into complete sentences and reformatted long
      function signatures for readability.
    - Removed the unused `dumpTracePart` function and related CSV output
      logic to streamline trace processing.
    - Reordered the `ScanQueryRequest` struct fields for readability and
      documented them.
    - Unfocused a leftover Ginkgo table entry (`FEntry` -> `Entry`) in the
      trace test cases.
---
 banyand/cmd/dump/sidx.go                |  75 +++---
 banyand/cmd/dump/trace.go               | 434 ++------------------------------
 banyand/internal/sidx/interfaces.go     |  14 +-
 banyand/internal/sidx/part_iter_test.go |   1 -
 banyand/internal/sidx/scan_query.go     |   8 +-
 banyand/internal/sidx/sidx.go           |   2 +-
 test/cases/trace/trace.go               |   2 +-
 7 files changed, 65 insertions(+), 471 deletions(-)

diff --git a/banyand/cmd/dump/sidx.go b/banyand/cmd/dump/sidx.go
index 433a6096..e4425a06 100644
--- a/banyand/cmd/dump/sidx.go
+++ b/banyand/cmd/dump/sidx.go
@@ -158,10 +158,7 @@ func dumpSidx(sidxPath, segmentPath, criteriaJSON string, csvOutput bool, timeBe
     // Filter parts by time range if specified
     var partIDs []uint64
     if hasTimeRange {
-        partIDs, err = filterPartsByTimeRange(sidxPath, allPartIDs, minKeyNanos, maxKeyNanos)
-        if err != nil {
-            return fmt.Errorf("failed to filter parts by time range: %w", err)
-        }
+        partIDs = filterPartsByTimeRange(sidxPath, allPartIDs, minKeyNanos, maxKeyNanos)
         fmt.Fprintf(os.Stderr, "Filtered to %d parts (out of %d) based on time range\n", len(partIDs), len(allPartIDs))
     } else {
         partIDs = allPartIDs
@@ -183,7 +180,7 @@ func dumpSidx(sidxPath, segmentPath, criteriaJSON string, csvOutput bool, timeBe
 
     // Use full-scan mode if requested
     if fullScan {
-        return dumpSidxFullScan(sidxPath, segmentPath, criteria, csvOutput, hasTimeRange, minKeyNanos, maxKeyNanos, allPartIDs, partIDs, tagNames, dataFilter)
+        return dumpSidxFullScan(sidxPath, segmentPath, criteria, csvOutput, hasTimeRange, minKeyNanos, maxKeyNanos, partIDs, tagNames, dataFilter)
     }
 
     // Open the file system
@@ -274,7 +271,7 @@ func parseTimeRange(timeBegin, timeEnd string) (int64, int64, error) {
     return minKeyNanos, maxKeyNanos, nil
 }
 
-func filterPartsByTimeRange(sidxPath string, partIDs []uint64, minKey, maxKey int64) ([]uint64, error) {
+func filterPartsByTimeRange(sidxPath string, partIDs []uint64, minKey, maxKey int64) []uint64 {
     fileSystem := fs.NewLocalFileSystem()
     var filteredParts []uint64
 
@@ -306,7 +303,7 @@ func filterPartsByTimeRange(sidxPath string, partIDs []uint64, minKey, maxKey in
         }
     }
 
-    return filteredParts, nil
+    return filteredParts
 }
 
 func parseCriteriaJSON(criteriaJSON string) (*modelv1.Criteria, error) {
@@ -407,7 +404,9 @@ func discoverSeriesIDs(segmentPath string) ([]common.SeriesID, error) {
     return seriesIDs, nil
 }
 
-func buildQueryRequest(criteria *modelv1.Criteria, seriesIDs []common.SeriesID, hasTimeRange bool, minKeyNanos, maxKeyNanos int64, tagRegistry *dynamicTagRegistry) (sidx.QueryRequest, error) {
+func buildQueryRequest(
+    criteria *modelv1.Criteria, seriesIDs []common.SeriesID, hasTimeRange bool, minKeyNanos, maxKeyNanos int64, tagRegistry *dynamicTagRegistry,
+) (sidx.QueryRequest, error) {
     // Use discovered series IDs if available, otherwise fall back to default range
     if len(seriesIDs) == 0 {
         // Use a range of series IDs to increase chance of finding data
@@ -582,8 +581,8 @@ func outputSidxResultsAsCSV(resultsCh <-chan *sidx.QueryResponse, errCh <-chan e
     return nil
 }
 
-// simpleTagValueDecoder is a simple decoder for tag values used in filtering
-// It handles basic tag value types without needing full schema information
+// simpleTagValueDecoder is a simple decoder for tag values used in filtering.
+// It handles basic tag value types without needing full schema information.
 func simpleTagValueDecoder(valueType pbv1.ValueType, value []byte, valueArr [][]byte) *modelv1.TagValue {
     if value == nil && valueArr == nil {
         return pbv1.NullTagValue
@@ -648,12 +647,12 @@ func simpleTagValueDecoder(valueType pbv1.ValueType, value []byte, valueArr [][]
     }
 }
 
-// dynamicTagRegistry implements TagSpecRegistry by discovering tags from the sidx
+// dynamicTagRegistry implements TagSpecRegistry by discovering tags from the sidx.
 type dynamicTagRegistry struct {
     tagSpecs map[string]*logical.TagSpec
 }
 
-// newDynamicTagRegistry creates a registry by discovering tag names from the sidx directory
+// newDynamicTagRegistry creates a registry by discovering tag names from the sidx directory.
 func newDynamicTagRegistry(sidxPath string) (*dynamicTagRegistry, error) {
     registry := &dynamicTagRegistry{
         tagSpecs: make(map[string]*logical.TagSpec),
@@ -732,49 +731,49 @@ func (d *dynamicTagRegistry) FindTagSpecByName(name string) *logical.TagSpec {
     }
 }
 
-// IndexDefined implements IndexChecker interface (stub implementation)
+// IndexDefined implements IndexChecker interface (stub implementation).
 func (d *dynamicTagRegistry) IndexDefined(_ string) (bool, *databasev1.IndexRule) {
     return false, nil
 }
 
-// IndexRuleDefined implements IndexChecker interface (stub implementation)
+// IndexRuleDefined implements IndexChecker interface (stub implementation).
 func (d *dynamicTagRegistry) IndexRuleDefined(_ string) (bool, *databasev1.IndexRule) {
     return false, nil
 }
 
-// EntityList implements Schema interface (stub implementation)
+// EntityList implements Schema interface (stub implementation).
 func (d *dynamicTagRegistry) EntityList() []string {
     return nil
 }
 
-// CreateTagRef implements Schema interface (stub implementation)
-func (d *dynamicTagRegistry) CreateTagRef(tags ...[]*logical.Tag) ([][]*logical.TagRef, error) {
+// CreateTagRef implements Schema interface (stub implementation).
+func (d *dynamicTagRegistry) CreateTagRef(_ ...[]*logical.Tag) ([][]*logical.TagRef, error) {
     return nil, fmt.Errorf("CreateTagRef not supported in dump tool")
 }
 
-// CreateFieldRef implements Schema interface (stub implementation)
-func (d *dynamicTagRegistry) CreateFieldRef(fields ...*logical.Field) ([]*logical.FieldRef, error) {
+// CreateFieldRef implements Schema interface (stub implementation).
+func (d *dynamicTagRegistry) CreateFieldRef(_ ...*logical.Field) ([]*logical.FieldRef, error) {
     return nil, fmt.Errorf("CreateFieldRef not supported in dump tool")
 }
 
-// ProjTags implements Schema interface (stub implementation)
-func (d *dynamicTagRegistry) ProjTags(refs ...[]*logical.TagRef) logical.Schema {
+// ProjTags implements Schema interface (stub implementation).
+func (d *dynamicTagRegistry) ProjTags(_ ...[]*logical.TagRef) logical.Schema {
     return d
 }
 
-// ProjFields implements Schema interface (stub implementation)
-func (d *dynamicTagRegistry) ProjFields(refs ...*logical.FieldRef) logical.Schema {
+// ProjFields implements Schema interface (stub implementation).
+func (d *dynamicTagRegistry) ProjFields(_ ...*logical.FieldRef) logical.Schema {
     return d
 }
 
-// Children implements Schema interface (stub implementation)
+// Children implements Schema interface (stub implementation).
 func (d *dynamicTagRegistry) Children() []logical.Schema {
     return nil
 }
 
 func dumpSidxFullScan(sidxPath, segmentPath string, criteria *modelv1.Criteria, csvOutput bool,
-    hasTimeRange bool, minKeyNanos, maxKeyNanos int64, allPartIDs, partIDs []uint64, projectionTagNames []string, dataFilter string) error {
-
+    hasTimeRange bool, minKeyNanos, maxKeyNanos int64, partIDs []uint64, projectionTagNames []string, dataFilter string,
+) error {
     fmt.Fprintf(os.Stderr, "Using full-scan mode (no series ID filtering)\n")
 
     // Load series information for human-readable output
@@ -801,10 +800,10 @@ func dumpSidxFullScan(sidxPath, segmentPath string, criteria *modelv1.Criteria,
     // Create dynamic tag registry if criteria is provided
     var tagRegistry *dynamicTagRegistry
     if criteria != nil {
-        var err error
-        tagRegistry, err = newDynamicTagRegistry(sidxPath)
-        if err != nil {
-            fmt.Fprintf(os.Stderr, "Warning: Failed to create tag registry: %v\n", err)
+        var regErr error
+        tagRegistry, regErr = newDynamicTagRegistry(sidxPath)
+        if regErr != nil {
+            fmt.Fprintf(os.Stderr, "Warning: Failed to create tag registry: %v\n", regErr)
         }
     }
 
@@ -838,9 +837,9 @@ func dumpSidxFullScan(sidxPath, segmentPath string, criteria *modelv1.Criteria,
     if criteria != nil && tagRegistry != nil {
         fmt.Fprintf(os.Stderr, "Discovered %d tags from sidx\n", len(tagRegistry.tagSpecs))
 
-        tagFilter, err := logical.BuildSimpleTagFilter(criteria)
-        if err != nil {
-            return fmt.Errorf("failed to build tag filter: %w", err)
+        tagFilter, filterErr := logical.BuildSimpleTagFilter(criteria)
+        if filterErr != nil {
+            return fmt.Errorf("failed to build tag filter: %w", filterErr)
         }
         if tagFilter != nil && tagFilter != logical.DummyFilter {
             scanReq.TagFilter = logical.NewTagFilterMatcher(tagFilter, tagRegistry, simpleTagValueDecoder)
@@ -884,7 +883,7 @@ func dumpSidxFullScan(sidxPath, segmentPath string, criteria *modelv1.Criteria,
     return outputScanResultsAsText(results, sidxPath, seriesMap, projectionTagNames, dataFilter)
 }
 
-// parseProjectionTags parses a comma-separated list of tag names
+// parseProjectionTags parses a comma-separated list of tag names.
 func parseProjectionTags(projectionStr string) []string {
     if projectionStr == "" {
         return nil
@@ -901,7 +900,7 @@ func parseProjectionTags(projectionStr string) []string {
     return result
 }
 
-// loadSeriesMap loads all series from the segment's series index and creates a map from SeriesID to text representation
+// loadSeriesMap loads all series from the segment's series index and creates a map from SeriesID to text representation.
 func loadSeriesMap(segmentPath string) (map[common.SeriesID]string, error) {
     seriesIndexPath := filepath.Join(segmentPath, "sidx")
 
@@ -952,7 +951,6 @@ func outputScanResultsAsText(results []*sidx.QueryResponse, sidxPath string, ser
         }
 
         batchPrinted := false
-        batchStartRow := totalRows
 
         for i := 0; i < resp.Len(); i++ {
             dataStr := string(resp.Data[i])
@@ -999,10 +997,7 @@ func outputScanResultsAsText(results []*sidx.QueryResponse, sidxPath string, ser
             totalRows++
         }
 
-        // Add newline after batch if we printed anything
-        if batchPrinted && totalRows > batchStartRow {
-            // Already added newline after each row
-        }
+        // (Already added newline after each row)
     }
 
     if dataFilter != "" {
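
Note on the hunks above: after this change `filterPartsByTimeRange` has no
error path; callers receive only the surviving part IDs. A minimal
stand-alone sketch of that contract (the `partMeta` type and the in-memory
metadata map are invented for illustration; the real helper reads each
part's metadata from disk, which the hunks elide):

    package main

    import "fmt"

    // partMeta is a stand-in for the on-disk part metadata the real helper loads.
    type partMeta struct {
        MinKey, MaxKey int64
    }

    // filterPartsByTimeRange mirrors the new no-error contract: parts whose
    // metadata is unavailable are skipped instead of failing the whole dump.
    func filterPartsByTimeRange(metas map[uint64]partMeta, partIDs []uint64, minKey, maxKey int64) []uint64 {
        var filtered []uint64
        for _, id := range partIDs {
            m, ok := metas[id]
            if !ok {
                continue // missing metadata: skip rather than report an error
            }
            if m.MaxKey >= minKey && m.MinKey <= maxKey { // key ranges overlap
                filtered = append(filtered, id)
            }
        }
        return filtered
    }

    func main() {
        metas := map[uint64]partMeta{
            1: {MinKey: 0, MaxKey: 10},
            2: {MinKey: 20, MaxKey: 30},
        }
        fmt.Println(filterPartsByTimeRange(metas, []uint64{1, 2, 3}, 5, 15)) // [1]
    }

The apparent trade-off is that a bad part now silently shrinks the dump
instead of aborting it, which suits a best-effort debugging tool.
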
diff --git a/banyand/cmd/dump/trace.go b/banyand/cmd/dump/trace.go
index 02422e1a..267bf1ee 100644
--- a/banyand/cmd/dump/trace.go
+++ b/banyand/cmd/dump/trace.go
@@ -105,176 +105,7 @@ Supports filtering by criteria and projecting specific tags.`,
     return cmd
 }
 
-func dumpTracePart(partPath string, verbose bool, csvOutput bool) error {
-    // Get the part ID from the directory name
-    partName := filepath.Base(partPath)
-    partID, err := strconv.ParseUint(partName, 16, 64)
-    if err != nil {
-        return fmt.Errorf("invalid part name %q: %w", partName, err)
-    }
-
-    // Get the root path (parent directory)
-    rootPath := filepath.Dir(partPath)
-
-    // Open the file system
-    fileSystem := fs.NewLocalFileSystem()
-
-    // Open the part
-    p, err := openFilePart(partID, rootPath, fileSystem)
-    if err != nil {
-        return fmt.Errorf("failed to open part: %w", err)
-    }
-    defer closePart(p)
-
-    if csvOutput {
-        return dumpPartAsCSV(p)
-    }
-
-    // Original text output
-    fmt.Printf("Opening trace part: %s (ID: %d)\n", partPath, partID)
-    fmt.Printf("================================================================================\n\n")
-
-    // Print part metadata
-    fmt.Printf("Part Metadata:\n")
-    fmt.Printf("  ID: %d (0x%016x)\n", p.partMetadata.ID, p.partMetadata.ID)
-    fmt.Printf("  Total Count: %d\n", p.partMetadata.TotalCount)
-    fmt.Printf("  Blocks Count: %d\n", p.partMetadata.BlocksCount)
-    fmt.Printf("  Min Timestamp: %s (%d)\n", formatTimestamp(p.partMetadata.MinTimestamp), p.partMetadata.MinTimestamp)
-    fmt.Printf("  Max Timestamp: %s (%d)\n", formatTimestamp(p.partMetadata.MaxTimestamp), p.partMetadata.MaxTimestamp)
-    fmt.Printf("  Compressed Size: %d bytes\n", p.partMetadata.CompressedSizeBytes)
-    fmt.Printf("  Uncompressed Span Size: %d bytes\n", p.partMetadata.UncompressedSpanSizeBytes)
-    fmt.Printf("\n")
-
-    // Print tag types
-    if len(p.tagType) > 0 {
-        fmt.Printf("Tag Types:\n")
-        tagNames := make([]string, 0, len(p.tagType))
-        for name := range p.tagType {
-            tagNames = append(tagNames, name)
-        }
-        sort.Strings(tagNames)
-        for _, name := range tagNames {
-            fmt.Printf("  %s: %s\n", name, valueTypeName(p.tagType[name]))
-        }
-        fmt.Printf("\n")
-    }
-
-    // Print primary block metadata
-    fmt.Printf("Primary Block Metadata (Total: %d blocks):\n", len(p.primaryBlockMetadata))
-    fmt.Printf("--------------------------------------------------------------------------------\n")
-    for i, pbm := range p.primaryBlockMetadata {
-        fmt.Printf("Block %d:\n", i)
-        fmt.Printf("  TraceID: %s\n", pbm.traceID)
-        fmt.Printf("  Offset: %d\n", pbm.offset)
-        fmt.Printf("  Size: %d bytes\n", pbm.size)
-
-        // Read and decompress the primary data for this block
-        if verbose {
-            primaryData := make([]byte, pbm.size)
-            fs.MustReadData(p.primary, int64(pbm.offset), primaryData)
-            decompressed, err := zstd.Decompress(nil, primaryData)
-            if err == nil {
-                fmt.Printf("  Primary Data (decompressed %d bytes):\n", len(decompressed))
-                printHexDump(decompressed, 4)
-            }
-        }
-    }
-    fmt.Printf("\n")
-
-    // Read and display trace data
-    fmt.Printf("Trace Data:\n")
-    fmt.Printf("================================================================================\n\n")
-
-    decoder := &encoding.BytesBlockDecoder{}
-
-    rowNum := 0
-    for _, pbm := range p.primaryBlockMetadata {
-        // Read primary data block
-        primaryData := make([]byte, pbm.size)
-        fs.MustReadData(p.primary, int64(pbm.offset), primaryData)
-
-        // Decompress
-        decompressed, err := zstd.Decompress(nil, primaryData)
-        if err != nil {
-            fmt.Printf("Error decompressing primary data: %v\n", err)
-            continue
-        }
-
-        // Parse ALL block metadata entries from this primary block
-        blockMetadatas, err := parseAllBlockMetadata(decompressed, p.tagType)
-        if err != nil {
-            fmt.Printf("Error parsing block metadata: %v\n", err)
-            continue
-        }
-
-        // Process each trace block within this primary block
-        for _, bm := range blockMetadatas {
-            // Read spans
-            spans, spanIDs, err := readSpans(decoder, bm.spans, int(bm.count), p.spans)
-            if err != nil {
-                fmt.Printf("Error reading spans for trace %s: %v\n", bm.traceID, err)
-                continue
-            }
-
-            // Read tags
-            tags := make(map[string][][]byte)
-            for tagName, tagBlock := range bm.tags {
-                tagValues, err := readTagValues(decoder, tagBlock, tagName, int(bm.count), p.tagMetadata[tagName], p.tags[tagName], p.tagType[tagName])
-                if err != nil {
-                    fmt.Printf("Error reading tag %s for trace %s: %v\n", tagName, bm.traceID, err)
-                    continue
-                }
-                tags[tagName] = tagValues
-            }
-
-            // Display each span as a row
-            for i := 0; i < len(spans); i++ {
-                rowNum++
-                fmt.Printf("Row %d:\n", rowNum)
-                fmt.Printf("  TraceID: %s\n", bm.traceID)
-                fmt.Printf("  SpanID: %s\n", spanIDs[i])
-                fmt.Printf("  Span Data: %d bytes\n", len(spans[i]))
-                if verbose {
-                    fmt.Printf("  Span Content:\n")
-                    printHexDump(spans[i], 4)
-                } else {
-                    // Try to print as string if it's printable
-                    if isPrintable(spans[i]) {
-                        fmt.Printf("  Span: %s\n", string(spans[i]))
-                    } else {
-                        fmt.Printf("  Span: (binary data, %d bytes)\n", len(spans[i]))
-                    }
-                }
-
-                // Print tags for this span
-                if len(tags) > 0 {
-                    fmt.Printf("  Tags:\n")
-                    tagNames := make([]string, 0, len(tags))
-                    for name := range tags {
-                        tagNames = append(tagNames, name)
-                    }
-                    sort.Strings(tagNames)
-                    for _, name := range tagNames {
-                        if i < len(tags[name]) {
-                            tagValue := tags[name][i]
-                            if tagValue == nil {
-                                fmt.Printf("    %s: <nil>\n", name)
-                            } else {
-                                valueType := p.tagType[name]
-                                fmt.Printf("    %s (%s): %s\n", name, valueTypeName(valueType), formatTagValueForDisplay(tagValue, valueType))
-                            }
-                        }
-                    }
-                }
-                fmt.Printf("\n")
-            }
-        }
-    }
-
-    fmt.Printf("Total rows: %d\n", rowNum)
-    return nil
-}
-
+//nolint:gocyclo // dumpTraceShard has high complexity due to multiple output formats and filtering options
 func dumpTraceShard(shardPath, segmentPath string, verbose bool, csvOutput bool, criteriaJSON, projectionTagsStr string) error {
     // Discover all part directories in the shard
     partIDs, err := discoverTracePartIDs(shardPath)
@@ -465,114 +296,6 @@ func dumpTraceShard(shardPath, segmentPath string, verbose bool, csvOutput bool,
     return nil
 }
 
-func dumpPartAsCSV(p *part) error {
-    decoder := &encoding.BytesBlockDecoder{}
-
-    // Collect all tag names in sorted order
-    allTagNames := make([]string, 0, len(p.tagType))
-    for name := range p.tagType {
-        allTagNames = append(allTagNames, name)
-    }
-    sort.Strings(allTagNames)
-
-    // Create CSV writer
-    writer := csv.NewWriter(os.Stdout)
-    defer writer.Flush()
-
-    // Write header
-    header := []string{"TraceID", "SpanID", "SpanDataSize"}
-    header = append(header, allTagNames...)
-    if err := writer.Write(header); err != nil {
-        return fmt.Errorf("failed to write CSV header: %w", err)
-    }
-
-    // Process all blocks and write rows
-    for _, pbm := range p.primaryBlockMetadata {
-        // Read primary data block
-        primaryData := make([]byte, pbm.size)
-        fs.MustReadData(p.primary, int64(pbm.offset), primaryData)
-
-        // Decompress
-        decompressed, err := zstd.Decompress(nil, primaryData)
-        if err != nil {
-            fmt.Fprintf(os.Stderr, "Error decompressing primary data: %v\n", err)
-            continue
-        }
-
-        // Parse ALL block metadata entries from this primary block
-        blockMetadatas, err := parseAllBlockMetadata(decompressed, p.tagType)
-        if err != nil {
-            fmt.Fprintf(os.Stderr, "Error parsing block metadata: %v\n", err)
-            continue
-        }
-
-        // Process each block
-        for _, bm := range blockMetadatas {
-            // Read spans
-            spans, spanIDs, err := readSpans(decoder, bm.spans, int(bm.count), p.spans)
-            if err != nil {
-                fmt.Fprintf(os.Stderr, "Error reading spans for trace %s: %v\n", bm.traceID, err)
-                continue
-            }
-
-            // Read tags
-            tags := make(map[string][][]byte)
-            for tagName, tagBlock := range bm.tags {
-                tagValues, err := readTagValues(decoder, tagBlock, tagName, int(bm.count), p.tagMetadata[tagName], p.tags[tagName], p.tagType[tagName])
-                if err != nil {
-                    fmt.Fprintf(os.Stderr, "Error reading tag %s for trace %s: %v\n", tagName, bm.traceID, err)
-                    continue
-                }
-                tags[tagName] = tagValues
-            }
-
-            // Write each span as a CSV row
-            for i := 0; i < len(spans); i++ {
-                row := append(make([]string, 0, len(header)), bm.traceID, spanIDs[i], strconv.Itoa(len(spans[i])))
-
-                // Add tag values in the same order as header
-                for _, tagName := range allTagNames {
-                    var value string
-                    if i < len(tags[tagName]) && tags[tagName][i] != nil {
-                        valueType := p.tagType[tagName]
-                        value = formatTagValueForCSV(tags[tagName][i], valueType)
-                    }
-                    row = append(row, value)
-                }
-
-                if err := writer.Write(row); err != nil {
-                    return fmt.Errorf("failed to write CSV row: %w", err)
-                }
-            }
-        }
-    }
-
-    return nil
-}
-
-func valueTypeName(vt pbv1.ValueType) string {
-    switch vt {
-    case pbv1.ValueTypeStr:
-        return "STRING"
-    case pbv1.ValueTypeInt64:
-        return "INT64"
-    case pbv1.ValueTypeFloat64:
-        return "FLOAT64"
-    case pbv1.ValueTypeStrArr:
-        return "STRING_ARRAY"
-    case pbv1.ValueTypeInt64Arr:
-        return "INT64_ARRAY"
-    case pbv1.ValueTypeBinaryData:
-        return "BINARY_DATA"
-    case pbv1.ValueTypeTimestamp:
-        return "TIMESTAMP"
-    case pbv1.ValueTypeUnknown:
-        return "UNKNOWN"
-    default:
-        return fmt.Sprintf("UNKNOWN(%d)", vt)
-    }
-}
-
 func formatTimestamp(nanos int64) string {
     if nanos == 0 {
         return "N/A"
@@ -618,61 +341,6 @@ func formatTagValueForDisplay(data []byte, vt pbv1.ValueType) string {
     }
 }
 
-func formatTagValueForCSV(data []byte, vt pbv1.ValueType) string {
-    if data == nil {
-        return ""
-    }
-    switch vt {
-    case pbv1.ValueTypeStr:
-        return string(data)
-    case pbv1.ValueTypeInt64:
-        if len(data) >= 8 {
-            return strconv.FormatInt(convert.BytesToInt64(data), 10)
-        }
-        return ""
-    case pbv1.ValueTypeFloat64:
-        if len(data) >= 8 {
-            return strconv.FormatFloat(convert.BytesToFloat64(data), 'f', -1, 64)
-        }
-        return ""
-    case pbv1.ValueTypeTimestamp:
-        if len(data) >= 8 {
-            nanos := convert.BytesToInt64(data)
-            return formatTimestamp(nanos)
-        }
-        return ""
-    case pbv1.ValueTypeStrArr:
-        // Decode string array - each element is separated by EntityDelimiter
-        var values []string
-        var err error
-        remaining := data
-        for len(remaining) > 0 {
-            var decoded []byte
-            decoded, remaining, err = unmarshalVarArray(nil, remaining)
-            if err != nil {
-                break
-            }
-            if len(decoded) > 0 {
-                values = append(values, string(decoded))
-            }
-        }
-        if len(values) > 0 {
-            return strings.Join(values, ";")
-        }
-        return ""
-    case pbv1.ValueTypeBinaryData:
-        if isPrintable(data) {
-            return string(data)
-        }
-        return fmt.Sprintf("(binary: %d bytes)", len(data))
-    default:
-        if isPrintable(data) {
-            return string(data)
-        }
-        return fmt.Sprintf("(binary: %d bytes)", len(data))
-    }
-}
-
 func unmarshalVarArray(dest, src []byte) ([]byte, []byte, error) {
     if len(src) == 0 {
         return nil, nil, fmt.Errorf("empty entity value")
@@ -780,11 +448,11 @@ type part struct {
 }
 
 type traceRowData struct {
-    partID   uint64
+    tags     map[string][]byte
     traceID  string
     spanID   string
     spanData []byte
-    tags     map[string][]byte
+    partID   uint64
     seriesID common.SeriesID
 }
 
@@ -1105,7 +773,7 @@ func unmarshalTagMetadata(tm *tagMetadata, src []byte) error {
     return nil
 }
 
-// Helper functions for new shard-level dump
+// Helper functions for new shard-level dump.
 
 func discoverTracePartIDs(shardPath string) ([]uint64, error) {
     entries, err := os.ReadDir(shardPath)
@@ -1313,6 +981,14 @@ func convertTagValue(value []byte, valueType pbv1.ValueType) *modelv1.TagValue {
                 BinaryData: value,
             },
         }
+    case pbv1.ValueTypeUnknown:
+        // Fall through to default
+    case pbv1.ValueTypeFloat64:
+        // Fall through to default
+    case pbv1.ValueTypeInt64Arr:
+        // Fall through to default
+    case pbv1.ValueTypeTimestamp:
+        // Fall through to default
     }
 
     // Default: try to return as string
@@ -1325,7 +1001,7 @@ func convertTagValue(value []byte, valueType pbv1.ValueType) *modelv1.TagValue {
     }
 }
 
-// traceTagRegistry implements logical.Schema for tag filtering
+// traceTagRegistry implements logical.Schema for tag filtering.
 type traceTagRegistry struct {
     tagTypes map[string]pbv1.ValueType
 }
 
@@ -1366,19 +1042,19 @@ func (r *traceTagRegistry) EntityList() []string {
     return nil
 }
 
-func (r *traceTagRegistry) CreateTagRef(tags ...[]*logical.Tag) ([][]*logical.TagRef, error) {
+func (r *traceTagRegistry) CreateTagRef(_ ...[]*logical.Tag) ([][]*logical.TagRef, error) {
     return nil, fmt.Errorf("CreateTagRef not supported in dump tool")
 }
 
-func (r *traceTagRegistry) CreateFieldRef(fields ...*logical.Field) ([]*logical.FieldRef, error) {
+func (r *traceTagRegistry) CreateFieldRef(_ ...*logical.Field) ([]*logical.FieldRef, error) {
     return nil, fmt.Errorf("CreateFieldRef not supported in dump tool")
 }
 
-func (r *traceTagRegistry) ProjTags(refs ...[]*logical.TagRef) logical.Schema {
+func (r *traceTagRegistry) ProjTags(_ ...[]*logical.TagRef) logical.Schema {
     return r
 }
 
-func (r *traceTagRegistry) ProjFields(refs ...*logical.FieldRef) logical.Schema {
+func (r *traceTagRegistry) ProjFields(_ ...*logical.FieldRef) logical.Schema {
     return r
 }
 
@@ -1555,79 +1231,3 @@ func writeTraceRowAsCSV(writer *csv.Writer, row traceRowData, tagColumns []strin
 
     return writer.Write(csvRow)
 }
-
-func dumpTraceRowsAsText(rows []traceRowData, verbose bool, projectionTags []string, seriesMap map[common.SeriesID]string) error {
-    fmt.Printf("================================================================================\n")
-    fmt.Printf("Total rows: %d\n\n", len(rows))
-
-    for idx, row := range rows {
-        writeTraceRowAsText(row, idx+1, verbose, projectionTags, seriesMap)
-    }
-
-    return nil
-}
-
-func dumpTraceRowsAsCSV(rows []traceRowData, projectionTags []string, seriesMap map[common.SeriesID]string) error {
-    writer := csv.NewWriter(os.Stdout)
-    defer writer.Flush()
-
-    // Determine which tags to include in CSV
-    var tagColumns []string
-    if len(projectionTags) > 0 {
-        tagColumns = projectionTags
-    } else {
-        // Collect all unique tag names
-        tagSet := make(map[string]bool)
-        for _, row := range rows {
-            for name := range row.tags {
-                tagSet[name] = true
-            }
-        }
-        for name := range tagSet {
-            tagColumns = append(tagColumns, name)
-        }
-        sort.Strings(tagColumns)
-    }
-
-    // Write header
-    header := []string{"PartID", "TraceID", "SpanID", "SeriesID", "Series", "SpanDataSize"}
-    header = append(header, tagColumns...)
-
-    if err := writer.Write(header); err != nil {
-        return fmt.Errorf("failed to write CSV header: %w", err)
-    }
-
-    // Write rows
-    for _, row := range rows {
-        seriesText := ""
-        if seriesMap != nil {
-            if text, ok := seriesMap[row.seriesID]; ok {
-                seriesText = text
-            }
-        }
-
-        csvRow := []string{
-            fmt.Sprintf("%d", row.partID),
-            row.traceID,
-            row.spanID,
-            fmt.Sprintf("%d", row.seriesID),
-            seriesText,
-            strconv.Itoa(len(row.spanData)),
-        }
-
-        // Add tag values
-        for _, tagName := range tagColumns {
-            value := ""
-            if tagValue, exists := row.tags[tagName]; exists && tagValue != nil {
-                value = string(tagValue)
-            }
-            csvRow = append(csvRow, value)
-        }
-
-        if err := writer.Write(csvRow); err != nil {
-            return fmt.Errorf("failed to write CSV row: %w", err)
-        }
-    }
-
-    return nil
-}
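
Note on `convertTagValue` above: the added empty cases make the switch
exhaustive over the enum values while leaving their handling to the shared
fallback after the switch (in Go an empty case does not fall through; it
simply exits the switch and reaches the code below it). A minimal
stand-alone illustration of the pattern, with invented type and constant
names:

    package main

    import "fmt"

    type valueType int

    const (
        valueTypeStr valueType = iota
        valueTypeBinaryData
        valueTypeUnknown
        valueTypeTimestamp
    )

    // convert mirrors the shape of the switch after the change: typed cases
    // first, explicit empty cases for the remaining enum members, and a
    // shared fallback after the switch.
    func convert(vt valueType, value []byte) string {
        switch vt {
        case valueTypeStr:
            return "str:" + string(value)
        case valueTypeBinaryData:
            return fmt.Sprintf("binary(%d bytes)", len(value))
        case valueTypeUnknown:
            // Handled by the fallback below.
        case valueTypeTimestamp:
            // Handled by the fallback below.
        }
        // Default: treat the raw bytes as a string.
        return "str:" + string(value)
    }

    func main() {
        fmt.Println(convert(valueTypeTimestamp, []byte("1700000000")))
    }
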
diff --git a/banyand/internal/sidx/interfaces.go b/banyand/internal/sidx/interfaces.go
index 7dc54bef..cf7ac401 100644
--- a/banyand/internal/sidx/interfaces.go
+++ b/banyand/internal/sidx/interfaces.go
@@ -91,22 +91,24 @@ type QueryRequest struct {
     MaxBatchSize int
 }
 
-// ScanQueryRequest specifies parameters for a full-scan query operation.
-// Unlike QueryRequest, this does not require SeriesIDs and does not support Filter
-// (all blocks are scanned, none are skipped).
 // ScanProgressFunc is a callback for reporting scan progress.
 // It receives the current part number (1-based), total parts, and rows found so far.
 type ScanProgressFunc func(currentPart, totalParts int, rowsFound int)
 
+// ScanQueryRequest specifies parameters for a full-scan query operation.
+// Unlike QueryRequest, this does not require SeriesIDs and does not support Filter
+// (all blocks are scanned, none are skipped).
+//
+//nolint:govet // struct layout optimized for readability; 64 bytes is acceptable
 type ScanQueryRequest struct {
     TagFilter     model.TagFilterMatcher
+    TagProjection []model.TagProjection
+    OnProgress    ScanProgressFunc
     MinKey        *int64
     MaxKey        *int64
-    TagProjection []model.TagProjection
-    MaxBatchSize  int // Max results per response batch
+    MaxBatchSize  int
     // OnProgress is an optional callback for progress reporting during scan.
     // Called after processing each part with the current progress.
-    OnProgress ScanProgressFunc
 }
 
 // QueryResponse contains a batch of query results and execution metadata.
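
Note on `ScanQueryRequest` above: only the declaration order changed, so
callers are unaffected. A stand-alone sketch mirroring the new layout (the
real type lives in the internal `sidx` package, so a trimmed copy is
redeclared here just to compile; `TagFilter` and `TagProjection` are
omitted):

    package main

    import (
        "fmt"
        "os"
    )

    // Local mirrors of ScanProgressFunc and a subset of ScanQueryRequest as
    // they read after this commit.
    type ScanProgressFunc func(currentPart, totalParts int, rowsFound int)

    type ScanQueryRequest struct {
        OnProgress   ScanProgressFunc
        MinKey       *int64
        MaxKey       *int64
        MaxBatchSize int
    }

    func main() {
        minKey, maxKey := int64(0), int64(1_000_000)
        req := ScanQueryRequest{
            MinKey:       &minKey,
            MaxKey:       &maxKey,
            MaxBatchSize: 1000,
            OnProgress: func(cur, total, rows int) {
                fmt.Fprintf(os.Stderr, "scanned part %d/%d, %d rows so far\n", cur, total, rows)
            },
        }
        req.OnProgress(1, 4, 0)
    }
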
diff --git a/banyand/internal/sidx/part_iter_test.go b/banyand/internal/sidx/part_iter_test.go
index 1cb00f65..bba55979 100644
--- a/banyand/internal/sidx/part_iter_test.go
+++ b/banyand/internal/sidx/part_iter_test.go
@@ -690,4 +690,3 @@ func (smf *selectiveMockBlockFilter) String() string {
 func (smf *selectiveMockBlockFilter) Execute(_ index.GetSearcher, _ common.SeriesID, _ *index.RangeOpts) (posting.List, posting.List, error) {
     return nil, nil, nil
 }
-
diff --git a/banyand/internal/sidx/scan_query.go b/banyand/internal/sidx/scan_query.go
index 26bd17f2..57ab75b4 100644
--- a/banyand/internal/sidx/scan_query.go
+++ b/banyand/internal/sidx/scan_query.go
@@ -109,8 +109,8 @@ func (s *sidx) ScanQuery(ctx context.Context, req ScanQueryRequest) ([]*QueryRes
 
 func (s *sidx) scanPart(ctx context.Context, pw *partWrapper, req ScanQueryRequest,
     minKey, maxKey int64, results *[]*QueryResponse, currentBatch **QueryResponse,
-    maxBatchSize int) error {
-
+    maxBatchSize int,
+) error {
     p := pw.p
     bma := generateBlockMetadataArray()
     defer releaseBlockMetadataArray(bma)
@@ -191,9 +191,7 @@ func (s *sidx) scanPart(ctx context.Context, pw *partWrapper, req ScanQueryReque
         }
 
         // Add to current batch
-        if bc.copyTo(*currentBatch) {
-            // copyTo already appends to the batch
-        }
+        _ = bc.copyTo(*currentBatch)
     }
 }
diff --git a/banyand/internal/sidx/sidx.go b/banyand/internal/sidx/sidx.go
index d9979108..b3df3a3c 100644
--- a/banyand/internal/sidx/sidx.go
+++ b/banyand/internal/sidx/sidx.go
@@ -515,7 +515,7 @@ func (bc *blockCursor) copyTo(result *QueryResponse) bool {
     return true
 }
 
-// formatTagValue converts a Tag to a string representation
+// formatTagValue converts a Tag to a string representation.
 func formatTagValue(tag Tag) string {
     if len(tag.ValueArr) > 0 {
         // Array of values
diff --git a/test/cases/trace/trace.go b/test/cases/trace/trace.go
index 1be7149c..fdf141f1 100644
--- a/test/cases/trace/trace.go
+++ b/test/cases/trace/trace.go
@@ -54,7 +54,7 @@ var _ = g.DescribeTable("Scanning Traces", func(args helpers.Args) {
     g.Entry("filter by trace id and service unknown", helpers.Args{Input: "eq_trace_id_and_service_unknown", Duration: 1 * time.Hour, WantEmpty: true}),
     g.Entry("filter by query", helpers.Args{Input: "having_query_tag", Duration: 1 * time.Hour}),
     g.Entry("err in arr", helpers.Args{Input: "err_in_arr", Duration: 1 * time.Hour, WantErr: true}),
-    g.FEntry("filter by query with having condition", helpers.Args{Input: "having_query_tag_cond", Want: "having_query_tag", Duration: 1 * time.Hour}),
+    g.Entry("filter by query with having condition", helpers.Args{Input: "having_query_tag_cond", Want: "having_query_tag", Duration: 1 * time.Hour}),
     g.Entry("multi-groups: unchanged tags", helpers.Args{Input: "multi_group_unchanged", Duration: 1 * time.Hour}),
     g.Entry("multi-groups: new tag", helpers.Args{Input: "multi_group_new_tag", Duration: 1 * time.Hour}),
    g.Entry("multi-groups: tag type change", helpers.Args{Input: "multi_group_tag_type", Duration: 1 * time.Hour}),
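
Note on the final hunk: in Ginkgo, `FEntry` marks a focused table entry,
which restricts the run to focused specs and flags the suite as
programmatically focused; reverting to `g.Entry` lets the whole table run
again. Illustrative fragment only, reusing the `g` alias for the Ginkgo v2
import used in the test file:

    var _ = g.DescribeTable("focus illustration", func(n int) {},
        g.Entry("runs with the rest of the table", 1),
        // g.FEntry(...) would have restricted the run to that entry alone.
    )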
