hanahmily commented on code in PR #864: URL: https://github.com/apache/skywalking-banyandb/pull/864#discussion_r2563118402
########## banyand/cmd/dump/measure.go: ########## @@ -0,0 +1,1258 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package main + +import ( + "context" + "encoding/csv" + "encoding/json" + "fmt" + "io" + "os" + "path/filepath" + "sort" + "strconv" + "strings" + + "github.com/spf13/cobra" + "google.golang.org/protobuf/encoding/protojson" + + "github.com/apache/skywalking-banyandb/api/common" + databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" + modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" + internalencoding "github.com/apache/skywalking-banyandb/banyand/internal/encoding" + "github.com/apache/skywalking-banyandb/pkg/bytes" + "github.com/apache/skywalking-banyandb/pkg/compress/zstd" + "github.com/apache/skywalking-banyandb/pkg/convert" + "github.com/apache/skywalking-banyandb/pkg/encoding" + "github.com/apache/skywalking-banyandb/pkg/fs" + "github.com/apache/skywalking-banyandb/pkg/index/inverted" + "github.com/apache/skywalking-banyandb/pkg/logger" + pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" + "github.com/apache/skywalking-banyandb/pkg/query/logical" +) + +const ( + dirNameSidx = "sidx" + dirNameMeta = "meta" +) + 
+type measureDumpOptions struct { + shardPath string + segmentPath string + criteriaJSON string + projectionTags string + verbose bool + csvOutput bool +} + +func newMeasureCmd() *cobra.Command { + var shardPath string + var segmentPath string + var verbose bool + var csvOutput bool + var criteriaJSON string + var projectionTags string + + cmd := &cobra.Command{ + Use: "measure", + Short: "Dump measure shard data", + Long: `Dump and display contents of a measure shard directory (containing multiple parts). +Outputs measure data in human-readable format or CSV. + +Supports filtering by criteria and projecting specific tags.`, + Example: ` # Display measure data from shard in text format + dump measure --shard-path /path/to/shard-0 --segment-path /path/to/segment + + # Display with verbose hex dumps + dump measure --shard-path /path/to/shard-0 --segment-path /path/to/segment -v + + # Filter by criteria + dump measure --shard-path /path/to/shard-0 --segment-path /path/to/segment \ + --criteria '{"condition":{"name":"query","op":"BINARY_OP_HAVING","value":{"strArray":{"value":["tag1=value1","tag2=value2"]}}}}' + + # Project specific tags + dump measure --shard-path /path/to/shard-0 --segment-path /path/to/segment \ + --projection "tag1,tag2,tag3" + + # Output as CSV + dump measure --shard-path /path/to/shard-0 --segment-path /path/to/segment --csv + + # Save CSV to file + dump measure --shard-path /path/to/shard-0 --segment-path /path/to/segment --csv > output.csv`, + RunE: func(_ *cobra.Command, _ []string) error { + if shardPath == "" { + return fmt.Errorf("--shard-path flag is required") + } + if segmentPath == "" { + return fmt.Errorf("--segment-path flag is required") + } + return dumpMeasureShard(measureDumpOptions{ + shardPath: shardPath, + segmentPath: segmentPath, + verbose: verbose, + csvOutput: csvOutput, + criteriaJSON: criteriaJSON, + projectionTags: projectionTags, + }) + }, + } + + cmd.Flags().StringVar(&shardPath, "shard-path", "", "Path to the shard 
directory (required)") + cmd.Flags().StringVarP(&segmentPath, "segment-path", "g", "", "Path to the segment directory (required)") + cmd.Flags().BoolVarP(&verbose, "verbose", "v", false, "Verbose output (show raw data)") + cmd.Flags().BoolVar(&csvOutput, "csv", false, "Output as CSV format") + cmd.Flags().StringVarP(&criteriaJSON, "criteria", "c", "", "Criteria filter as JSON string") + cmd.Flags().StringVarP(&projectionTags, "projection", "p", "", "Comma-separated list of tags to include as columns (e.g., tag1,tag2,tag3)") + _ = cmd.MarkFlagRequired("shard-path") + _ = cmd.MarkFlagRequired("segment-path") + + return cmd +} + +func dumpMeasureShard(opts measureDumpOptions) error { + ctx, err := newMeasureDumpContext(opts) + if err != nil || ctx == nil { + return err + } + defer ctx.close() + + if err := ctx.processParts(); err != nil { + return err + } + + ctx.printSummary() + return nil +} + +type measurePartMetadata struct { + CompressedSizeBytes uint64 `json:"compressedSizeBytes"` + UncompressedSizeBytes uint64 `json:"uncompressedSizeBytes"` + TotalCount uint64 `json:"totalCount"` + BlocksCount uint64 `json:"blocksCount"` + MinTimestamp int64 `json:"minTimestamp"` + MaxTimestamp int64 `json:"maxTimestamp"` + ID uint64 `json:"-"` +} + +type measurePrimaryBlockMetadata struct { + seriesID common.SeriesID + minTimestamp int64 + maxTimestamp int64 + offset uint64 + size uint64 +} + +type measureDataBlock struct { + offset uint64 + size uint64 +} + +type measureBlockMetadata struct { + tagFamilies map[string]*measureDataBlock + field measureColumnFamilyMetadata + timestamps measureTimestampsMetadata + seriesID common.SeriesID + uncompressedSizeBytes uint64 + count uint64 +} + +type measureTimestampsMetadata struct { + dataBlock measureDataBlock + min int64 + max int64 + versionOffset uint64 + versionFirst int64 + encodeType encoding.EncodeType + versionEncodeType encoding.EncodeType +} + +type measureColumnFamilyMetadata struct { + columns []measureColumnMetadata +} 
+ +type measureColumnMetadata struct { + name string + dataBlock measureDataBlock + valueType pbv1.ValueType +} + +type measurePart struct { + primary fs.Reader + timestamps fs.Reader + fieldValues fs.Reader + fileSystem fs.FileSystem + tagFamilyMetadata map[string]fs.Reader + tagFamilies map[string]fs.Reader + path string + primaryBlockMetadata []measurePrimaryBlockMetadata + partMetadata measurePartMetadata +} + +type measureRowData struct { + tags map[string][]byte + fields map[string][]byte + fieldTypes map[string]pbv1.ValueType + timestamp int64 + version int64 + partID uint64 + seriesID common.SeriesID +} + +type measureDumpContext struct { + tagFilter logical.TagFilter + fileSystem fs.FileSystem + seriesMap map[common.SeriesID]string + writer *csv.Writer + opts measureDumpOptions + partIDs []uint64 + projectionTags []string + tagColumns []string + fieldColumns []string + rowNum int +} + +func newMeasureDumpContext(opts measureDumpOptions) (*measureDumpContext, error) { + ctx := &measureDumpContext{ + opts: opts, + fileSystem: fs.NewLocalFileSystem(), + } + + partIDs, err := discoverMeasurePartIDs(opts.shardPath) + if err != nil { + return nil, fmt.Errorf("failed to discover part IDs: %w", err) + } + if len(partIDs) == 0 { + fmt.Println("No parts found in shard directory") + return nil, nil + } + ctx.partIDs = partIDs + fmt.Fprintf(os.Stderr, "Found %d parts in shard\n", len(partIDs)) + + ctx.seriesMap, err = loadMeasureSeriesMap(opts.segmentPath) + if err != nil { + fmt.Fprintf(os.Stderr, "Warning: Failed to load series information: %v\n", err) + ctx.seriesMap = nil + } else { + fmt.Fprintf(os.Stderr, "Loaded %d series from segment\n", len(ctx.seriesMap)) + } + + if opts.criteriaJSON != "" { + var criteria *modelv1.Criteria + criteria, err = parseMeasureCriteriaJSON(opts.criteriaJSON) + if err != nil { + return nil, fmt.Errorf("failed to parse criteria: %w", err) + } + ctx.tagFilter, err = logical.BuildSimpleTagFilter(criteria) + if err != nil { + return 
nil, fmt.Errorf("failed to build tag filter: %w", err) + } + fmt.Fprintf(os.Stderr, "Applied criteria filter\n") + } + + if opts.projectionTags != "" { + ctx.projectionTags = parseMeasureProjectionTags(opts.projectionTags) + fmt.Fprintf(os.Stderr, "Projection tags: %v\n", ctx.projectionTags) + } + + if opts.csvOutput { + if len(ctx.projectionTags) > 0 { + ctx.tagColumns = ctx.projectionTags + } else { + ctx.tagColumns, ctx.fieldColumns, err = discoverMeasureColumns(ctx.partIDs, opts.shardPath, ctx.fileSystem) + if err != nil { + return nil, fmt.Errorf("failed to discover columns: %w", err) + } Review Comment: This is a bug: when projection tags are given, only `ctx.tagColumns` is set and `ctx.fieldColumns` is never discovered, so it stays empty. Please fix it. ########## banyand/cmd/dump/measure.go: ########## @@ -0,0 +1,1258 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License.
+ +package main + +import ( + "context" + "encoding/csv" + "encoding/json" + "fmt" + "io" + "os" + "path/filepath" + "sort" + "strconv" + "strings" + + "github.com/spf13/cobra" + "google.golang.org/protobuf/encoding/protojson" + + "github.com/apache/skywalking-banyandb/api/common" + databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" + modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" + internalencoding "github.com/apache/skywalking-banyandb/banyand/internal/encoding" + "github.com/apache/skywalking-banyandb/pkg/bytes" + "github.com/apache/skywalking-banyandb/pkg/compress/zstd" + "github.com/apache/skywalking-banyandb/pkg/convert" + "github.com/apache/skywalking-banyandb/pkg/encoding" + "github.com/apache/skywalking-banyandb/pkg/fs" + "github.com/apache/skywalking-banyandb/pkg/index/inverted" + "github.com/apache/skywalking-banyandb/pkg/logger" + pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" + "github.com/apache/skywalking-banyandb/pkg/query/logical" +) + +const ( + dirNameSidx = "sidx" + dirNameMeta = "meta" +) + +type measureDumpOptions struct { + shardPath string + segmentPath string + criteriaJSON string + projectionTags string + verbose bool + csvOutput bool +} + +func newMeasureCmd() *cobra.Command { + var shardPath string + var segmentPath string + var verbose bool + var csvOutput bool + var criteriaJSON string + var projectionTags string + + cmd := &cobra.Command{ + Use: "measure", + Short: "Dump measure shard data", + Long: `Dump and display contents of a measure shard directory (containing multiple parts). +Outputs measure data in human-readable format or CSV. 
+ +Supports filtering by criteria and projecting specific tags.`, + Example: ` # Display measure data from shard in text format + dump measure --shard-path /path/to/shard-0 --segment-path /path/to/segment + + # Display with verbose hex dumps + dump measure --shard-path /path/to/shard-0 --segment-path /path/to/segment -v + + # Filter by criteria + dump measure --shard-path /path/to/shard-0 --segment-path /path/to/segment \ + --criteria '{"condition":{"name":"query","op":"BINARY_OP_HAVING","value":{"strArray":{"value":["tag1=value1","tag2=value2"]}}}}' + + # Project specific tags + dump measure --shard-path /path/to/shard-0 --segment-path /path/to/segment \ + --projection "tag1,tag2,tag3" + + # Output as CSV + dump measure --shard-path /path/to/shard-0 --segment-path /path/to/segment --csv + + # Save CSV to file + dump measure --shard-path /path/to/shard-0 --segment-path /path/to/segment --csv > output.csv`, + RunE: func(_ *cobra.Command, _ []string) error { + if shardPath == "" { + return fmt.Errorf("--shard-path flag is required") + } + if segmentPath == "" { + return fmt.Errorf("--segment-path flag is required") + } + return dumpMeasureShard(measureDumpOptions{ + shardPath: shardPath, + segmentPath: segmentPath, + verbose: verbose, + csvOutput: csvOutput, + criteriaJSON: criteriaJSON, + projectionTags: projectionTags, + }) + }, + } + + cmd.Flags().StringVar(&shardPath, "shard-path", "", "Path to the shard directory (required)") + cmd.Flags().StringVarP(&segmentPath, "segment-path", "g", "", "Path to the segment directory (required)") + cmd.Flags().BoolVarP(&verbose, "verbose", "v", false, "Verbose output (show raw data)") + cmd.Flags().BoolVar(&csvOutput, "csv", false, "Output as CSV format") + cmd.Flags().StringVarP(&criteriaJSON, "criteria", "c", "", "Criteria filter as JSON string") + cmd.Flags().StringVarP(&projectionTags, "projection", "p", "", "Comma-separated list of tags to include as columns (e.g., tag1,tag2,tag3)") Review Comment: There should be two 
independent projection flags: one for the `tag` and another for the `field`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
