Copilot commented on code in PR #1138: URL: https://github.com/apache/skywalking-banyandb/pull/1138#discussion_r3278031504
########## banyand/measure/migration_schema.go: ########## @@ -0,0 +1,420 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package measure + +import ( + "context" + "fmt" + "os" + "path/filepath" + "strings" + + "github.com/blugelabs/bluge" + "github.com/pkg/errors" + "google.golang.org/protobuf/encoding/protojson" + + commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" + databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" + propertyv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1" + backupsnapshot "github.com/apache/skywalking-banyandb/banyand/backup/snapshot" + "github.com/apache/skywalking-banyandb/banyand/metadata/schema" +) + +// errSchemaPropertyMissing signals "no schema-property/_schema under +// backupDir". Returned by findSchemaPropertyRoot when a caller asks for +// a backup-tree discovery but the snapshot omitted the catalog. +var errSchemaPropertyMissing = errors.New("schema-property catalog not found in backup") + +// schemaPropDoc is one decoded doc emitted by walkSchemaPropertyShard; +// the caller decides whether the kind / group matches what it wants. 
+type schemaPropDoc struct { + propID string + kindName string // "measure" / "group" / "stream" / ... + group string // empty for non-measure docs + sourceJSON string // embedded protobuf JSON ready for kind-specific Unmarshal + modRev int64 + deleted bool +} + +// walkSchemaPropertyShard opens one shard of the backup's +// schema-property bluge index and invokes visit() for each doc. +// Visitors filter on kindName and fold into their own dedup tables. +func walkSchemaPropertyShard(shardPath string, visit func(schemaPropDoc) error) error { + reader, err := bluge.OpenReader(bluge.DefaultConfig(shardPath)) + if err != nil { + return fmt.Errorf("open bluge reader: %w", err) + } + defer func() { _ = reader.Close() }() + dmi, err := reader.Search(context.Background(), + bluge.NewTopNSearch(schemaSearchSize, bluge.NewMatchAllQuery())) + if err != nil { + return fmt.Errorf("search schema docs: %w", err) + } + for { + next, err := dmi.Next() + if err != nil { + return fmt.Errorf("iterate schema docs: %w", err) + } + if next == nil { + return nil + } + var sourceBytes []byte + var deleted bool + if err := next.VisitStoredFields(func(field string, value []byte) bool { + switch field { + case schemaSourceField: + sourceBytes = append([]byte(nil), value...) 
+ case schemaDeletedTag: + if len(value) > 0 { + deleted = true + } + } + return true + }); err != nil { + return fmt.Errorf("visit schema doc: %w", err) + } + if len(sourceBytes) == 0 { + continue + } + var prop propertyv1.Property + if err := protojson.Unmarshal(sourceBytes, &prop); err != nil { + continue + } + var group, srcJSON string + for _, t := range prop.GetTags() { + sv := t.GetValue().GetStr() + if sv == nil { + continue + } + switch t.GetKey() { + case "group": + group = sv.GetValue() + case "source": + srcJSON = sv.GetValue() + } + } + if vErr := visit(schemaPropDoc{ + propID: prop.GetId(), + kindName: prop.GetMetadata().GetName(), + group: group, + sourceJSON: srcJSON, + modRev: prop.GetMetadata().GetModRevision(), + deleted: deleted, + }); vErr != nil { + return vErr + } + } +} + +const ( + schemaSourceField = "_source" + schemaDeletedTag = "_deleted" + schemaShardPrefix = "shard-" + // schemaSearchSize sets the per-page document budget for the bluge + // scan; the schema-property index in production tops out at ~10k docs so + // a single page is plenty. + schemaSearchSize = 200000 +) + +// resolveSchemaRoot picks the `_schema` bluge directory to read schemas +// from. If schemaPropertyPath is non-empty it is honored directly (used +// by callers that mount a live cluster's schema-property PVC and want +// to skip the backup-style `<node>/<date>/` discovery). Otherwise the +// caller falls back to walking the backup snapshot via +// findSchemaPropertyRoot. +func resolveSchemaRoot(backupDir, schemaPropertyPath string) (string, error) { + if schemaPropertyPath != "" { + return schemaPropertyPath, nil + } + return findSchemaPropertyRoot(backupDir) +} + +// loadMeasureSchemasFromSchemaCatalog reads every measure schema under the +// given groups directly from the schema-property bluge index. Returns +// (group -> measure-name -> schema). 
When schemaPropertyPath is set, the +// catalog is read straight from that bluge dir (the live PVC mount path); +// otherwise the function walks the backup-style `<node>/<date>/` layout +// rooted at backupDir. +// +// The bluge index retains historical revisions across segments (a schema +// updated N times leaves N docs sharing the same propID with distinct +// mod_revisions). This loader mirrors the live cluster's +// SchemaRegistry.listSchemas dedup: propID is the dedup key, the entry +// with the highest mod_revision wins, and the winning entry is dropped +// entirely if it carries a non-zero _deleted marker (the schema was +// tombstoned at that revision). +func loadMeasureSchemasFromSchemaCatalog(backupDir, schemaPropertyPath string, groups []string) (map[string]map[string]*measureSchemaInfo, error) { + byGroup, err := fetchMeasureSchemasFromSchema(backupDir, schemaPropertyPath, groups) + if err != nil { + return nil, err + } + out := make(map[string]map[string]*measureSchemaInfo, len(groups)) + for _, group := range groups { + list := byGroup[group] + byName := make(map[string]*measureSchemaInfo, len(list)) + for _, s := range list { + byName[s.Name] = s + } + out[group] = byName + } + return out, nil +} + +func fetchMeasureSchemasFromSchema(backupDir, schemaPropertyPath string, groups []string) (map[string][]*measureSchemaInfo, error) { + if backupDir == "" && schemaPropertyPath == "" { + return nil, errors.New("either backup-dir or schema-property-path is required") + } + wanted := make(map[string]bool, len(groups)) + for _, g := range groups { + wanted[g] = true + } + schemaRoot, err := resolveSchemaRoot(backupDir, schemaPropertyPath) + if err != nil { + return nil, err + } + shards, err := os.ReadDir(schemaRoot) + if err != nil { + return nil, fmt.Errorf("read schema-property root %q: %w", schemaRoot, err) + } + candidates := make(map[string]*schemaCandidate) + for _, sh := range shards { + if !sh.IsDir() || !strings.HasPrefix(sh.Name(), 
schemaShardPrefix) { + continue + } + shardPath := filepath.Join(schemaRoot, sh.Name()) + if loadErr := loadMeasureSchemasFromShard(shardPath, wanted, candidates); loadErr != nil { + return nil, fmt.Errorf("load schemas from %s: %w", shardPath, loadErr) + } + } + result := make(map[string][]*measureSchemaInfo) + for _, c := range candidates { + if c.deleted || c.info == nil { + continue + } + result[c.info.Group] = append(result[c.info.Group], c.info) + } + return result, nil +} + +// schemaCandidate captures the best (highest mod_revision) doc seen for +// one propID while iterating the schema-property bluge index. deleted is +// true when the winning revision is a tombstone, in which case the entry +// must be skipped entirely (an older live revision must not resurrect a +// deleted schema). +type schemaCandidate struct { + info *measureSchemaInfo + modRev int64 + deleted bool +} + +// findSchemaPropertyRoot walks <backup>/<node>/<date>/ and returns the first +// schema-property/_schema directory it finds. Hot/schema-server pods are the +// only ones whose backup carries this catalog. 
+func findSchemaPropertyRoot(backupDir string) (string, error) { + nodes, err := os.ReadDir(backupDir) + if err != nil { + return "", fmt.Errorf("read backup dir %q: %w", backupDir, err) + } + for _, node := range nodes { + if !node.IsDir() { + continue + } + nodeRoot := filepath.Join(backupDir, node.Name()) + dates, readErr := os.ReadDir(nodeRoot) + if readErr != nil { + continue + } + for _, date := range dates { + if !date.IsDir() { + continue + } + cand := filepath.Join(nodeRoot, date.Name(), backupsnapshot.SchemaPropertyCatalogName, schema.SchemaGroup) + if info, statErr := os.Stat(cand); statErr == nil && info.IsDir() { + return cand, nil + } + } + } + return "", errors.WithMessagef(errSchemaPropertyMissing, + "no %s/%s directory found under %q (only hot/schema-server node backups carry schemas)", + backupsnapshot.SchemaPropertyCatalogName, schema.SchemaGroup, backupDir) +} Review Comment: findSchemaPropertyRoot ignores the requested backup snapshot date (cfg.Date) and returns the first `<backup>/<node>/<date>/schema-property/_schema` it encounters. Because os.ReadDir returns entries sorted by filename, "first" means the lexicographically smallest node and date directories that contain a catalog, regardless of which snapshot is actually being migrated. If the backup root contains multiple dates, this can load schemas/ResourceOpts from a different snapshot than the measure data being migrated, producing wrong SegmentInterval/stage resolution. Consider threading the selected date into schema discovery (or requiring an explicit SchemaPropertyPath in backup mode) so the schema-property catalog is guaranteed to come from the same snapshot date as the migrated measure data. ########## banyand/measure/migration_verify.go: ########## @@ -0,0 +1,405 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License.
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package measure + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "os" + "path/filepath" + "sort" + "strconv" + "strings" + "time" + + "github.com/blugelabs/bluge" + + "github.com/apache/skywalking-banyandb/banyand/internal/storage" + "github.com/apache/skywalking-banyandb/pkg/fs" +) + +// SegmentReport summarizes one `seg-*` directory under a measure +// group root. Populated by EnumerateGroupTarget for the migration +// verify CLI. +// +// Aligned is true only when ALL of these hold: +// - the dir name parses cleanly into a start time; +// - start = IntervalRule.Standard(start) (start is on the grid); +// - <seg>/metadata exists, is well-formed JSON, and carries a non-empty endTime; +// - end = IntervalRule.NextTime(start) (the segment spans exactly one bucket); +// - the inclusive last instant (end - 1ns) standardizes back to start (start +// and end fall in the same IntervalRule bucket). +type SegmentReport struct { + StartTime time.Time + EndTime time.Time + Seg string + Rows uint64 + SidxDocCount uint64 + Shards int + Parts int + Aligned bool + SidxOpened bool +} + +// EntryGroupReport aggregates source row count + target per-seg report +// for one (entry, group) pair. Both source and target are read-only; +// the verify command never mutates the dataset. 
+type EntryGroupReport struct { + Group string + EntryStage string + EntryTarget string + TargetGroup string + EntryNodes []string + SrcRoots []string + TargetSegs []SegmentReport + SrcRows uint64 + SrcParts int +} + +// VerifyShardParts reads <shardDir>'s newest `.snp`, confirms every +// listed partID has an on-disk dir, opens each part, and returns the +// sum of partMetadata.TotalCount plus the part count. +// +// Errors (instead of t.Fatalf) so this helper is usable from a CLI. +func VerifyShardParts(shardDir string, fileSystem fs.FileSystem) (uint64, int, error) { + entries, err := os.ReadDir(shardDir) + if err != nil { + return 0, 0, fmt.Errorf("read shard: %w", err) + } + var snpPath string + for _, e := range entries { + if !e.IsDir() && strings.HasSuffix(e.Name(), directCopySnpSuffix) { + candidate := filepath.Join(shardDir, e.Name()) + if snpPath == "" || candidate > snpPath { + snpPath = candidate + } + } + } + if snpPath == "" { + return 0, 0, fmt.Errorf("no .snp file under %s", shardDir) + } + snpRaw, err := os.ReadFile(snpPath) + if err != nil { + return 0, 0, fmt.Errorf("read .snp: %w", err) + } + var partNames []string + if err := json.Unmarshal(snpRaw, &partNames); err != nil { + return 0, 0, fmt.Errorf("parse .snp: %w", err) + } + + onDiskPartIDs := map[string]bool{} + for _, e := range entries { + if e.IsDir() && directCopyPartDirPattern.MatchString(e.Name()) { + onDiskPartIDs[e.Name()] = true + } + } + for _, name := range partNames { + if !onDiskPartIDs[name] { + return 0, 0, fmt.Errorf("snp references missing partID %s", name) + } + } + + var rowsTotal uint64 + for _, name := range partNames { + partID, parseErr := strconv.ParseUint(name, 16, 64) + if parseErr != nil { + return 0, 0, fmt.Errorf("parse partID %q: %w", name, parseErr) + } + var rows uint64 + var openErr error + func() { + defer func() { + if r := recover(); r != nil { + openErr = fmt.Errorf("open part %s panicked: %v", name, r) + } + }() + p := mustOpenFilePart(partID, 
+ shardDir, fileSystem) + defer p.close() + rows = p.partMetadata.TotalCount + }() + if openErr != nil { + return 0, 0, openErr + } + rowsTotal += rows + } + return rowsTotal, len(partNames), nil +} + +// CountBlugeDocs opens the bluge index at path read-only and returns the +// total document count. Used to spot-check the broadcast union sidx +// that `migration copy` writes into every aligned target segment. +func CountBlugeDocs(path string) (uint64, error) { + reader, err := bluge.OpenReader(bluge.DefaultConfig(path)) + if err != nil { + return 0, err + } + defer func() { _ = reader.Close() }() + dmi, err := reader.Search(context.Background(), + bluge.NewAllMatches(bluge.NewMatchAllQuery())) + if err != nil { + return 0, err + } + var count uint64 + for { + next, nextErr := dmi.Next() + if nextErr != nil { + return count, nextErr + } + if next == nil { + return count, nil + } + count++ + } Review Comment: CountBlugeDocs counts documents by iterating every match via dmi.Next(), which makes `verify` O(numSegments × numDocsInSidx). On real datasets where the union sidx is large and broadcast into many segments, this can dominate runtime. Consider using an index-level statistic instead of a full match iteration (e.g. bluge's `Reader.Count()`, which reports the document count from the index without visiting each match), or otherwise avoiding a full scan per segment (such as counting once per distinct sidx path and reusing the result). ########## banyand/cmd/migration/copy.go: ########## @@ -0,0 +1,194 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License.
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package main + +import ( + "context" + "fmt" + "os" + "os/signal" + "runtime" + "runtime/debug" + "strconv" + "syscall" + "time" + + "github.com/spf13/cobra" + + "github.com/apache/skywalking-banyandb/banyand/measure" + "github.com/apache/skywalking-banyandb/pkg/cgroups" +) + +func newCopyCmd() *cobra.Command { + var configPath string + + cmd := &cobra.Command{ + Use: "copy", + Short: "Direct-file copy of measure parts into local measure root with row-level grid alignment", + Long: `copy walks every source part, reads each row's actual timestamp, and routes +the row to the grid-aligned target segment via SegmentInterval.Standard. +A single source part can fan out to multiple target segments (one new +part each). + +The run is fully driven by a YAML config (--copy-config). One config +carries one source (backup snapshot or live PVC mounts), one group list, +and one or more (stage, target) entries — schemas and per-group union +sidx are loaded/built once and reused across every entry. + +Per group, a single union sidx is built in the staging directory by +deduplicating every source seg-*/sidx doc by SeriesID; that union sidx +is then broadcast (byte-copied) into every aligned target segment that +received any rows. A fresh .snp is written per (target segment, shard) +so tsTable.loadSnapshot picks up the new parts on next startup. + +Schemas (measure tag families + IndexMode bit) and group resource +opts (SegmentInterval per LifecycleStage) are read directly from the +source's schema-property bluge catalog — no liaison access is needed. 
+Groups containing any IndexMode measure are rejected up front, since +their fields live inside sidx and the broadcast strategy would corrupt +cross-node deduplication at the liaison. + +The staging directory is removed in full when the run finishes +(success or failure). Run inside a data pod whose BanyanDB process is +NOT writing to any of the entry targets. Source and target shard +topology must match.`, + RunE: func(_ *cobra.Command, _ []string) error { + applyGoMemLimit() + + plan, err := LoadCopyPlan(configPath) + if err != nil { + return err + } + stagingDir, err := PrepareStagingDir(plan.StagingDir) + if err != nil { + return err + } + defer func() { + if rmErr := os.RemoveAll(stagingDir); rmErr != nil { + fmt.Fprintf(os.Stderr, + "warning: failed to clean staging dir %s: %v\n", + stagingDir, rmErr) + } + }() + + cfg := plan.ToDirectCopyConfig(stagingDir) + + ctx, stop := signal.NotifyContext(context.Background(), + os.Interrupt, syscall.SIGTERM) + defer stop() + + res, runErr := measure.MigrationCopy(ctx, cfg) + printCopyResult(res) + return runErr + }, + } + + cmd.Flags().StringVar(&configPath, "copy-config", "", + "path to the YAML migration copy plan (required)") + _ = cmd.MarkFlagRequired("copy-config") + return cmd +} + +func printCopyResult(res measure.DirectCopyResult) { + fmt.Println("DONE in", res.Duration) + fmt.Printf(" target segments : %d\n", res.Segments) + fmt.Printf(" source parts : %d\n", res.SourceParts) + fmt.Printf(" target mem-parts : %d (pre-merge; banyandb's merge loop will compact)\n", res.TargetParts) + fmt.Printf(" rows copied : %d\n", res.Rows) + fmt.Printf(" bytes written : %d\n", res.Bytes) + fmt.Printf(" fast-path parts : %d\n", measure.FastPathHits()) + fmt.Printf(" slow-path parts : %d\n", measure.SlowPathHits()) + fmt.Printf(" slow-path rows : %d\n", measure.SlowPathRows()) +} + +// applyGoMemLimit auto-sets the Go runtime's heap soft-cap (GOMEMLIMIT) +// from the process cgroup memory limit when the operator did not pass 
+// one explicitly. +// +// Why this is necessary: +// - Go's runtime sizes the heap from live set + GOGC by default; it +// does NOT read cgroup limits on its own. On a memory-constrained +// pod a momentary heap spike during slow-path zstd flush can sail +// right past the pod limit and trip the kernel OOMKiller (or +// macOS jetsam on kind), wiping the whole migration mid-run. +// - Historical evidence from this codebase: an unconstrained run on +// a 16 GiB GKE pod OOM'd at 99 % through the warm tier; raising +// the limit to 20 GiB AND setting GOMEMLIMIT to 17 GiB (85 %) was +// the fix. The shared flush-pool + Phase A union-sidx refactor +// reduced peak memory, but no run has been validated WITHOUT +// GOMEMLIMIT post-refactor, so we still ship the safety net. +// +// Why we don't rely on operators setting GOMEMLIMIT manually: +// - Go 1.19+ honors a GOMEMLIMIT env at startup, so an operator who +// remembered to set it gets the same effect. This helper is a UX +// fallback for the common case where it was forgotten — we read +// the cgroup limit (works on kind, GKE, plain docker run) and +// apply 85 % of it. The 15 % headroom is for the runtime's own +// overhead + the Linux page cache. +// +// Resolution order: GOMEMLIMIT env wins (we just log and return); +// otherwise read cgroup limit and call debug.SetMemoryLimit; otherwise +// log "not set" and let the process run uncapped (bare-host case where +// the operator presumably has enough RAM). +func applyGoMemLimit() { + if raw := os.Getenv("GOMEMLIMIT"); raw != "" { + // Honor whatever the env already set — Go's runtime parsed it + // at startup. Just log. + fmt.Fprintf(os.Stdout, "[migration] GOMEMLIMIT honored from env: %s\n", raw) + return + } + limit, err := cgroups.MemoryLimit() + if err != nil || limit <= 0 { + // Fall back to 85% of host total memory. cgroups.MemoryLimit + // returns -1 for "max" (unlimited) on a non-containerised host. 
+ var ms runtime.MemStats + runtime.ReadMemStats(&ms) + if ms.Sys == 0 { + fmt.Fprintf(os.Stdout, "[migration] GOMEMLIMIT not set (no cgroup limit, runtime.MemStats unavailable)\n") + return + } + // MemStats.Sys reports OS memory reserved for the runtime, not + // host RAM. As a heuristic we don't set a limit here. + fmt.Fprintf(os.Stdout, "[migration] GOMEMLIMIT not set (cgroup limit unavailable; running on bare host)\n") + return Review Comment: The comment says "Fall back to 85% of host total memory" when cgroups.MemoryLimit is unavailable, but the implementation intentionally does not set any limit in that branch (it logs and returns). Note also that runtime.MemStats.Sys, which that branch reads, counts memory the Go runtime has obtained from the OS rather than host RAM (as the inline comment itself acknowledges). It therefore could not back such a fallback anyway; as written, the ReadMemStats call only selects which log message is printed. Please update the comment to match the actual behavior (or implement the described fallback from a genuine host-memory source) to avoid misleading operators about how GOMEMLIMIT is chosen on non-cgroup/unlimited environments. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
