Copilot commented on code in PR #1180: URL: https://github.com/apache/skywalking-banyandb/pull/1180#discussion_r3421667055
########## test/cases/lifecycle/orphan.go: ########## @@ -0,0 +1,495 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package lifecycle_test + +import ( + "bytes" + "compress/gzip" + "context" + "encoding/json" + "errors" + "fmt" + "io" + "io/fs" + "os" + "path/filepath" + "strings" + "time" + + "github.com/onsi/ginkgo/v2" + "github.com/onsi/gomega" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/protobuf/types/known/timestamppb" + + commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" + databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" + measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1" + modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" + streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1" + "github.com/apache/skywalking-banyandb/banyand/backup/lifecycle" + "github.com/apache/skywalking-banyandb/pkg/grpchelper" + "github.com/apache/skywalking-banyandb/pkg/test/flags" +) + +// orphanRec mirrors one archived JSONL line (the subset this suite asserts on). +// The archive writer lives in banyand/backup/lifecycle (different package); this +// is a read-side view of its self-describing record. +type orphanRec struct { + Fields map[string]orphanVal `json:"fields"` + Tags map[string]orphanVal `json:"tags"` + Group string `json:"group"` + Catalog string `json:"catalog"` + Measure string `json:"measure"` + Timestamp string `json:"timestamp"` + ElementID string `json:"element_id"` + Entity []string `json:"entity"` + TimeNanos int64 `json:"timestamp_unix_nano"` + Version int64 `json:"version"` +} + +type orphanVal struct { + Value interface{} `json:"value"` + Type string `json:"type"` +} + +type orphanManifest struct { + Measures []struct { + Measure string `json:"measure"` + Rows int `json:"rows"` + } `json:"measures"` + TotalRows int `json:"total_rows"` +} + +// dayInterval/coprime grid clone of sw_cross_segment: a 2-day source segment +// straddles the 3-day warm boundary, so crossSegmentTimestamps() lands rows on +// the row-replay path — the only path where orphan archiving happens. +func orphanStages() *commonv1.ResourceOpts { + return &commonv1.ResourceOpts{ + ShardNum: 1, + SegmentInterval: &commonv1.IntervalRule{Unit: commonv1.IntervalRule_UNIT_DAY, Num: 2}, + Ttl: &commonv1.IntervalRule{Unit: commonv1.IntervalRule_UNIT_DAY, Num: 5}, + Stages: []*commonv1.LifecycleStage{{ + Name: "warm", + ShardNum: 1, + SegmentInterval: &commonv1.IntervalRule{Unit: commonv1.IntervalRule_UNIT_DAY, Num: 3}, + Ttl: &commonv1.IntervalRule{Unit: commonv1.IntervalRule_UNIT_DAY, Num: 10}, + NodeSelector: "type=warm", + }}, + } +} + +// drainWriteResult closes a client-streaming write and returns an error on any +// non-success ack or non-EOF stream error (the error-returning sibling of +// drainWriteAcks, so the whole write can be retried under Eventually until a +// freshly-created schema has propagated to the liaison). +func drainWriteResult[R interface{ GetStatus() string }](recv func() (R, error), closeSend func() error) error { + if err := closeSend(); err != nil { + return err + } + for { + resp, recvErr := recv() + if errors.Is(recvErr, io.EOF) { + return nil + } + if recvErr != nil { + return recvErr + } + if s := resp.GetStatus(); s != "" && s != "STATUS_SUCCEED" { + return fmt.Errorf("write ack status: %s", s) + } + } +} + +// runLifecycleMigrationWithArchive runs one real hot->warm lifecycle migration +// (the actual lifecycle command) with the orphan archive policy enabled. archiveSubdir +// is the relative subdir (under each catalog's root path) the archive lands in. +func runLifecycleMigrationWithArchive(progressFile, reportDir, archiveSubdir string) { + lifecycleCmd := lifecycle.NewCommand() + args := []string{ + "--grpc-addr", SharedContext.DataAddr, + "--stream-root-path", SharedContext.SrcDir, + "--measure-root-path", SharedContext.SrcDir, + "--trace-root-path", SharedContext.SrcDir, + "--progress-file", progressFile, + "--report-dir", reportDir, + "--migration-orphan-policy", "archive", + "--migration-orphan-archive-subdir", archiveSubdir, + } + args = append(args, SharedContext.MetadataFlags...) + lifecycleCmd.SetArgs(args) + gomega.Expect(lifecycleCmd.Execute()).To(gomega.Succeed()) +} + +// readOrphanArchive gunzips every part-*.jsonl.gz under groupArchiveDir (the +// <catalog-root>/<subdir>/<group> directory) and returns the decoded records plus +// the summed manifest row count. +func readOrphanArchive(groupArchiveDir string) (recs []orphanRec, manifestRows int) { + root := groupArchiveDir + err := filepath.WalkDir(root, func(path string, d fs.DirEntry, walkErr error) error { + if walkErr != nil { + return walkErr + } + if d.IsDir() { + return nil + } + switch { + case strings.HasSuffix(d.Name(), ".jsonl.gz"): + f, openErr := os.Open(path) + gomega.Expect(openErr).NotTo(gomega.HaveOccurred()) + defer f.Close() + gr, gzErr := gzip.NewReader(f) + gomega.Expect(gzErr).NotTo(gomega.HaveOccurred(), "archive file %s must be a valid gzip", path) + defer gr.Close() + data, readErr := io.ReadAll(gr) + gomega.Expect(readErr).NotTo(gomega.HaveOccurred()) + dec := json.NewDecoder(bytes.NewReader(data)) + for dec.More() { + var rec orphanRec + gomega.Expect(dec.Decode(&rec)).To(gomega.Succeed(), "archive line in %s must be valid JSON", path) + recs = append(recs, rec) + } Review Comment: `manifest` data files are JSON Lines (multiple top-level JSON objects separated by newlines), but `Decoder.More()` only works when parsing inside an array/object. As written, this loop will not decode any records for JSONL. Switch to decoding in a `for { ... }` loop until `io.EOF` (and treat any other decode error as a test failure). ########## banyand/backup/lifecycle/row_replay_measure.go: ########## @@ -385,3 +421,110 @@ func (r *measureRowReplayer) buildWriteRequest( } return wr, iwr, nil } + +// segmentSuffixFromPath extracts "20260601" from ".../seg-20260601". +func segmentSuffixFromPath(segmentPath string) string { + return strings.TrimPrefix(filepath.Base(segmentPath), "seg-") +} + +// shardFromPath extracts the shard id from ".../shard-0". +func shardFromPath(shardPath string) uint32 { + id, _ := strconv.ParseUint(strings.TrimPrefix(filepath.Base(shardPath), "shard-"), 10, 32) + return uint32(id) +} Review Comment: `shardFromPath` ignores `ParseUint` errors and silently returns 0, which can misattribute orphan rows to the wrong shard and cause archive-path collisions (different shards writing under `shard-0`). Make these helpers validate the expected `seg-` / `shard-` prefix and return an error on parse failure, then propagate that error so the part replay fails fast instead of producing a misleading archive. ########## docs/operation/lifecycle.md: ########## @@ -121,18 +121,74 @@ lifecycle \ ### Command-Line Parameters -| Parameter | Description | Default Value | -| --------------------- | ----------------------------------------------------------------------------- | ------------------------------ | -| `--node-labels` | Labels of the current node (e.g., `type=hot,region=us-west`) | `nil` | -| `--grpc-addr` | gRPC address of the source data node to snapshot and read from | `127.0.0.1:17912` | -| `--enable-tls` | Enable TLS for gRPC connection | `false` | -| `--insecure` | Skip server certificate verification | `false` | -| `--cert` | Path to the gRPC server certificate | `""` | -| `--stream-root-path` | Root directory for stream catalog snapshots | `/tmp` | -| `--measure-root-path` | Root directory for measure catalog snapshots | `/tmp` | -| `--trace-root-path` | Root directory for trace catalog snapshots | `/tmp` | -| `--progress-file` | File path used for progress tracking and crash recovery | `/tmp/lifecycle-progress.json` | -| `--schedule` | Schedule for periodic backup (e.g., @yearly, @monthly, @weekly, @daily, etc.) | `""` | +| Parameter | Description | Default Value | +|-------------------------------------|----------------------------------------------------------------------------------------------------------| ------------------------------ | +| `--node-labels` | Labels of the current node (e.g., `type=hot,region=us-west`) | `nil` | +| `--grpc-addr` | gRPC address of the source data node to snapshot and read from | `127.0.0.1:17912` | Review Comment: The table rows start with `||`, which creates an empty first column in standard Markdown table parsing. If the intent is a 3-column table, use a single leading `|` (i.e., `| Parameter | Description | Default Value |`) to avoid misrendering in common renderers. ########## banyand/backup/lifecycle/orphan_archive.go: ########## @@ -0,0 +1,459 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package lifecycle + +import ( + "compress/gzip" + "encoding/base64" + "encoding/json" + "fmt" + "os" + "path/filepath" + "sort" + "strconv" + "strings" + "sync" + "time" + + modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" + "github.com/apache/skywalking-banyandb/pkg/logger" +) + +// orphanPolicy decides what row-replay does with rows whose schema was deleted +// from the registry: archive them to JSONL (default) or drop them. +type orphanPolicy uint8 + +const ( + orphanArchive orphanPolicy = iota + orphanDiscard +) + +func parseOrphanPolicy(s string) (orphanPolicy, error) { + switch s { + case "archive": + return orphanArchive, nil + case "discard": + return orphanDiscard, nil + default: + return 0, fmt.Errorf("invalid orphan policy %q (want archive|discard)", s) + } +} + +// orphanConfig is the resolved policy + archive root threaded from the service. +type orphanConfig struct { + rootDir string + policy orphanPolicy +} + +// sourceLoc locates the source part an orphan row came from. Stage is metadata +// only (not part of the path); segment/shard/part build the file path. +type sourceLoc struct { + Stage string `json:"stage"` + Segment string `json:"segment"` + Part string `json:"part_id"` + Shard uint32 `json:"shard"` +} + +// typedValue is a schema-free value with its explicit BanyanDB value type. +type typedValue struct { + Value interface{} `json:"value"` + Type string `json:"type"` +} + +// archiveRecord is one orphan row as written to JSONL. Fields is nil for stream; +// ElementID is set only for stream. +type archiveRecord struct { + Tags map[string]typedValue `json:"tags,omitempty"` + IndexedTags map[string][]string `json:"indexed_tags,omitempty"` + Fields map[string]typedValue `json:"fields,omitempty"` + ElementID string `json:"element_id,omitempty"` + Timestamp string `json:"timestamp"` + Group string `json:"group"` + Measure string `json:"measure"` + Catalog string `json:"catalog"` + Source sourceLoc `json:"source"` + Entity []string `json:"entity"` + SeriesID uint64 `json:"series_id"` + TimeNanos int64 `json:"timestamp_unix_nano"` + // Version is measure-only (a nanosecond timestamp, always > 0); stream records + // omit it rather than emit a misleading "version": 0. + Version int64 `json:"version,omitempty"` +} + +// orphanArchiver is created per (group, catalog) replayer; it owns the manifest +// aggregation across that group's segments and serializes archive writes. +type orphanArchiver struct { + l *logger.Logger + segments map[string]*segManifestState + group string + catalog string + cfg orphanConfig + mu sync.Mutex +} + +func newOrphanArchiver(cfg orphanConfig, group, catalog string, l *logger.Logger) *orphanArchiver { + return &orphanArchiver{cfg: cfg, group: group, catalog: catalog, l: l, segments: make(map[string]*segManifestState)} +} Review Comment: `orphanArchiver` assumes `l` is non-nil, but several tests construct it with `nil`. Any non-`IsNotExist` read/unmarshal error (or future code path) will dereference `a.l` and panic. Consider defensively defaulting to a non-nil logger in `newOrphanArchiver` (or guarding `a.l != nil` before logging) so error paths remain safe. ########## banyand/backup/lifecycle/stream_migration_visitor.go: ########## @@ -407,20 +411,48 @@ func (mv *streamMigrationVisitor) visitPartRowReplay(ctx context.Context, segmen // draining all in-flight confirmations before returning. A non-nil error means // some rows were not durably delivered; marking the part errored (rather than // completed) ensures the resume guard re-replays the whole part. - rowCount, err := replayer.replayPart(ctx, partPath) + res, err := replayer.replayPart(ctx, partPath) if err != nil { // Row-replay is all-or-nothing per part; mark the source part errored so // resume retries the whole part (same source key the guard checks above). recordReplayNodeErrors(mv.progress, mv.group, mv.sourceStage, mv.targetStage, catalogStream, err) mv.recordError(scopePart, segmentTR, sourceShardID, &partID, err.Error()) return fmt.Errorf("row-replay stream part %s: %w", partPath, err) } + if res.skipped > 0 { + // Some series could not be resolved from the series index: their rows remain + // only in the source part. Record a locatable error and retain the source + // segment (excluded from the post-migration delete set) so the data is not + // silently and permanently lost, mirroring measure. + mv.recordSkippedSource(segmentTR) + mv.recordError(scopePart, segmentTR, sourceShardID, &partID, + fmt.Sprintf("row-replay skipped %d unresolved rows (series-index gap); examples=%s", + res.skipped, formatSkipExamples(filterSkipExamplesByKind(res.detail, skipKindSidxGap)))) + mv.logger.Warn(). + Uint64("part_id", partID). + Int("skipped_rows", res.skipped). + Int("published_rows", res.rows). + Str("group", mv.group). + Msg("stream row-replay skipped unresolved series; retaining source segment to avoid data loss") + } + if res.orphanSkipped > 0 { + // Orphan (deleted schema): archived or discarded; the source segment is NOT + // retained — it is deleted normally. This is expected handling, not a + // migration error: the per-subject counts are reported via orphan_handling + // (pushed at Close), not recorded in the errors buckets. Review Comment: The comment refers to `orphan_handling`, but the report field added in this PR is `orphans` (see `buildMigrationReport`). Updating this comment (and the analogous one in the measure visitor) will avoid confusion when correlating logs/comments with the emitted report schema. ########## banyand/backup/lifecycle/orphan_archive.go: ########## @@ -0,0 +1,459 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package lifecycle + +import ( + "compress/gzip" + "encoding/base64" + "encoding/json" + "fmt" + "os" + "path/filepath" + "sort" + "strconv" + "strings" + "sync" + "time" + + modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" + "github.com/apache/skywalking-banyandb/pkg/logger" +) + +// orphanPolicy decides what row-replay does with rows whose schema was deleted +// from the registry: archive them to JSONL (default) or drop them. +type orphanPolicy uint8 + +const ( + orphanArchive orphanPolicy = iota + orphanDiscard +) + +func parseOrphanPolicy(s string) (orphanPolicy, error) { + switch s { + case "archive": + return orphanArchive, nil + case "discard": + return orphanDiscard, nil + default: + return 0, fmt.Errorf("invalid orphan policy %q (want archive|discard)", s) + } +} + +// orphanConfig is the resolved policy + archive root threaded from the service. +type orphanConfig struct { + rootDir string + policy orphanPolicy +} + +// sourceLoc locates the source part an orphan row came from. Stage is metadata +// only (not part of the path); segment/shard/part build the file path. +type sourceLoc struct { + Stage string `json:"stage"` + Segment string `json:"segment"` + Part string `json:"part_id"` + Shard uint32 `json:"shard"` +} + +// typedValue is a schema-free value with its explicit BanyanDB value type. +type typedValue struct { + Value interface{} `json:"value"` + Type string `json:"type"` +} + +// archiveRecord is one orphan row as written to JSONL. Fields is nil for stream; +// ElementID is set only for stream. +type archiveRecord struct { + Tags map[string]typedValue `json:"tags,omitempty"` + IndexedTags map[string][]string `json:"indexed_tags,omitempty"` + Fields map[string]typedValue `json:"fields,omitempty"` + ElementID string `json:"element_id,omitempty"` + Timestamp string `json:"timestamp"` + Group string `json:"group"` + Measure string `json:"measure"` + Catalog string `json:"catalog"` + Source sourceLoc `json:"source"` + Entity []string `json:"entity"` + SeriesID uint64 `json:"series_id"` + TimeNanos int64 `json:"timestamp_unix_nano"` + // Version is measure-only (a nanosecond timestamp, always > 0); stream records + // omit it rather than emit a misleading "version": 0. + Version int64 `json:"version,omitempty"` +} + +// orphanArchiver is created per (group, catalog) replayer; it owns the manifest +// aggregation across that group's segments and serializes archive writes. +type orphanArchiver struct { + l *logger.Logger + segments map[string]*segManifestState + group string + catalog string + cfg orphanConfig + mu sync.Mutex +} + +func newOrphanArchiver(cfg orphanConfig, group, catalog string, l *logger.Logger) *orphanArchiver { + return &orphanArchiver{cfg: cfg, group: group, catalog: catalog, l: l, segments: make(map[string]*segManifestState)} +} + +// groupDir is the archive root for this group: rootDir already points at the +// catalog's archive subdir (e.g. <catalog-root>/archive), so the per-segment +// layout below adds only the group — the catalog is recorded inside each record +// and manifest rather than as a path level. +func (a *orphanArchiver) groupDir() string { + return filepath.Join(a.cfg.rootDir, a.group) +} + +func (a *orphanArchiver) partFilePath(loc sourceLoc) string { + return filepath.Join(a.groupDir(), "seg-"+loc.Segment, fmt.Sprintf("shard-%d", loc.Shard), "part-"+loc.Part+".jsonl.gz") +} + +func (a *orphanArchiver) manifestPath(segment string) string { + return filepath.Join(a.groupDir(), "seg-"+segment, "manifest.json") +} + +// valueWithType renders a decoded TagValue as a schema-free typed value. +func valueWithType(tv *modelv1.TagValue) typedValue { + switch v := tv.GetValue().(type) { + case *modelv1.TagValue_Str: + return typedValue{Type: "STR", Value: v.Str.GetValue()} + case *modelv1.TagValue_Int: + return typedValue{Type: "INT64", Value: v.Int.GetValue()} + case *modelv1.TagValue_StrArray: + return typedValue{Type: "STR_ARRAY", Value: v.StrArray.GetValue()} + case *modelv1.TagValue_IntArray: + return typedValue{Type: "INT_ARRAY", Value: v.IntArray.GetValue()} + case *modelv1.TagValue_BinaryData: + return typedValue{Type: "BINARY", Value: v.BinaryData} + case *modelv1.TagValue_Timestamp: + return typedValue{Type: "TIMESTAMP", Value: v.Timestamp.AsTime().UTC().Format(time.RFC3339Nano)} + case *modelv1.TagValue_Null: + return typedValue{Type: "NULL", Value: nil} + default: + if tv.GetValue() == nil { + return typedValue{Type: "NULL", Value: nil} + } + // A populated but unrecognized oneof: surface it as UNKNOWN rather than + // silently masquerading as NULL. + return typedValue{Type: "UNKNOWN", Value: nil} + } +} + +// entityValueString renders one decoded entity TagValue faithfully (no binary +// loss): strings as-is, ints via strconv, binary as base64, arrays joined by +// their faithful element renderings, null/empty as "". +func entityValueString(tv *modelv1.TagValue) string { + switch v := tv.GetValue().(type) { + case *modelv1.TagValue_Str: + return v.Str.GetValue() + case *modelv1.TagValue_Int: + return strconv.FormatInt(v.Int.GetValue(), 10) + case *modelv1.TagValue_BinaryData: + return "base64:" + base64.StdEncoding.EncodeToString(v.BinaryData) + case *modelv1.TagValue_StrArray: + return strings.Join(v.StrArray.GetValue(), ",") + case *modelv1.TagValue_IntArray: + parts := make([]string, 0, len(v.IntArray.GetValue())) + for _, n := range v.IntArray.GetValue() { + parts = append(parts, strconv.FormatInt(n, 10)) + } + return strings.Join(parts, ",") + case *modelv1.TagValue_Timestamp: + return v.Timestamp.AsTime().UTC().Format(time.RFC3339Nano) + default: + return "" + } +} + +// segManifestState holds per-part tallies for one segment so the manifest can be +// rebuilt idempotently when a part is (re)archived. +type segManifestState struct { + parts map[string]map[string]manifestCount +} + +type measureTally struct { + series map[uint64]struct{} + rows int +} + +// manifestFile is the on-disk per-segment manifest. Parts is the idempotent +// per-part breakdown the summary (Measures/Total*) is rebuilt from; the per-part +// SeriesIDs lists let the summary report the true distinct series count (a series +// appearing in several parts is counted once) and stay resume-safe. +type manifestFile struct { + Parts map[string]map[string]manifestCount `json:"_parts"` + Group string `json:"group"` + Catalog string `json:"catalog"` + SourceStage string `json:"source_stage"` + Segment string `json:"segment"` + GeneratedAt string `json:"generated_at"` + Policy string `json:"policy"` + Measures []manifestMeasure `json:"measures"` + TotalRows int `json:"total_rows"` + TotalSeries int `json:"total_series"` +} + +type manifestMeasure struct { + Measure string `json:"measure"` + Files []string `json:"files"` + Rows int `json:"rows"` + Series int `json:"series"` +} + +// manifestCount records one (part, measure) breakdown: the row count and the +// sorted real series-ID list. Persisting the IDs (not a bare count) lets the +// summary union them across parts for a true distinct count and survive a resume. +type manifestCount struct { + SeriesIDs []uint64 `json:"series_ids"` + Rows int `json:"rows"` +} + +// orphanPartWriter writes one source part's orphan rows. For discard, f is nil +// and only counts are kept (never persisted). +type orphanPartWriter struct { + a *orphanArchiver + f *os.File + gw *gzip.Writer + enc *json.Encoder + tally map[string]*measureTally + loc sourceLoc +} + +// archiving reports whether this writer persists rows (archive policy) or only +// tallies them (discard policy). Derived from the parent archiver's config. +func (w *orphanPartWriter) archiving() bool { return w.a.cfg.policy == orphanArchive } + +// runPart opens the archive writer for loc, runs body (the part replay, which +// appends orphan rows via the passed writer), and commits the manifest on success +// or aborts (no manifest, partial file removed) on failure. A commit error is +// surfaced as a part error so the part is retried and the manifest never +// under-reports. Shared by the measure and stream replayers. +func (a *orphanArchiver) runPart(loc sourceLoc, body func(*orphanPartWriter) error) error { + pw := a.partWriter(loc) + if bodyErr := body(pw); bodyErr != nil { + _ = pw.close(false) + return bodyErr + } + if cerr := pw.close(true); cerr != nil { + return fmt.Errorf("finalize orphan archive for %s: %w", loc.Part, cerr) + } + return nil +} + +// partWriter builds a writer for one source part. It never fails: the archive +// file is opened lazily on the first orphan row (see appendRow), so a part with +// no orphan rows — the common case in a healthy migration — never touches the +// filesystem and no manifest is written for an orphan-free segment. An open +// failure surfaces from the first appendRow (and thus aborts the part). +func (a *orphanArchiver) partWriter(loc sourceLoc) *orphanPartWriter { + return &orphanPartWriter{a: a, loc: loc, tally: make(map[string]*measureTally)} +} + +func (w *orphanPartWriter) appendRow(rec *archiveRecord) error { + // Open the archive file lazily on the first row so orphan-free parts create no + // file or manifest. Open+encode FIRST so a failed write is never counted as an + // archived orphan row; only a durably-encoded row is tallied. For discard + // (not archiving) there is no write, so the tally always succeeds. + if w.archiving() && w.f == nil { + if err := w.openFile(); err != nil { + return err + } + } + if w.f != nil { + if err := w.enc.Encode(rec); err != nil { + return fmt.Errorf("encode archive row: %w", err) + } + } + t := w.tally[rec.Measure] + if t == nil { + t = &measureTally{series: make(map[uint64]struct{})} + w.tally[rec.Measure] = t + } + t.rows++ + t.series[rec.SeriesID] = struct{}{} + return nil +} + +// openFile creates the part's archive directory and file, truncating any prior +// content so a re-replayed part rewrites cleanly (idempotent on resume). +func (w *orphanPartWriter) openFile() error { + path := w.a.partFilePath(w.loc) + if err := os.MkdirAll(filepath.Dir(path), 0o750); err != nil { + return fmt.Errorf("create archive dir: %w", err) + } + f, err := os.OpenFile(path, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0o600) + if err != nil { + return fmt.Errorf("open archive file %s: %w", path, err) + } + // JSONL is highly repetitive (constant group/series fields per row), so the + // archive is gzip-compressed on disk (~37x smaller); read it with + // `gunzip -c file.jsonl.gz | jq` (or `zcat`/`zgrep` on Linux). + w.f = f + w.gw = gzip.NewWriter(f) + w.enc = json.NewEncoder(w.gw) + return nil +} + +// close finalizes the part archive. The file handle (if any) is always closed. +// When commit is true the part completed, so its tallies are folded into the +// segment manifest and the manifest is rewritten; the first of {file-close error, +// manifest error} is returned. When commit is false the part aborted mid-way, so +// the manifest is NOT folded (it must never reflect a partial part) and the +// partial .jsonl is best-effort removed; nil is returned because the part's real +// error already propagates. Discard (f == nil) is a no-op. +func (w *orphanPartWriter) close(commit bool) error { + if w.f == nil { + return nil + } + // Close the gzip writer first to flush its buffer + trailer into the file, then + // close the file. A buffered write error surfaces here (not at appendRow), so + // close's error is what makes an archive failure fatal to the part. + closeErr := w.gw.Close() + if ferr := w.f.Close(); closeErr == nil { + closeErr = ferr + } + if !commit { + _ = os.Remove(w.a.partFilePath(w.loc)) + return nil + } + if closeErr != nil { + return fmt.Errorf("close archive file: %w", closeErr) + } + return w.a.foldAndWriteManifest(w.loc, w.tally) +} + +func (a *orphanArchiver) foldAndWriteManifest(loc sourceLoc, tally map[string]*measureTally) error { + a.mu.Lock() + defer a.mu.Unlock() + st := a.segments[loc.Segment] + if st == nil { + st = &segManifestState{parts: make(map[string]map[string]manifestCount)} + if loaded := a.loadManifestParts(loc.Segment); loaded != nil { + st.parts = loaded + } + a.segments[loc.Segment] = st + } + // Store the real per-part series-ID lists (not a bare count) so the summary can + // union them across parts for a true distinct count and a resume can re-derive + // the same totals. + counts := make(map[string]manifestCount, len(tally)) + for measure, t := range tally { + ids := make([]uint64, 0, len(t.series)) + for id := range t.series { + ids = append(ids, id) + } + sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] }) + counts[measure] = manifestCount{Rows: t.rows, SeriesIDs: ids} + } + partKey := fmt.Sprintf("shard-%d/part-%s.jsonl.gz", loc.Shard, loc.Part) + st.parts[partKey] = counts // overwrite -> idempotent on resume + return a.writeManifest(loc, st) +} + +// loadManifestParts reseeds per-part counts from an existing manifest so a resume +// run keeps prior parts' entries. +func (a *orphanArchiver) loadManifestParts(segment string) map[string]map[string]manifestCount { + b, err := os.ReadFile(a.manifestPath(segment)) + if err != nil { + // First run for this segment has no manifest yet; any other read error means + // prior tallies are about to be lost on the next write, so make it visible. + if !os.IsNotExist(err) { + a.l.Warn().Err(err).Str("segment", segment).Msg("cannot read existing orphan manifest; prior tallies will be overwritten") + } Review Comment: `orphanArchiver` assumes `l` is non-nil, but several tests construct it with `nil`. Any non-`IsNotExist` read/unmarshal error (or future code path) will dereference `a.l` and panic. Consider defensively defaulting to a non-nil logger in `newOrphanArchiver` (or guarding `a.l != nil` before logging) so error paths remain safe. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
