Copilot commented on code in PR #864:
URL: https://github.com/apache/skywalking-banyandb/pull/864#discussion_r2562560278
##########
banyand/cmd/dump/measure.go:
##########
@@ -0,0 +1,1258 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package main
+
+import (
+	"context"
+	"encoding/csv"
+	"encoding/json"
+	"fmt"
+	"io"
+	"os"
+	"path/filepath"
+	"sort"
+	"strconv"
+	"strings"
+
+	"github.com/spf13/cobra"
+	"google.golang.org/protobuf/encoding/protojson"
+
+	"github.com/apache/skywalking-banyandb/api/common"
+	databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+	modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+	internalencoding "github.com/apache/skywalking-banyandb/banyand/internal/encoding"
+	"github.com/apache/skywalking-banyandb/pkg/bytes"
+	"github.com/apache/skywalking-banyandb/pkg/compress/zstd"
+	"github.com/apache/skywalking-banyandb/pkg/convert"
+	"github.com/apache/skywalking-banyandb/pkg/encoding"
+	"github.com/apache/skywalking-banyandb/pkg/fs"
+	"github.com/apache/skywalking-banyandb/pkg/index/inverted"
+	"github.com/apache/skywalking-banyandb/pkg/logger"
+	pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+	"github.com/apache/skywalking-banyandb/pkg/query/logical"
+)
+
+const (
+	dirNameSidx = "sidx"
+	dirNameMeta = "meta"
+)
+
+type measureDumpOptions struct {
+	shardPath      string
+	segmentPath    string
+	criteriaJSON   string
+	projectionTags string
+	verbose        bool
+	csvOutput      bool
+}
+
+func newMeasureCmd() *cobra.Command {
+	var shardPath string
+	var segmentPath string
+	var verbose bool
+	var csvOutput bool
+	var criteriaJSON string
+	var projectionTags string
+
+	cmd := &cobra.Command{
+		Use:   "measure",
+		Short: "Dump measure shard data",
+		Long: `Dump and display contents of a measure shard directory (containing multiple parts).
+Outputs measure data in human-readable format or CSV.
+
+Supports filtering by criteria and projecting specific tags.`,
+		Example: ` # Display measure data from shard in text format
+ dump measure --shard-path /path/to/shard-0 --segment-path /path/to/segment
+
+ # Display with verbose hex dumps
+ dump measure --shard-path /path/to/shard-0 --segment-path /path/to/segment -v
+
+ # Filter by criteria
+ dump measure --shard-path /path/to/shard-0 --segment-path /path/to/segment \
+ --criteria '{"condition":{"name":"query","op":"BINARY_OP_HAVING","value":{"strArray":{"value":["tag1=value1","tag2=value2"]}}}}'
+
+ # Project specific tags
+ dump measure --shard-path /path/to/shard-0 --segment-path /path/to/segment \
+ --projection "tag1,tag2,tag3"
+
+ # Output as CSV
+ dump measure --shard-path /path/to/shard-0 --segment-path /path/to/segment --csv
+
+ # Save CSV to file
+ dump measure --shard-path /path/to/shard-0 --segment-path /path/to/segment --csv > output.csv`,
+		RunE: func(_ *cobra.Command, _ []string) error {
+			if shardPath == "" {
+				return fmt.Errorf("--shard-path flag is required")
+			}
+			if segmentPath == "" {
+				return fmt.Errorf("--segment-path flag is required")
+			}
+			return dumpMeasureShard(measureDumpOptions{
+				shardPath:      shardPath,
+				segmentPath:    segmentPath,
+				verbose:        verbose,
+				csvOutput:      csvOutput,
+				criteriaJSON:   criteriaJSON,
+				projectionTags: projectionTags,
+			})
+		},
+	}
+
+	cmd.Flags().StringVar(&shardPath, "shard-path", "", "Path to the shard directory (required)")
+	cmd.Flags().StringVarP(&segmentPath, "segment-path", "g", "", "Path to the segment directory (required)")
+	cmd.Flags().BoolVarP(&verbose, "verbose", "v", false, "Verbose output (show raw data)")
+	cmd.Flags().BoolVar(&csvOutput, "csv", false, "Output as CSV format")
+	cmd.Flags().StringVarP(&criteriaJSON, "criteria", "c", "", "Criteria filter as JSON string")
+	cmd.Flags().StringVarP(&projectionTags, "projection", "p", "", "Comma-separated list of tags to include as columns (e.g., tag1,tag2,tag3)")
+	_ = cmd.MarkFlagRequired("shard-path")
+	_ = cmd.MarkFlagRequired("segment-path")
+
+	return cmd
+}
+
+func dumpMeasureShard(opts measureDumpOptions) error {
+	ctx, err := newMeasureDumpContext(opts)
+	if err != nil || ctx == nil {
+		return err
+	}
+	defer ctx.close()
+
+	if err := ctx.processParts(); err != nil {
+		return err
+	}
+
+	ctx.printSummary()
+	return nil
+}
+
+type measurePartMetadata struct {
+	CompressedSizeBytes   uint64 `json:"compressedSizeBytes"`
+	UncompressedSizeBytes uint64 `json:"uncompressedSizeBytes"`
+	TotalCount            uint64 `json:"totalCount"`
+	BlocksCount           uint64 `json:"blocksCount"`
+	MinTimestamp          int64  `json:"minTimestamp"`
+	MaxTimestamp          int64  `json:"maxTimestamp"`
+	ID                    uint64 `json:"-"`
+}
+
+type measurePrimaryBlockMetadata struct {
+	seriesID     common.SeriesID
+	minTimestamp int64
+	maxTimestamp int64
+	offset       uint64
+	size         uint64
+}
+
+type measureDataBlock struct {
+	offset uint64
+	size   uint64
+}
+
+type measureBlockMetadata struct {
+	tagFamilies           map[string]*measureDataBlock
+	field                 measureColumnFamilyMetadata
+	timestamps            measureTimestampsMetadata
+	seriesID              common.SeriesID
+	uncompressedSizeBytes uint64
+	count                 uint64
+}
+
+type measureTimestampsMetadata struct {
+	dataBlock         measureDataBlock
+	min               int64
+	max               int64
+	versionOffset     uint64
+	versionFirst      int64
+	encodeType        encoding.EncodeType
+	versionEncodeType encoding.EncodeType
+}
+
+type measureColumnFamilyMetadata struct {
+	columns []measureColumnMetadata
+}
+
+type measureColumnMetadata struct {
+	name      string
+	dataBlock measureDataBlock
+	valueType pbv1.ValueType
+}
+
+type measurePart struct {
+	primary              fs.Reader
+	timestamps           fs.Reader
+	fieldValues          fs.Reader
+	fileSystem           fs.FileSystem
+	tagFamilyMetadata    map[string]fs.Reader
+	tagFamilies          map[string]fs.Reader
+	path                 string
+	primaryBlockMetadata []measurePrimaryBlockMetadata
+	partMetadata         measurePartMetadata
+}
+
+type measureRowData struct {
+	tags       map[string][]byte
+	fields     map[string][]byte
+	fieldTypes map[string]pbv1.ValueType
+	timestamp  int64
+	version    int64
+	partID     uint64
+	seriesID   common.SeriesID
+}
+
+type measureDumpContext struct {
+	tagFilter      logical.TagFilter
+	fileSystem     fs.FileSystem
+	seriesMap      map[common.SeriesID]string
+	writer         *csv.Writer
+	opts           measureDumpOptions
+	partIDs        []uint64
+	projectionTags []string
+	tagColumns     []string
+	fieldColumns   []string
+	rowNum         int
+}
+
+func newMeasureDumpContext(opts measureDumpOptions) (*measureDumpContext, error) {
+	ctx := &measureDumpContext{
+		opts:       opts,
+		fileSystem: fs.NewLocalFileSystem(),
+	}
+
+	partIDs, err := discoverMeasurePartIDs(opts.shardPath)
+	if err != nil {
+		return nil, fmt.Errorf("failed to discover part IDs: %w", err)
+	}
+	if len(partIDs) == 0 {
+		fmt.Println("No parts found in shard directory")
+		return nil, nil
+	}
+	ctx.partIDs = partIDs
+	fmt.Fprintf(os.Stderr, "Found %d parts in shard\n", len(partIDs))
+
+	ctx.seriesMap, err = loadMeasureSeriesMap(opts.segmentPath)
+	if err != nil {
+		fmt.Fprintf(os.Stderr, "Warning: Failed to load series information: %v\n", err)
+		ctx.seriesMap = nil
+	} else {
+		fmt.Fprintf(os.Stderr, "Loaded %d series from segment\n", len(ctx.seriesMap))
+	}
+
+	if opts.criteriaJSON != "" {
+		var criteria *modelv1.Criteria
+		criteria, err = parseMeasureCriteriaJSON(opts.criteriaJSON)
+		if err != nil {
+			return nil, fmt.Errorf("failed to parse criteria: %w", err)
+		}
+		ctx.tagFilter, err = logical.BuildSimpleTagFilter(criteria)
+		if err != nil {
+			return nil, fmt.Errorf("failed to build tag filter: %w", err)
+		}
+		fmt.Fprintf(os.Stderr, "Applied criteria filter\n")
+	}
+
+	if opts.projectionTags != "" {
+		ctx.projectionTags = parseMeasureProjectionTags(opts.projectionTags)
+		fmt.Fprintf(os.Stderr, "Projection tags: %v\n", ctx.projectionTags)
+	}
+
+	if opts.csvOutput {
+		if len(ctx.projectionTags) > 0 {
+			ctx.tagColumns = ctx.projectionTags
+		} else {
+			ctx.tagColumns, ctx.fieldColumns, err = discoverMeasureColumns(ctx.partIDs, opts.shardPath, ctx.fileSystem)
+			if err != nil {
+				return nil, fmt.Errorf("failed to discover columns: %w", err)
+			}
+		}
+	}
+
+	if err := ctx.initOutput(); err != nil {
+		return nil, err
+	}
+
+	return ctx, nil
+}
+
+func (ctx *measureDumpContext) initOutput() error {
+	if !ctx.opts.csvOutput {
+		fmt.Printf("================================================================================\n")
+		fmt.Fprintf(os.Stderr, "Processing parts...\n")
+		return nil
+	}
+
+	ctx.writer = csv.NewWriter(os.Stdout)
+	header := []string{"PartID", "Timestamp", "Version", "SeriesID", "Series"}
+	header = append(header, ctx.fieldColumns...)
+	header = append(header, ctx.tagColumns...)
+	if err := ctx.writer.Write(header); err != nil {
+		return fmt.Errorf("failed to write CSV header: %w", err)
+	}
+	return nil
+}
+
+func (ctx *measureDumpContext) close() {
+	if ctx.writer != nil {
+		ctx.writer.Flush()
+	}
+}
+
+func (ctx *measureDumpContext) processParts() error {
+	for partIdx, partID := range ctx.partIDs {
+		fmt.Fprintf(os.Stderr, "Processing part %d/%d (0x%016x)...\n", partIdx+1, len(ctx.partIDs), partID)
+
+		p, err := openMeasurePart(partID, ctx.opts.shardPath, ctx.fileSystem)
+		if err != nil {
+			fmt.Fprintf(os.Stderr, "Warning: failed to open part %016x: %v\n", partID, err)
+			continue
+		}
+
+		partRowCount, partErr := ctx.processPart(partID, p)
+		closeMeasurePart(p)
+		if partErr != nil {
+			return partErr
+		}
+
+		fmt.Fprintf(os.Stderr, " Part %d/%d: processed %d rows (total: %d)\n", partIdx+1, len(ctx.partIDs), partRowCount, ctx.rowNum)
+	}
+	return nil
+}
+
+func (ctx *measureDumpContext) processPart(partID uint64, p *measurePart) (int, error) {
+	decoder := &encoding.BytesBlockDecoder{}
+	partRowCount := 0
+
+	for _, pbm := range p.primaryBlockMetadata {
+		primaryData := make([]byte, pbm.size)
+		fs.MustReadData(p.primary, int64(pbm.offset), primaryData)
+
+		decompressed, err := zstd.Decompress(nil, primaryData)
+		if err != nil {
+			fmt.Fprintf(os.Stderr, "Warning: Error decompressing primary data in part %016x: %v\n", partID, err)
+			continue
+		}
+
+		blockMetadatas, err := parseMeasureBlockMetadata(decompressed)
+		if err != nil {
+			fmt.Fprintf(os.Stderr, "Warning: Error parsing block metadata in part %016x: %v\n", partID, err)
+			continue
+		}
+
+		for _, bm := range blockMetadatas {
+			rows, err := ctx.processBlock(partID, bm, p, decoder)
+			if err != nil {
+				return partRowCount, err
+			}
+			partRowCount += rows
+		}
+	}
+
+	return partRowCount, nil
+}
+
+func (ctx *measureDumpContext) processBlock(partID uint64, bm *measureBlockMetadata, p *measurePart, decoder *encoding.BytesBlockDecoder) (int, error) {
+	// Read timestamps and versions
+	timestamps, versions, err := readMeasureTimestamps(bm.timestamps, int(bm.count), p.timestamps)
+	if err != nil {
+		fmt.Fprintf(os.Stderr, "Warning: Error reading timestamps/versions for series %d in part %016x: %v\n", bm.seriesID, partID, err)
+		return 0, nil
+	}
+
+	// Read field values
+	fieldsByDataPoint := ctx.readBlockFields(partID, bm, p, decoder)
+
+	// Read tag families
+	tagsByDataPoint := ctx.readBlockTagFamilies(partID, bm, p, decoder)
+
+	rows := 0
+	for i := 0; i < len(timestamps); i++ {
+		dataPointTags := make(map[string][]byte)
+		for tagName, tagValues := range tagsByDataPoint {
+			if i < len(tagValues) {
+				dataPointTags[tagName] = tagValues[i]
+			}
+		}
+
+		dataPointFields := make(map[string][]byte)
+		dataPointFieldTypes := make(map[string]pbv1.ValueType)
+		for fieldName, fieldValues := range fieldsByDataPoint {
+			if i < len(fieldValues) {
+				dataPointFields[fieldName] = fieldValues[i]
+				dataPointFieldTypes[fieldName] = bm.field.columns[0].valueType
+			}
+		}
+
+		if ctx.shouldSkip(dataPointTags) {
+			continue
+		}
+
+		row := measureRowData{
+			partID:     partID,
+			timestamp:  timestamps[i],
+			version:    versions[i],
+			tags:       dataPointTags,
+			fields:     dataPointFields,
+			fieldTypes: dataPointFieldTypes,
+			seriesID:   bm.seriesID,
+		}
+
+		if err := ctx.writeRow(row); err != nil {
+			return rows, err
+		}
+
+		rows++
+	}
+
+	return rows, nil
+}
+
+func (ctx *measureDumpContext) readBlockTagFamilies(partID uint64, bm *measureBlockMetadata, p *measurePart, decoder *encoding.BytesBlockDecoder) map[string][][]byte {
+	tags := make(map[string][][]byte)
+	for tagFamilyName, tagFamilyBlock := range bm.tagFamilies {
+		// Read tag family metadata
+		tagFamilyMetadataData := make([]byte, tagFamilyBlock.size)
+		fs.MustReadData(p.tagFamilyMetadata[tagFamilyName], int64(tagFamilyBlock.offset), tagFamilyMetadataData)
+
+		// Parse tag family metadata as columnFamilyMetadata (same format as fields)
+		var cfm measureColumnFamilyMetadata
+		_, err := cfm.unmarshal(tagFamilyMetadataData)
+		if err != nil {
+			fmt.Fprintf(os.Stderr, "Warning: Error parsing tag family metadata %s for series %d in part %016x: %v\n", tagFamilyName, bm.seriesID, partID, err)
+			continue
+		}
+
+		// Read each tag (column) in the tag family
+		for _, colMeta := range cfm.columns {
+			fullTagName := tagFamilyName + "." + colMeta.name
+			tagValues, err := readMeasureTagValues(decoder, colMeta.dataBlock, fullTagName, int(bm.count), p.tagFamilies[tagFamilyName], colMeta.valueType)
+			if err != nil {
+				fmt.Fprintf(os.Stderr, "Warning: Error reading tag %s for series %d in part %016x: %v\n", fullTagName, bm.seriesID, partID, err)
+				continue
+			}
+			tags[fullTagName] = tagValues
+		}
+	}
+	return tags
+}
+
+func (ctx *measureDumpContext) readBlockFields(partID uint64, bm *measureBlockMetadata, p *measurePart, decoder *encoding.BytesBlockDecoder) map[string][][]byte {
+	fields := make(map[string][][]byte)
+	for _, colMeta := range bm.field.columns {
+		fieldValues, err := readMeasureFieldValues(decoder, colMeta.dataBlock, colMeta.name, int(bm.count), p.fieldValues, colMeta.valueType)
+		if err != nil {
+			fmt.Fprintf(os.Stderr, "Warning: Error reading field %s for series %d in part %016x: %v\n", colMeta.name, bm.seriesID, partID, err)
+			continue
+		}
+		fields[colMeta.name] = fieldValues
+	}
+	return fields
+}
+
+func (ctx *measureDumpContext) shouldSkip(tags map[string][]byte) bool {
+	if ctx.tagFilter == nil || ctx.tagFilter == logical.DummyFilter {
+		return false
+	}
+	// Convert tags to modelv1.Tag format for filtering
+	modelTags := make([]*modelv1.Tag, 0, len(tags))
+	for name, value := range tags {
+		if value == nil {
+			continue
+		}
+		tagValue := convertMeasureTagValue(value)
+		if tagValue != nil {
+			modelTags = append(modelTags, &modelv1.Tag{
+				Key:   name,
+				Value: tagValue,
+			})
+		}
+	}
+
+	// Create a simple registry for tag filtering
+	registry := &measureTagRegistry{
+		tags: tags,
+	}
+
+	matcher := logical.NewTagFilterMatcher(ctx.tagFilter, registry, measureTagValueDecoder)
+	match, _ := matcher.Match(modelTags)
+	return !match
+}
+
+func (ctx *measureDumpContext) writeRow(row measureRowData) error {
+	if ctx.opts.csvOutput {
+		if err := writeMeasureRowAsCSV(ctx.writer, row, ctx.fieldColumns, ctx.tagColumns, ctx.seriesMap); err != nil {
+			return err
+		}
+	} else {
+		writeMeasureRowAsText(row, ctx.rowNum+1, ctx.projectionTags, ctx.seriesMap)
+	}
+	ctx.rowNum++
+	return nil
+}
+
+func (ctx *measureDumpContext) printSummary() {
+	if ctx.opts.csvOutput {
+		fmt.Fprintf(os.Stderr, "Total rows written: %d\n", ctx.rowNum)
+		return
+	}
+	fmt.Printf("\nTotal rows: %d\n", ctx.rowNum)
+}
+
+func openMeasurePart(id uint64, root string, fileSystem fs.FileSystem) (*measurePart, error) {
+	var p measurePart
+	partPath := filepath.Join(root, fmt.Sprintf("%016x", id))
+	p.path = partPath
+	p.fileSystem = fileSystem
+
+	// Read metadata.json
+	metadataPath := filepath.Join(partPath, "metadata.json")
+	metadata, err := fileSystem.Read(metadataPath)
+	if err != nil {
+		return nil, fmt.Errorf("cannot read metadata.json: %w", err)
+	}
+	if unmarshalErr := json.Unmarshal(metadata, &p.partMetadata); unmarshalErr != nil {
+		return nil, fmt.Errorf("cannot parse metadata.json: %w", unmarshalErr)
+	}
+	p.partMetadata.ID = id
+
+	// Read primary block metadata
+	metaPath := filepath.Join(partPath, "meta.bin")
+	metaFile, err := fileSystem.OpenFile(metaPath)
+	if err != nil {
+		return nil, fmt.Errorf("cannot open meta.bin: %w", err)
+	}
+	p.primaryBlockMetadata, err = readMeasurePrimaryBlockMetadata(metaFile)
+	fs.MustClose(metaFile)
+	if err != nil {
+		return nil, fmt.Errorf("cannot read primary block metadata: %w", err)
+	}
+
+	// Open data files
+	p.primary, err = fileSystem.OpenFile(filepath.Join(partPath, "primary.bin"))
+	if err != nil {
+		return nil, fmt.Errorf("cannot open primary.bin: %w", err)
+	}
+
+	p.timestamps, err = fileSystem.OpenFile(filepath.Join(partPath, "timestamps.bin"))
+	if err != nil {
+		fs.MustClose(p.primary)
+		return nil, fmt.Errorf("cannot open timestamps.bin: %w", err)
+	}
+
+	p.fieldValues, err = fileSystem.OpenFile(filepath.Join(partPath, "fv.bin"))
+	if err != nil {
+		fs.MustClose(p.primary)
+		fs.MustClose(p.timestamps)
+		return nil, fmt.Errorf("cannot open fv.bin: %w", err)
+	}
+
+	// Open tag family files
+	entries := fileSystem.ReadDir(partPath)
+	p.tagFamilies = make(map[string]fs.Reader)
+	p.tagFamilyMetadata = make(map[string]fs.Reader)
+	for _, e := range entries {
+		if e.IsDir() {
+			continue
+		}
+		name := e.Name()
+		if strings.HasSuffix(name, ".tfm") {
+			tagFamilyName := name[:len(name)-4]
+			reader, err := fileSystem.OpenFile(filepath.Join(partPath, name))
+			if err == nil {
+				p.tagFamilyMetadata[tagFamilyName] = reader
+			}
+		}
+		if strings.HasSuffix(name, ".tf") {
+			tagFamilyName := name[:len(name)-3]
+			reader, err := fileSystem.OpenFile(filepath.Join(partPath, name))
+			if err == nil {
+				p.tagFamilies[tagFamilyName] = reader
+			}
+		}
+	}
+
+	return &p, nil
+}
+
+func closeMeasurePart(p *measurePart) {
+	if p.primary != nil {
+		fs.MustClose(p.primary)
+	}
+	if p.timestamps != nil {
+		fs.MustClose(p.timestamps)
+	}
+	if p.fieldValues != nil {
+		fs.MustClose(p.fieldValues)
+	}
+	for _, r := range p.tagFamilies {
+		fs.MustClose(r)
+	}
+	for _, r := range p.tagFamilyMetadata {
+		fs.MustClose(r)
+	}
+}
+
+func readMeasurePrimaryBlockMetadata(r fs.Reader) ([]measurePrimaryBlockMetadata, error) {
+	sr := r.SequentialRead()
+	data, err := io.ReadAll(sr)
+	if err != nil {
+		return nil, fmt.Errorf("cannot read: %w", err)
+	}
+	fs.MustClose(sr)
+
+	decompressed, err := zstd.Decompress(nil, data)
+	if err != nil {
+		return nil, fmt.Errorf("cannot decompress: %w", err)
+	}
+
+	var result []measurePrimaryBlockMetadata
+	src := decompressed
+	for len(src) > 0 {
+		var pbm measurePrimaryBlockMetadata
+		src, err = unmarshalMeasurePrimaryBlockMetadata(&pbm, src)
+		if err != nil {
+			return nil, err
+		}
+		result = append(result, pbm)
+	}
+	return result, nil
+}
+
+func unmarshalMeasurePrimaryBlockMetadata(pbm *measurePrimaryBlockMetadata, src []byte) ([]byte, error) {
+	if len(src) < 40 {
+		return nil, fmt.Errorf("insufficient data")
+	}
+	pbm.seriesID = common.SeriesID(encoding.BytesToUint64(src))
+	src = src[8:]
+	pbm.minTimestamp = int64(encoding.BytesToUint64(src))
+	src = src[8:]
+	pbm.maxTimestamp = int64(encoding.BytesToUint64(src))
+	src = src[8:]
+	pbm.offset = encoding.BytesToUint64(src)
+	src = src[8:]
+	pbm.size = encoding.BytesToUint64(src)
+	return src[8:], nil
+}
+
+func parseMeasureBlockMetadata(src []byte) ([]*measureBlockMetadata, error) {
+	var result []*measureBlockMetadata
+	for len(src) > 0 {
+		bm, tail, err := unmarshalMeasureBlockMetadata(src)
+		if err != nil {
+			return nil, err
+		}
+		result = append(result, bm)
+		src = tail
+	}
+	return result, nil
+}
+
+func unmarshalMeasureBlockMetadata(src []byte) (*measureBlockMetadata, []byte, error) {
+	var bm measureBlockMetadata
+
+	if len(src) < 8 {
+		return nil, nil, fmt.Errorf("cannot unmarshal blockMetadata from less than 8 bytes")
+	}
+	bm.seriesID = common.SeriesID(encoding.BytesToUint64(src))
+	src = src[8:]
+
+	src, n := encoding.BytesToVarUint64(src)
+	bm.uncompressedSizeBytes = n
+
+	src, n = encoding.BytesToVarUint64(src)
+	bm.count = n
+
+	// Unmarshal timestamps metadata
+	src = bm.timestamps.unmarshal(src)
+
+	// Unmarshal tag families
+	src, tagFamilyCount := encoding.BytesToVarUint64(src)
+	if tagFamilyCount > 0 {
+		bm.tagFamilies = make(map[string]*measureDataBlock)
+		for i := uint64(0); i < tagFamilyCount; i++ {
+			var nameBytes []byte
+			var err error
+			src, nameBytes, err = encoding.DecodeBytes(src)
+			if err != nil {
+				return nil, nil, fmt.Errorf("cannot unmarshal tagFamily name: %w", err)
+			}
+			tf := &measureDataBlock{}
+			src = tf.unmarshal(src)
+			bm.tagFamilies[string(nameBytes)] = tf
+		}
+	}
+
+	// Unmarshal field (column family metadata)
+	var err error
+	src, err = bm.field.unmarshal(src)
+	if err != nil {
+		return nil, nil, fmt.Errorf("cannot unmarshal columnFamilyMetadata: %w", err)
+	}
+
+	return &bm, src, nil
+}
+
+func (tm *measureTimestampsMetadata) unmarshal(src []byte) []byte {
+	src = tm.dataBlock.unmarshal(src)
+	tm.min = int64(encoding.BytesToUint64(src))
+	src = src[8:]
+	tm.max = int64(encoding.BytesToUint64(src))
+	src = src[8:]
+	tm.encodeType = encoding.EncodeType(src[0])
+	src = src[1:]
+	src, n := encoding.BytesToVarUint64(src)
+	tm.versionOffset = n
+	tm.versionFirst = int64(encoding.BytesToUint64(src))
+	src = src[8:]
+	tm.versionEncodeType = encoding.EncodeType(src[0])
+	return src[1:]
+}
+
+func (db *measureDataBlock) unmarshal(src []byte) []byte {
+	src, n := encoding.BytesToVarUint64(src)
+	db.offset = n
+	src, n = encoding.BytesToVarUint64(src)
+	db.size = n
+	return src
+}
+
+func (cfm *measureColumnFamilyMetadata) unmarshal(src []byte) ([]byte, error) {
+	src, columnMetadataLen := encoding.BytesToVarUint64(src)
+	if columnMetadataLen < 1 {
+		return src, nil
+	}
+	cfm.columns = make([]measureColumnMetadata, columnMetadataLen)
+	var err error
+	for i := range cfm.columns {
+		src, err = cfm.columns[i].unmarshal(src)
+		if err != nil {
+			return nil, fmt.Errorf("cannot unmarshal columnMetadata %d: %w", i, err)
+		}
+	}
+	return src, nil
+}
+
+func (cm *measureColumnMetadata) unmarshal(src []byte) ([]byte, error) {
+	src, nameBytes, err := encoding.DecodeBytes(src)
+	if err != nil {
+		return nil, fmt.Errorf("cannot unmarshal columnMetadata.name: %w", err)
+	}
+	cm.name = string(nameBytes)
+	if len(src) < 1 {
+		return nil, fmt.Errorf("cannot unmarshal columnMetadata.valueType: src is too short")
+	}
+	cm.valueType = pbv1.ValueType(src[0])
+	src = src[1:]
+	src = cm.dataBlock.unmarshal(src)
+	return src, nil
+}
+
+func readMeasureTimestamps(tm measureTimestampsMetadata, count int, reader fs.Reader) ([]int64, []int64, error) {
+	data := make([]byte, tm.dataBlock.size)
+	fs.MustReadData(reader, int64(tm.dataBlock.offset), data)
+
+	if tm.dataBlock.size < tm.versionOffset {
+		return nil, nil, fmt.Errorf("size %d must be greater than versionOffset %d", tm.dataBlock.size, tm.versionOffset)
+	}
+
+	// Get the common type from the version type (similar to mustDecodeTimestampsWithVersions)
+	commonEncodeType := encoding.GetCommonType(tm.encodeType)
+	if commonEncodeType == encoding.EncodeTypeUnknown {
+		return nil, nil, fmt.Errorf("unexpected encodeType %d", tm.encodeType)
+	}
+
+	// Decode timestamps (first part of the data)
+	var timestamps []int64
+	var err error
+	timestamps, err = encoding.BytesToInt64List(timestamps, data[:tm.versionOffset], commonEncodeType, tm.min, count)
+	if err != nil {
+		return nil, nil, fmt.Errorf("cannot decode timestamps: %w", err)
+	}
+
+	// Decode versions (second part of the data, starting from versionOffset)
+	var versions []int64
+	versions, err = encoding.BytesToInt64List(versions, data[tm.versionOffset:], tm.versionEncodeType, tm.versionFirst, count)
+	if err != nil {
+		return nil, nil, fmt.Errorf("cannot decode versions: %w", err)
+	}
+
+	return timestamps, versions, nil
+}
+
+func readMeasureTagValues(decoder *encoding.BytesBlockDecoder, tagBlock measureDataBlock, _ string, count int,
+	valueReader fs.Reader, valueType pbv1.ValueType,
+) ([][]byte, error) {
+	// Read tag values
+	bb := &bytes.Buffer{}
+	bb.Buf = make([]byte, tagBlock.size)
+	fs.MustReadData(valueReader, int64(tagBlock.offset), bb.Buf)
+
+	// Decode values using the internal encoding package
+	var err error
+	var values [][]byte
+	values, err = internalencoding.DecodeTagValues(values, decoder, bb, valueType, count)
+	if err != nil {
+		return nil, fmt.Errorf("cannot decode tag values: %w", err)
+	}
+
+	return values, nil
+}
+
+func readMeasureFieldValues(decoder *encoding.BytesBlockDecoder, fieldBlock measureDataBlock, _ string, count int,
+	valueReader fs.Reader, valueType pbv1.ValueType,
+) ([][]byte, error) {
+	// Read field values
+	bb := &bytes.Buffer{}
+	bb.Buf = make([]byte, fieldBlock.size)
+	fs.MustReadData(valueReader, int64(fieldBlock.offset), bb.Buf)
+
+	// Decode values based on value type
+	var values [][]byte
+	var err error
+
+	switch valueType {
+	case pbv1.ValueTypeInt64:
+		// Decode int64 values - similar to column.decodeInt64Column
+		if len(bb.Buf) < 1 {
+			return nil, fmt.Errorf("buffer too short for int64 field")
+		}
+		encodeType := encoding.EncodeType(bb.Buf[0])
+		if encodeType == encoding.EncodeTypePlain {
+			// Use default decoder for plain encoding
+			bb.Buf = bb.Buf[1:]
+			values, err = decoder.Decode(values[:0], bb.Buf, uint64(count))
+			if err != nil {
+				return nil, fmt.Errorf("cannot decode int64 field values (plain): %w", err)
+			}
+		} else {
+			const expectedLen = 9
+			if len(bb.Buf) < expectedLen {
+				return nil, fmt.Errorf("buffer too short for int64 field: expected at least %d bytes", expectedLen)
+			}
+			firstValue := convert.BytesToInt64(bb.Buf[1:9])
+			bb.Buf = bb.Buf[9:]
+			intValues := make([]int64, count)
+			intValues, err = encoding.BytesToInt64List(intValues[:0], bb.Buf, encodeType, firstValue, count)
+			if err != nil {
+				return nil, fmt.Errorf("cannot decode int64 field values: %w", err)
+			}
+			values = make([][]byte, count)
+			for i, v := range intValues {
+				values[i] = convert.Int64ToBytes(v)
+			}
+		}
+	case pbv1.ValueTypeFloat64:
+		// Decode float64 values - similar to column.decodeFloat64Column
+		if len(bb.Buf) < 1 {
+			return nil, fmt.Errorf("buffer too short for float64 field")
+		}
+		encodeType := encoding.EncodeType(bb.Buf[0])
+		if encodeType == encoding.EncodeTypePlain {
+			// Use default decoder for plain encoding
+			bb.Buf = bb.Buf[1:]
+			values, err = decoder.Decode(values[:0], bb.Buf, uint64(count))
+			if err != nil {
+				return nil, fmt.Errorf("cannot decode float64 field values (plain): %w", err)
+			}
+		} else {
+			const expectedLen = 11
+			if len(bb.Buf) < expectedLen {
+				return nil, fmt.Errorf("buffer too short for float64 field: expected at least %d bytes", expectedLen)
+			}
+			exp := convert.BytesToInt16(bb.Buf[1:3])
+			firstValue := convert.BytesToInt64(bb.Buf[3:11])
+			bb.Buf = bb.Buf[11:]
+			intValues := make([]int64, count)
+			intValues, err = encoding.BytesToInt64List(intValues[:0], bb.Buf, encodeType, firstValue, count)
+			if err != nil {
+				return nil, fmt.Errorf("cannot decode int values for float64: %w", err)
+			}
+			floatValues := make([]float64, count)
+			floatValues, err = encoding.DecimalIntListToFloat64List(floatValues[:0], intValues, exp, count)
+			if err != nil {
+				return nil, fmt.Errorf("cannot convert DecimalIntList to Float64List: %w", err)
+			}
+			values = make([][]byte, count)
+			for i, v := range floatValues {
+				values[i] = convert.Float64ToBytes(v)
+			}
+		}
+	default:
+		// Use default decoder for other types
+		values, err = internalencoding.DecodeTagValues(values, decoder, bb, valueType, count)
+		if err != nil {
+			return nil, fmt.Errorf("cannot decode field values: %w", err)
+		}
+	}
+
+	return values, nil
+}
+
+func discoverMeasurePartIDs(shardPath string) ([]uint64, error) {
+	entries, err := os.ReadDir(shardPath)
+	if err != nil {
+		return nil, fmt.Errorf("failed to read shard directory: %w", err)
+	}
+
+	var partIDs []uint64
+	for _, entry := range entries {
+		if !entry.IsDir() {
+			continue
+		}
+		name := entry.Name()
+		if name == dirNameSidx || name == dirNameMeta {
+			continue
+		}
+		partID, err := strconv.ParseUint(name, 16, 64)
+		if err == nil {
+			partIDs = append(partIDs, partID)
+		}
+	}
+
+	sort.Slice(partIDs, func(i, j int) bool {
+		return partIDs[i] < partIDs[j]
+	})
+
+	return partIDs, nil
+}
+
+func loadMeasureSeriesMap(segmentPath string) (map[common.SeriesID]string, error) {
+	seriesIndexPath := filepath.Join(segmentPath, dirNameSidx)
+
+	l := logger.GetLogger("dump-measure")
+
+	store, err := inverted.NewStore(inverted.StoreOpts{
+		Path:   seriesIndexPath,
+		Logger: l,
+	})
+	if err != nil {
+		return nil, fmt.Errorf("failed to open series index: %w", err)
+	}
+	defer store.Close()
+
+	ctx := context.Background()
+	iter, err := store.SeriesIterator(ctx)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create series iterator: %w", err)
+	}
+	defer iter.Close()
+
+	seriesMap := make(map[common.SeriesID]string)
+	for iter.Next() {
+		series := iter.Val()
+		if len(series.EntityValues) > 0 {
+			seriesID := common.SeriesID(convert.Hash(series.EntityValues))
+			seriesText := string(series.EntityValues)
+			seriesMap[seriesID] = seriesText
+		}
+	}
+
+	return seriesMap, nil
+}
+
+func parseMeasureCriteriaJSON(criteriaJSON string) (*modelv1.Criteria, error) {
+	criteria := &modelv1.Criteria{}
+	err := protojson.Unmarshal([]byte(criteriaJSON), criteria)
+	if err != nil {
+		return nil, fmt.Errorf("invalid criteria JSON: %w", err)
+	}
+	return criteria, nil
+}
+
+func parseMeasureProjectionTags(projectionStr string) []string {
+	if projectionStr == "" {
+		return nil
+	}
+
+	tags := strings.Split(projectionStr, ",")
+	result := make([]string, 0, len(tags))
+	for _, tag := range tags {
+		tag = strings.TrimSpace(tag)
+		if tag != "" {
+			result = append(result, tag)
+		}
+	}
+	return result
+}
+
+func discoverMeasureColumns(partIDs []uint64, shardPath string, fileSystem fs.FileSystem) ([]string, []string, error) {
+	if len(partIDs) == 0 {
+		return nil, nil, nil
+	}
+
+	p, err := openMeasurePart(partIDs[0], shardPath, fileSystem)
+	if err != nil {
+		return nil, nil, fmt.Errorf("failed to open first part: %w", err)
+	}
+	defer closeMeasurePart(p)
+
+	tagNames := make(map[string]bool)
+	fieldNames := make(map[string]bool)
+	partID := partIDs[0]
+	for tagFamilyName := range p.tagFamilies {
+		// Read tag family metadata to get tag names
+		if tagFamilyMetadataReader, ok := p.tagFamilyMetadata[tagFamilyName]; ok {
+			metaData, err := io.ReadAll(tagFamilyMetadataReader.SequentialRead())
+			if err != nil {
+				fmt.Fprintf(os.Stderr, "Warning: Error reading tag family metadata %s in part %016x: %v\n", tagFamilyName, partID, err)
+				continue
+			}
+			var cfm measureColumnFamilyMetadata
+			_, err = cfm.unmarshal(metaData)
+			if err != nil {
+				fmt.Fprintf(os.Stderr, "Warning: Error parsing tag family metadata %s in part %016x: %v\n", tagFamilyName, partID, err)
+				continue
+			}
+			for _, colMeta := range cfm.columns {
+				fullTagName := tagFamilyName + "." + colMeta.name
+				tagNames[fullTagName] = true
+			}
+		}
+	}
+
+	// Read primary block to discover field names
+	if len(p.primaryBlockMetadata) > 0 {
+		primaryData := make([]byte, p.primaryBlockMetadata[0].size)
+		fs.MustReadData(p.primary, int64(p.primaryBlockMetadata[0].offset), primaryData)
+
+		decompressed, err := zstd.Decompress(nil, primaryData)
+		if err == nil {
+			blockMetadatas, err := parseMeasureBlockMetadata(decompressed)
+			if err == nil && len(blockMetadatas) > 0 {
+				for _, colMeta := range blockMetadatas[0].field.columns {
+					fieldNames[colMeta.name] = true
+				}
+			}
+		}
+	}
+
+	tagResult := make([]string, 0, len(tagNames))
+	for name := range tagNames {
+		tagResult = append(tagResult, name)
+	}
+	sort.Strings(tagResult)
+
+	fieldResult := make([]string, 0, len(fieldNames))
+	for name := range fieldNames {
+		fieldResult = append(fieldResult, name)
+	}
+	sort.Strings(fieldResult)
+
+	return tagResult, fieldResult, nil
+}
+
+func writeMeasureRowAsText(row measureRowData, rowNum int, projectionTags []string, seriesMap map[common.SeriesID]string) {
+	fmt.Printf("Row %d:\n", rowNum)
+	fmt.Printf(" PartID: %d (0x%016x)\n", row.partID, row.partID)
+	fmt.Printf(" Timestamp: %s\n", formatTimestamp(row.timestamp))

Review Comment:
   The function `formatTimestamp` is called but not defined in this file. This will cause a compilation error. The function needs to be either:
   1. Defined in this file (can be copied from trace.go), or
   2. Extracted to a shared utility file.

   Note: The `time` package also needs to be imported.
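   For reference, a minimal sketch of what such a shared helper could look like (the trace.go original is not quoted in this hunk, and the nanosecond-precision assumption is ours, not confirmed by the PR):
   ```go
   // Sketch only: assumes row.timestamp is Unix nanoseconds, which is how
   // this tool decodes timestamps.bin. Share with trace.go rather than copy.
   func formatTimestamp(ts int64) string {
   	return time.Unix(0, ts).UTC().Format(time.RFC3339Nano)
   }
   ```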
##########
banyand/cmd/dump/measure.go:
##########
+		dataPointFields := make(map[string][]byte)
+		dataPointFieldTypes := make(map[string]pbv1.ValueType)
+		for fieldName, fieldValues := range fieldsByDataPoint {
+			if i < len(fieldValues) {
+				dataPointFields[fieldName] = fieldValues[i]
+				dataPointFieldTypes[fieldName] = bm.field.columns[0].valueType

Review Comment:
   Bug: Incorrect field type assignment. This code always uses `bm.field.columns[0].valueType` for all fields, which will assign the wrong type when there are multiple fields with different types. The code should look up the correct column metadata by field name. For example:
   ```go
   for fieldName, fieldValues := range fieldsByDataPoint {
   	if i < len(fieldValues) {
   		dataPointFields[fieldName] = fieldValues[i]
   		// Find the correct column metadata for this field
   		for _, col := range bm.field.columns {
   			if col.name == fieldName {
   				dataPointFieldTypes[fieldName] = col.valueType
   				break
   			}
   		}
   	}
   }
   ```

```suggestion
				// Find the correct column metadata for this field
				for _, col := range bm.field.columns {
					if col.name == fieldName {
						dataPointFieldTypes[fieldName] = col.valueType
						break
					}
				}
```
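   A sketch of an alternative shape for the same fix (illustrative only, not part of the PR; names are taken from the surrounding diff): build a field-name-to-type index once per block, before the per-row loop, so each lookup is O(1) instead of rescanning `bm.field.columns` for every field of every row:
   ```go
   // Sketch: build the index once per block, above the `for i := ...` row loop.
   fieldTypeByName := make(map[string]pbv1.ValueType, len(bm.field.columns))
   for _, col := range bm.field.columns {
   	fieldTypeByName[col.name] = col.valueType
   }
   // Then, inside the row loop:
   for fieldName, fieldValues := range fieldsByDataPoint {
   	if i < len(fieldValues) {
   		dataPointFields[fieldName] = fieldValues[i]
   		dataPointFieldTypes[fieldName] = fieldTypeByName[fieldName]
   	}
   }
   ```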
fileSystem.OpenFile(filepath.Join(partPath, "timestamps.bin")) + if err != nil { + fs.MustClose(p.primary) + return nil, fmt.Errorf("cannot open timestamps.bin: %w", err) + } + + p.fieldValues, err = fileSystem.OpenFile(filepath.Join(partPath, "fv.bin")) + if err != nil { + fs.MustClose(p.primary) + fs.MustClose(p.timestamps) + return nil, fmt.Errorf("cannot open fv.bin: %w", err) + } + + // Open tag family files + entries := fileSystem.ReadDir(partPath) + p.tagFamilies = make(map[string]fs.Reader) + p.tagFamilyMetadata = make(map[string]fs.Reader) + for _, e := range entries { + if e.IsDir() { + continue + } + name := e.Name() + if strings.HasSuffix(name, ".tfm") { + tagFamilyName := name[:len(name)-4] + reader, err := fileSystem.OpenFile(filepath.Join(partPath, name)) + if err == nil { + p.tagFamilyMetadata[tagFamilyName] = reader + } + } + if strings.HasSuffix(name, ".tf") { + tagFamilyName := name[:len(name)-3] + reader, err := fileSystem.OpenFile(filepath.Join(partPath, name)) + if err == nil { + p.tagFamilies[tagFamilyName] = reader + } + } + } + + return &p, nil +} + +func closeMeasurePart(p *measurePart) { + if p.primary != nil { + fs.MustClose(p.primary) + } + if p.timestamps != nil { + fs.MustClose(p.timestamps) + } + if p.fieldValues != nil { + fs.MustClose(p.fieldValues) + } + for _, r := range p.tagFamilies { + fs.MustClose(r) + } + for _, r := range p.tagFamilyMetadata { + fs.MustClose(r) + } +} + +func readMeasurePrimaryBlockMetadata(r fs.Reader) ([]measurePrimaryBlockMetadata, error) { + sr := r.SequentialRead() + data, err := io.ReadAll(sr) + if err != nil { + return nil, fmt.Errorf("cannot read: %w", err) + } + fs.MustClose(sr) + + decompressed, err := zstd.Decompress(nil, data) + if err != nil { + return nil, fmt.Errorf("cannot decompress: %w", err) + } + + var result []measurePrimaryBlockMetadata + src := decompressed + for len(src) > 0 { + var pbm measurePrimaryBlockMetadata + src, err = unmarshalMeasurePrimaryBlockMetadata(&pbm, src) + if err != nil { + return nil, err + } + result = append(result, pbm) + } + return result, nil +} + +func unmarshalMeasurePrimaryBlockMetadata(pbm *measurePrimaryBlockMetadata, src []byte) ([]byte, error) { + if len(src) < 40 { + return nil, fmt.Errorf("insufficient data") + } + pbm.seriesID = common.SeriesID(encoding.BytesToUint64(src)) + src = src[8:] + pbm.minTimestamp = int64(encoding.BytesToUint64(src)) + src = src[8:] + pbm.maxTimestamp = int64(encoding.BytesToUint64(src)) + src = src[8:] + pbm.offset = encoding.BytesToUint64(src) + src = src[8:] + pbm.size = encoding.BytesToUint64(src) + return src[8:], nil +} + +func parseMeasureBlockMetadata(src []byte) ([]*measureBlockMetadata, error) { + var result []*measureBlockMetadata + for len(src) > 0 { + bm, tail, err := unmarshalMeasureBlockMetadata(src) + if err != nil { + return nil, err + } + result = append(result, bm) + src = tail + } + return result, nil +} + +func unmarshalMeasureBlockMetadata(src []byte) (*measureBlockMetadata, []byte, error) { + var bm measureBlockMetadata + + if len(src) < 8 { + return nil, nil, fmt.Errorf("cannot unmarshal blockMetadata from less than 8 bytes") + } + bm.seriesID = common.SeriesID(encoding.BytesToUint64(src)) + src = src[8:] + + src, n := encoding.BytesToVarUint64(src) + bm.uncompressedSizeBytes = n + + src, n = encoding.BytesToVarUint64(src) + bm.count = n + + // Unmarshal timestamps metadata + src = bm.timestamps.unmarshal(src) + + // Unmarshal tag families + src, tagFamilyCount := encoding.BytesToVarUint64(src) + if tagFamilyCount > 0 { + 
bm.tagFamilies = make(map[string]*measureDataBlock) + for i := uint64(0); i < tagFamilyCount; i++ { + var nameBytes []byte + var err error + src, nameBytes, err = encoding.DecodeBytes(src) + if err != nil { + return nil, nil, fmt.Errorf("cannot unmarshal tagFamily name: %w", err) + } + tf := &measureDataBlock{} + src = tf.unmarshal(src) + bm.tagFamilies[string(nameBytes)] = tf + } + } + + // Unmarshal field (column family metadata) + var err error + src, err = bm.field.unmarshal(src) + if err != nil { + return nil, nil, fmt.Errorf("cannot unmarshal columnFamilyMetadata: %w", err) + } + + return &bm, src, nil +} + +func (tm *measureTimestampsMetadata) unmarshal(src []byte) []byte { + src = tm.dataBlock.unmarshal(src) + tm.min = int64(encoding.BytesToUint64(src)) + src = src[8:] + tm.max = int64(encoding.BytesToUint64(src)) + src = src[8:] + tm.encodeType = encoding.EncodeType(src[0]) + src = src[1:] + src, n := encoding.BytesToVarUint64(src) + tm.versionOffset = n + tm.versionFirst = int64(encoding.BytesToUint64(src)) + src = src[8:] + tm.versionEncodeType = encoding.EncodeType(src[0]) + return src[1:] +} + +func (db *measureDataBlock) unmarshal(src []byte) []byte { + src, n := encoding.BytesToVarUint64(src) + db.offset = n + src, n = encoding.BytesToVarUint64(src) + db.size = n + return src +} + +func (cfm *measureColumnFamilyMetadata) unmarshal(src []byte) ([]byte, error) { + src, columnMetadataLen := encoding.BytesToVarUint64(src) + if columnMetadataLen < 1 { + return src, nil + } + cfm.columns = make([]measureColumnMetadata, columnMetadataLen) + var err error + for i := range cfm.columns { + src, err = cfm.columns[i].unmarshal(src) + if err != nil { + return nil, fmt.Errorf("cannot unmarshal columnMetadata %d: %w", i, err) + } + } + return src, nil +} + +func (cm *measureColumnMetadata) unmarshal(src []byte) ([]byte, error) { + src, nameBytes, err := encoding.DecodeBytes(src) + if err != nil { + return nil, fmt.Errorf("cannot unmarshal columnMetadata.name: %w", err) + } + cm.name = string(nameBytes) + if len(src) < 1 { + return nil, fmt.Errorf("cannot unmarshal columnMetadata.valueType: src is too short") + } + cm.valueType = pbv1.ValueType(src[0]) + src = src[1:] + src = cm.dataBlock.unmarshal(src) + return src, nil +} + +func readMeasureTimestamps(tm measureTimestampsMetadata, count int, reader fs.Reader) ([]int64, []int64, error) { + data := make([]byte, tm.dataBlock.size) + fs.MustReadData(reader, int64(tm.dataBlock.offset), data) + + if tm.dataBlock.size < tm.versionOffset { + return nil, nil, fmt.Errorf("size %d must be greater than versionOffset %d", tm.dataBlock.size, tm.versionOffset) + } + + // Get the common type from the version type (similar to mustDecodeTimestampsWithVersions) + commonEncodeType := encoding.GetCommonType(tm.encodeType) + if commonEncodeType == encoding.EncodeTypeUnknown { + return nil, nil, fmt.Errorf("unexpected encodeType %d", tm.encodeType) + } + + // Decode timestamps (first part of the data) + var timestamps []int64 + var err error + timestamps, err = encoding.BytesToInt64List(timestamps, data[:tm.versionOffset], commonEncodeType, tm.min, count) + if err != nil { + return nil, nil, fmt.Errorf("cannot decode timestamps: %w", err) + } + + // Decode versions (second part of the data, starting from versionOffset) + var versions []int64 + versions, err = encoding.BytesToInt64List(versions, data[tm.versionOffset:], tm.versionEncodeType, tm.versionFirst, count) + if err != nil { + return nil, nil, fmt.Errorf("cannot decode versions: %w", err) + } + + return 
timestamps, versions, nil +} + +func readMeasureTagValues(decoder *encoding.BytesBlockDecoder, tagBlock measureDataBlock, _ string, count int, + valueReader fs.Reader, valueType pbv1.ValueType, +) ([][]byte, error) { + // Read tag values + bb := &bytes.Buffer{} + bb.Buf = make([]byte, tagBlock.size) + fs.MustReadData(valueReader, int64(tagBlock.offset), bb.Buf) + + // Decode values using the internal encoding package + var err error + var values [][]byte + values, err = internalencoding.DecodeTagValues(values, decoder, bb, valueType, count) + if err != nil { + return nil, fmt.Errorf("cannot decode tag values: %w", err) + } + + return values, nil +} + +func readMeasureFieldValues(decoder *encoding.BytesBlockDecoder, fieldBlock measureDataBlock, _ string, count int, + valueReader fs.Reader, valueType pbv1.ValueType, +) ([][]byte, error) { + // Read field values + bb := &bytes.Buffer{} + bb.Buf = make([]byte, fieldBlock.size) + fs.MustReadData(valueReader, int64(fieldBlock.offset), bb.Buf) + + // Decode values based on value type + var values [][]byte + var err error + + switch valueType { + case pbv1.ValueTypeInt64: + // Decode int64 values - similar to column.decodeInt64Column + if len(bb.Buf) < 1 { + return nil, fmt.Errorf("buffer too short for int64 field") + } + encodeType := encoding.EncodeType(bb.Buf[0]) + if encodeType == encoding.EncodeTypePlain { + // Use default decoder for plain encoding + bb.Buf = bb.Buf[1:] + values, err = decoder.Decode(values[:0], bb.Buf, uint64(count)) + if err != nil { + return nil, fmt.Errorf("cannot decode int64 field values (plain): %w", err) + } + } else { + const expectedLen = 9 + if len(bb.Buf) < expectedLen { + return nil, fmt.Errorf("buffer too short for int64 field: expected at least %d bytes", expectedLen) + } + firstValue := convert.BytesToInt64(bb.Buf[1:9]) + bb.Buf = bb.Buf[9:] + intValues := make([]int64, count) + intValues, err = encoding.BytesToInt64List(intValues[:0], bb.Buf, encodeType, firstValue, count) + if err != nil { + return nil, fmt.Errorf("cannot decode int64 field values: %w", err) + } + values = make([][]byte, count) + for i, v := range intValues { + values[i] = convert.Int64ToBytes(v) + } + } + case pbv1.ValueTypeFloat64: + // Decode float64 values - similar to column.decodeFloat64Column + if len(bb.Buf) < 1 { + return nil, fmt.Errorf("buffer too short for float64 field") + } + encodeType := encoding.EncodeType(bb.Buf[0]) + if encodeType == encoding.EncodeTypePlain { + // Use default decoder for plain encoding + bb.Buf = bb.Buf[1:] + values, err = decoder.Decode(values[:0], bb.Buf, uint64(count)) + if err != nil { + return nil, fmt.Errorf("cannot decode float64 field values (plain): %w", err) + } + } else { + const expectedLen = 11 + if len(bb.Buf) < expectedLen { + return nil, fmt.Errorf("buffer too short for float64 field: expected at least %d bytes", expectedLen) + } + exp := convert.BytesToInt16(bb.Buf[1:3]) + firstValue := convert.BytesToInt64(bb.Buf[3:11]) + bb.Buf = bb.Buf[11:] + intValues := make([]int64, count) + intValues, err = encoding.BytesToInt64List(intValues[:0], bb.Buf, encodeType, firstValue, count) + if err != nil { + return nil, fmt.Errorf("cannot decode int values for float64: %w", err) + } + floatValues := make([]float64, count) + floatValues, err = encoding.DecimalIntListToFloat64List(floatValues[:0], intValues, exp, count) + if err != nil { + return nil, fmt.Errorf("cannot convert DecimalIntList to Float64List: %w", err) + } + values = make([][]byte, count) + for i, v := range floatValues { + values[i] = 
convert.Float64ToBytes(v) + } + } + default: + // Use default decoder for other types + values, err = internalencoding.DecodeTagValues(values, decoder, bb, valueType, count) + if err != nil { + return nil, fmt.Errorf("cannot decode field values: %w", err) + } + } + + return values, nil +} + +func discoverMeasurePartIDs(shardPath string) ([]uint64, error) { + entries, err := os.ReadDir(shardPath) + if err != nil { + return nil, fmt.Errorf("failed to read shard directory: %w", err) + } + + var partIDs []uint64 + for _, entry := range entries { + if !entry.IsDir() { + continue + } + name := entry.Name() + if name == dirNameSidx || name == dirNameMeta { + continue + } + partID, err := strconv.ParseUint(name, 16, 64) + if err == nil { + partIDs = append(partIDs, partID) + } + } + + sort.Slice(partIDs, func(i, j int) bool { + return partIDs[i] < partIDs[j] + }) + + return partIDs, nil +} + +func loadMeasureSeriesMap(segmentPath string) (map[common.SeriesID]string, error) { + seriesIndexPath := filepath.Join(segmentPath, dirNameSidx) + + l := logger.GetLogger("dump-measure") + + store, err := inverted.NewStore(inverted.StoreOpts{ + Path: seriesIndexPath, + Logger: l, + }) + if err != nil { + return nil, fmt.Errorf("failed to open series index: %w", err) + } + defer store.Close() + + ctx := context.Background() + iter, err := store.SeriesIterator(ctx) + if err != nil { + return nil, fmt.Errorf("failed to create series iterator: %w", err) + } + defer iter.Close() + + seriesMap := make(map[common.SeriesID]string) + for iter.Next() { + series := iter.Val() + if len(series.EntityValues) > 0 { + seriesID := common.SeriesID(convert.Hash(series.EntityValues)) + seriesText := string(series.EntityValues) + seriesMap[seriesID] = seriesText + } + } + + return seriesMap, nil +} + +func parseMeasureCriteriaJSON(criteriaJSON string) (*modelv1.Criteria, error) { + criteria := &modelv1.Criteria{} + err := protojson.Unmarshal([]byte(criteriaJSON), criteria) + if err != nil { + return nil, fmt.Errorf("invalid criteria JSON: %w", err) + } + return criteria, nil +} + +func parseMeasureProjectionTags(projectionStr string) []string { + if projectionStr == "" { + return nil + } + + tags := strings.Split(projectionStr, ",") + result := make([]string, 0, len(tags)) + for _, tag := range tags { + tag = strings.TrimSpace(tag) + if tag != "" { + result = append(result, tag) + } + } + return result +} + +func discoverMeasureColumns(partIDs []uint64, shardPath string, fileSystem fs.FileSystem) ([]string, []string, error) { + if len(partIDs) == 0 { + return nil, nil, nil + } + + p, err := openMeasurePart(partIDs[0], shardPath, fileSystem) + if err != nil { + return nil, nil, fmt.Errorf("failed to open first part: %w", err) + } + defer closeMeasurePart(p) + + tagNames := make(map[string]bool) + fieldNames := make(map[string]bool) + partID := partIDs[0] + for tagFamilyName := range p.tagFamilies { + // Read tag family metadata to get tag names + if tagFamilyMetadataReader, ok := p.tagFamilyMetadata[tagFamilyName]; ok { + metaData, err := io.ReadAll(tagFamilyMetadataReader.SequentialRead()) + if err != nil { + fmt.Fprintf(os.Stderr, "Warning: Error reading tag family metadata %s in part %016x: %v\n", tagFamilyName, partID, err) + continue + } + var cfm measureColumnFamilyMetadata + _, err = cfm.unmarshal(metaData) + if err != nil { + fmt.Fprintf(os.Stderr, "Warning: Error parsing tag family metadata %s in part %016x: %v\n", tagFamilyName, partID, err) + continue + } + for _, colMeta := range cfm.columns { + fullTagName := 
tagFamilyName + "." + colMeta.name + tagNames[fullTagName] = true + } + } + } + + // Read primary block to discover field names + if len(p.primaryBlockMetadata) > 0 { + primaryData := make([]byte, p.primaryBlockMetadata[0].size) + fs.MustReadData(p.primary, int64(p.primaryBlockMetadata[0].offset), primaryData) + + decompressed, err := zstd.Decompress(nil, primaryData) + if err == nil { + blockMetadatas, err := parseMeasureBlockMetadata(decompressed) + if err == nil && len(blockMetadatas) > 0 { + for _, colMeta := range blockMetadatas[0].field.columns { + fieldNames[colMeta.name] = true + } + } + } + } + + tagResult := make([]string, 0, len(tagNames)) + for name := range tagNames { + tagResult = append(tagResult, name) + } + sort.Strings(tagResult) + + fieldResult := make([]string, 0, len(fieldNames)) + for name := range fieldNames { + fieldResult = append(fieldResult, name) + } + sort.Strings(fieldResult) + + return tagResult, fieldResult, nil +} + +func writeMeasureRowAsText(row measureRowData, rowNum int, projectionTags []string, seriesMap map[common.SeriesID]string) { + fmt.Printf("Row %d:\n", rowNum) + fmt.Printf(" PartID: %d (0x%016x)\n", row.partID, row.partID) + fmt.Printf(" Timestamp: %s\n", formatTimestamp(row.timestamp)) + fmt.Printf(" Version: %d\n", row.version) + fmt.Printf(" SeriesID: %d\n", row.seriesID) + + if seriesMap != nil { + if seriesText, ok := seriesMap[row.seriesID]; ok { + fmt.Printf(" Series: %s\n", seriesText) + } + } + + if len(row.fields) > 0 { + fmt.Printf(" Fields:\n") + var fieldNames []string + for name := range row.fields { + fieldNames = append(fieldNames, name) + } + sort.Strings(fieldNames) + for _, name := range fieldNames { + value := row.fields[name] + valueType := row.fieldTypes[name] + if value == nil { + fmt.Printf(" %s: <nil>\n", name) + } else { + fmt.Printf(" %s: %s\n", name, formatTagValueForDisplay(value, valueType)) Review Comment: The function `formatTagValueForDisplay` is called but not defined in this file. This will cause a compilation error. The function needs to be either: 1. Defined in this file (can be copied from trace.go), or 2. Extracted to a shared utility file ########## banyand/cmd/dump/measure.go: ########## @@ -0,0 +1,1258 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
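For reference, a minimal sketch of what such a shared helper could look like. The file name, the byte layouts, and the `ValueTypeStr` case are assumptions based on how this file encodes values (`convert.Int64ToBytes` / `convert.Float64ToBytes`), not the actual trace.go implementation:

```go
// display.go (hypothetical shared file); a sketch, not the trace.go code.
package main

import (
	"encoding/binary"
	"encoding/hex"
	"fmt"
	"math"

	pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
)

// formatTagValueForDisplay renders a raw column value according to its
// declared value type, falling back to a hex dump for unrecognized types
// or unexpected lengths.
func formatTagValueForDisplay(value []byte, valueType pbv1.ValueType) string {
	switch valueType {
	case pbv1.ValueTypeInt64:
		if len(value) == 8 {
			// Assumes the 8-byte big-endian layout produced by convert.Int64ToBytes.
			return fmt.Sprintf("%d", int64(binary.BigEndian.Uint64(value)))
		}
	case pbv1.ValueTypeFloat64:
		if len(value) == 8 {
			// Assumes convert.Float64ToBytes stores the IEEE-754 bits big-endian.
			return fmt.Sprintf("%g", math.Float64frombits(binary.BigEndian.Uint64(value)))
		}
	case pbv1.ValueTypeStr:
		return string(value)
	}
	return hex.EncodeToString(value)
}
```

Whichever option the author picks, extracting the helper keeps the text renderers in trace.go and measure.go from drifting apart.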
########## banyand/cmd/dump/measure.go: ##########
+	if opts.csvOutput {
+		if len(ctx.projectionTags) > 0 {
+			ctx.tagColumns = ctx.projectionTags
+		} else {
+			ctx.tagColumns, ctx.fieldColumns, err = discoverMeasureColumns(ctx.partIDs, opts.shardPath, ctx.fileSystem)
+			if err != nil {
+				return nil, fmt.Errorf("failed to discover columns: %w", err)
+			}

Review Comment:
Bug: when `projectionTags` is provided, `fieldColumns` is never populated but is still used in line 300 when creating the CSV header, so field columns go missing from the CSV output. The fix should ensure `fieldColumns` is always discovered for CSV output:

```go
if opts.csvOutput {
	ctx.tagColumns, ctx.fieldColumns, err = discoverMeasureColumns(ctx.partIDs, opts.shardPath, ctx.fileSystem)
	if err != nil {
		return nil, fmt.Errorf("failed to discover columns: %w", err)
	}
	if len(ctx.projectionTags) > 0 {
		ctx.tagColumns = ctx.projectionTags
	}
}
```

```suggestion
		ctx.tagColumns, ctx.fieldColumns, err = discoverMeasureColumns(ctx.partIDs, opts.shardPath, ctx.fileSystem)
		if err != nil {
			return nil, fmt.Errorf("failed to discover columns: %w", err)
		}
		if len(ctx.projectionTags) > 0 {
			ctx.tagColumns = ctx.projectionTags
```
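To make the effect concrete, here is a small self-contained illustration of the header construction in `initOutput`; the field and tag names are made up for the example:

```go
package main

import "fmt"

// buildHeader mirrors how initOutput assembles the CSV header.
func buildHeader(fieldColumns, tagColumns []string) []string {
	header := []string{"PartID", "Timestamp", "Version", "SeriesID", "Series"}
	header = append(header, fieldColumns...)
	header = append(header, tagColumns...)
	return header
}

func main() {
	fields := []string{"value", "count"} // hypothetical measure fields
	tags := []string{"tag1"}             // the projected tag

	// Current behavior with --projection: fieldColumns stays empty.
	fmt.Println(buildHeader(nil, tags))
	// [PartID Timestamp Version SeriesID Series tag1]

	// With the suggested fix: field columns are always discovered.
	fmt.Println(buildHeader(fields, tags))
	// [PartID Timestamp Version SeriesID Series value count tag1]
}
```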
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]