This is an automated email from the ASF dual-hosted git repository. mrproliu pushed a commit to branch lifecycle-oom in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 2474a0a7b90e5b7f576bea05806dd02915558050 Author: mrproliu <[email protected]> AuthorDate: Fri Jun 26 12:58:14 2026 +0800 feat(lifecycle): throttle migration under receiver memory pressure; fix bluge dedup OOM --- CHANGES.md | 2 + api/proto/banyandb/cluster/v1/rpc.proto | 1 + .../backup/lifecycle/measure_migration_visitor.go | 63 ++++++---- banyand/backup/lifecycle/retry.go | 133 +++++++++++++++++++++ banyand/backup/lifecycle/retry_test.go | 118 ++++++++++++++++++ banyand/backup/lifecycle/row_replay_measure.go | 4 +- banyand/backup/lifecycle/row_replay_pipeline.go | 4 +- banyand/backup/lifecycle/row_replay_stream.go | 2 +- banyand/backup/lifecycle/row_replay_trace.go | 2 +- banyand/backup/lifecycle/service.go | 2 + .../backup/lifecycle/stream_migration_visitor.go | 101 +++++++++------- .../backup/lifecycle/trace_migration_visitor.go | 126 +++++++++++-------- banyand/measure/measure.go | 1 + banyand/measure/svc_data.go | 4 +- banyand/measure/svc_liaison.go | 3 + banyand/measure/svc_standalone.go | 2 + banyand/measure/write_data.go | 46 +++++-- banyand/measure/write_data_busy_test.go | 67 +++++++++++ banyand/protector/wait.go | 63 ++++++++++ banyand/protector/wait_test.go | 72 +++++++++++ banyand/queue/pub/chunked_sync.go | 6 + banyand/queue/queue.go | 18 ++- banyand/queue/queue_test.go | 35 ++++++ banyand/queue/sub/chunked_sync.go | 5 + banyand/queue/sub/chunked_sync_test.go | 62 ++++++++++ banyand/stream/stream.go | 1 + banyand/stream/svc_liaison.go | 3 + banyand/stream/svc_standalone.go | 6 +- banyand/stream/write_data.go | 91 ++++++++++---- banyand/stream/write_data_busy_test.go | 94 +++++++++++++++ banyand/trace/svc_liaison.go | 2 + banyand/trace/svc_standalone.go | 4 +- banyand/trace/trace.go | 1 + banyand/trace/write_data.go | 47 ++++++-- banyand/trace/write_data_busy_test.go | 67 +++++++++++ docs/api-reference.md | 1 + docs/operation/configuration.md | 3 + docs/operation/lifecycle.md | 1 + go.mod | 2 +- go.sum | 4 +- 40 files changed, 1092 insertions(+), 177 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index a254f3fc2..34e7672b4 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -68,6 +68,8 @@ Release Notes. - Enhance segment lifecycle: `refCount` now counts only active users, decoupled from "open" (`index != nil`), adding a "dormant" state (open, `refCount == 0`). A `DecRef` to zero no longer closes a segment; idle reclaim and retention delete act only at `refCount == 0`, so an in-flight snapshot/inspect is no longer torn down mid-operation (fixing the cold-node nil-index panic and bluge lock churn) while idle segments still release their bluge writers. - Speed up GCS backup uploads: write each object and its checksum metadata in one request, dropping the per-object `Update` round-trip. - Lifecycle migration now archives rows whose measure/stream schema was deleted from the registry, instead of aborting the group. +- Throttle lifecycle migration under receiver memory pressure so it no longer OOM-kills the destination data node, via three always-on mechanisms: per-chunk `SYNC_STATUS_SERVER_BUSY` load-shed (the sender backs off and retries); a bounded memory wait before introducing an external series-index segment (`{measure,stream,trace}-lifecycle-receive-mem-wait-timeout`, default 5m); and sender-side bounded-backoff retry of transient send failures — target node restarting/`no nodes available`, di [...] +- Cut the lifecycle external series-index deduplicator's memory from ~1.8 GB to near-zero per segment by bumping bluge to a memory-efficient implementation: duplicates are now resolved via `_id` term-dictionary point lookups (`Dictionary.Contains`) instead of decompressing and retaining every segment's stored fields, removing the dominant heap source during external-segment introduction. No public API change and identical dedup semantics (verified by a differential test); this is the roo [...] ### Bug Fixes diff --git a/api/proto/banyandb/cluster/v1/rpc.proto b/api/proto/banyandb/cluster/v1/rpc.proto index aec1b9963..a763acb8d 100644 --- a/api/proto/banyandb/cluster/v1/rpc.proto +++ b/api/proto/banyandb/cluster/v1/rpc.proto @@ -182,6 +182,7 @@ enum SyncStatus { SYNC_STATUS_SYNC_COMPLETE = 5; // Entire sync operation completed successfully. SYNC_STATUS_VERSION_UNSUPPORTED = 6; // Version not supported for sync operations. SYNC_STATUS_FORMAT_VERSION_MISMATCH = 7; // File format version incompatible. + SYNC_STATUS_SERVER_BUSY = 8; // Receiver under memory pressure; sender should back off and retry the whole part. } service Service { diff --git a/banyand/backup/lifecycle/measure_migration_visitor.go b/banyand/backup/lifecycle/measure_migration_visitor.go index 2059922b9..1c6a2f20d 100644 --- a/banyand/backup/lifecycle/measure_migration_visitor.go +++ b/banyand/backup/lifecycle/measure_migration_visitor.go @@ -284,10 +284,12 @@ func (mv *measureMigrationVisitor) VisitSeries(segmentTR *timestamp.TimeRange, s segmentIDStr := getSegmentTimeRange(targetSegmentTime, mv.targetStageInterval).String() for _, shardID := range shardIDs { targetShardID := mv.calculateTargetShardID(uint32(shardID)) - partData := mv.createStreamingSegmentFromFiles(targetShardID, files, segmentTR, data.TopicMeasureSeriesSync.String()) - // Stream segment to target shard replicas - if err := mv.streamPartToTargetShard(partData); err != nil { + // Stream segment to target shard replicas. The factory rebuilds the + // part on every retry so each attempt gets fresh offset-0 readers. + if err := mv.streamPartToTargetShard(targetShardID, func() queue.StreamingPartData { + return mv.createStreamingSegmentFromFiles(targetShardID, files, segmentTR, data.TopicMeasureSeriesSync.String()) + }); err != nil { errorMsg := fmt.Sprintf("failed to stream measure segment to target shard %d: %v", targetShardID, err) mv.recordError(scopeSeries, segmentTR, shardID, nil, errorMsg) return fmt.Errorf("failed to stream measure segment to target shard %d: %w", targetShardID, err) @@ -370,19 +372,31 @@ func (mv *measureMigrationVisitor) VisitPart(segmentTR *timestamp.TimeRange, sou continue } - // Create file readers for this part - files, release := measure.CreatePartFileReaderFromPath(partPath, mv.lfs) - defer release() - - // Clone part data for this target segment - targetPartData := partData - targetPartData.Group = mv.group - targetPartData.ShardID = targetShardID - targetPartData.Topic = data.TopicMeasurePartSync.String() - targetPartData.Files = files + // Reopen the part each attempt for fresh offset-0 readers, releasing the + // prior attempt's handles first; the deferred call frees the last set. + var prevRelease func() + defer func() { + if prevRelease != nil { + prevRelease() + } + }() + mk := func() queue.StreamingPartData { + if prevRelease != nil { + prevRelease() + prevRelease = nil + } + files, release := measure.CreatePartFileReaderFromPath(partPath, mv.lfs) + prevRelease = release + targetPartData := partData + targetPartData.Group = mv.group + targetPartData.ShardID = targetShardID + targetPartData.Topic = data.TopicMeasurePartSync.String() + targetPartData.Files = files + return targetPartData + } // Stream part to target segment - if err := mv.streamPartToTargetShard(targetPartData); err != nil { + if err := mv.streamPartToTargetShard(targetShardID, mk); err != nil { errorMsg := fmt.Sprintf("failed to stream measure part to target segment %s: %v", targetSegmentTime.Format(time.RFC3339), err) mv.recordError(scopePart, segmentTR, sourceShardID, &partID, errorMsg) return fmt.Errorf("failed to stream measure part to target segment: %w", err) @@ -491,22 +505,21 @@ func (mv *measureMigrationVisitor) calculateTargetShardID(sourceShardID uint32) return calculateTargetShardID(sourceShardID, mv.targetShardNum) } -// streamPartToTargetShard sends part data to all replicas of the target shard. -func (mv *measureMigrationVisitor) streamPartToTargetShard(partData queue.StreamingPartData) error { - targetShardID := partData.ShardID +// streamPartToTargetShard sends the part to every replica with bounded +// exponential-backoff retry (transient: target restarting, disconnect, +// receiver SERVER_BUSY). streamPartToNode closes the part's readers after each +// send, so mk() is called per attempt to rebuild fresh offset-0 readers. +func (mv *measureMigrationVisitor) streamPartToTargetShard(targetShardID uint32, mk func() queue.StreamingPartData) error { copies := mv.replicas + 1 // Send to all replicas using the exact pattern from steps.go:219-236 for replicaID := uint32(0); replicaID < copies; replicaID++ { - // Use selector.Pick exactly like steps.go:220 - nodeID, err := mv.selector.Pick(mv.group, "", targetShardID, replicaID) + err := pickAndRun(mv.logger, mv.selector, mv.group, "", targetShardID, replicaID, func(nodeID string) error { + partData := mk() + return mv.streamPartToNode(nodeID, partData.ShardID, partData) + }) if err != nil { - return fmt.Errorf("failed to pick node for shard %d replica %d: %w", targetShardID, replicaID, err) - } - - // Stream part data to target node using chunked sync - if err := mv.streamPartToNode(nodeID, targetShardID, partData); err != nil { - return fmt.Errorf("failed to stream measure part to node %s: %w", nodeID, err) + return fmt.Errorf("failed to stream measure part to replica %d: %w", replicaID, err) } } diff --git a/banyand/backup/lifecycle/retry.go b/banyand/backup/lifecycle/retry.go new file mode 100644 index 000000000..7346962d7 --- /dev/null +++ b/banyand/backup/lifecycle/retry.go @@ -0,0 +1,133 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package lifecycle + +import ( + "errors" + "fmt" + "strings" + "time" + + "github.com/cenkalti/backoff/v4" + + "github.com/apache/skywalking-banyandb/banyand/queue" + "github.com/apache/skywalking-banyandb/pkg/grpchelper" + "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/node" +) + +// defaultLifecycleSendRetryTimeout bounds the total wall-clock spent retrying +// transient failures when streaming a part to a target node (e.g. while the +// target node restarts). Configurable via --lifecycle-send-retry-timeout. +const defaultLifecycleSendRetryTimeout = 15 * time.Minute + +// lifecycleSendRetryTimeout is the active bound, set by the lifecycle flag. +var lifecycleSendRetryTimeout = defaultLifecycleSendRetryTimeout + +// isNoNodes reports whether err means the target node could not be picked +// because no node is currently available (e.g. the target is restarting). +// It matches both the typed sentinel and the plain round-robin error string. +func isNoNodes(err error) bool { + if errors.Is(err, node.ErrNoAvailableNode) { + return true + } + return err != nil && strings.Contains(err.Error(), "no nodes available") +} + +// isTransientSend reports whether err from streaming a part to a node is +// transient and worth retrying: receiver back-pressure (SERVER_BUSY) or a +// transient/failover gRPC error such as Unavailable, ResourceExhausted, or a +// dropped connection. +func isTransientSend(err error) bool { + return errors.Is(err, queue.ErrServerBusy) || grpchelper.IsTransientError(err) || grpchelper.IsFailoverError(err) +} + +// retryLogInterval throttles the "still retrying" heartbeat so a long wait for +// a restarting node logs at most once per minute instead of on every attempt. +const retryLogInterval = time.Minute + +// nodePicker is the one method of node.Selector that the send-retry path needs; +// narrowing it keeps runWithRetry/pickWithRetry decoupled and easy to test. +type nodePicker interface { + Pick(group, name string, shardID, replicaID uint32) (string, error) +} + +// runWithRetry runs op under bounded exponential backoff capped by +// lifecycleSendRetryTimeout. op returns a transient error to retry or +// backoff.Permanent(err) to stop immediately. While retrying it logs a +// heartbeat at most once per minute so a long wait for a restarting node is not +// mistaken for a hang. +func runWithRetry(l *logger.Logger, stage string, op func() error) error { + bo := backoff.NewExponentialBackOff() + bo.MaxElapsedTime = lifecycleSendRetryTimeout + start := time.Now() + var lastLog time.Time + return backoff.RetryNotify(op, bo, func(err error, _ time.Duration) { + now := time.Now() + if !lastLog.IsZero() && now.Sub(lastLog) < retryLogInterval { + return + } + lastLog = now + l.Warn().Err(err).Str("stage", stage).Dur("elapsed", now.Sub(start)). + Msg("transient lifecycle send failure, still retrying") + }) +} + +// pickWithRetry picks a target node, retrying on "no nodes available" (target +// restarting) under runWithRetry's bounded backoff and heartbeat logging. +func pickWithRetry(l *logger.Logger, p nodePicker, group, name string, shardID uint32) (string, error) { + var nodeID string + err := runWithRetry(l, fmt.Sprintf("pick group=%s name=%s shard=%d", group, name, shardID), func() error { + var pErr error + nodeID, pErr = p.Pick(group, name, shardID, 0) + switch { + case pErr == nil: + return nil + case isNoNodes(pErr): + return pErr + default: + return backoff.Permanent(pErr) + } + }) + return nodeID, err +} + +// pickAndRun picks a target node and runs run against it inside one bounded +// backoff loop: a "no nodes" pick error or a transient run error retries the +// whole pick+run (so a failed send re-picks, possibly a healthier node); any +// other error stops immediately. Heartbeat-logged like runWithRetry. +func pickAndRun(l *logger.Logger, p nodePicker, group, name string, shardID, replicaID uint32, + run func(nodeID string) error, +) error { + return runWithRetry(l, fmt.Sprintf("send group=%s name=%s shard=%d replica=%d", group, name, shardID, replicaID), func() error { + nodeID, pickErr := p.Pick(group, name, shardID, replicaID) + if pickErr != nil { + if isNoNodes(pickErr) { + return pickErr + } + return backoff.Permanent(pickErr) + } + if runErr := run(nodeID); runErr != nil { + if isTransientSend(runErr) { + return runErr + } + return backoff.Permanent(runErr) + } + return nil + }) +} diff --git a/banyand/backup/lifecycle/retry_test.go b/banyand/backup/lifecycle/retry_test.go new file mode 100644 index 000000000..684bba16a --- /dev/null +++ b/banyand/backup/lifecycle/retry_test.go @@ -0,0 +1,118 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package lifecycle + +import ( + "errors" + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/apache/skywalking-banyandb/banyand/queue" + "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/node" +) + +func TestIsNoNodes(t *testing.T) { + tests := []struct { + err error + name string + want bool + }{ + {name: "typed sentinel", err: node.ErrNoAvailableNode, want: true}, + {name: "wrapped sentinel", err: fmt.Errorf("pick failed: %w", node.ErrNoAvailableNode), want: true}, + {name: "plain round-robin string", err: errors.New("no nodes available"), want: true}, + {name: "wrapped round-robin string", err: fmt.Errorf("replica 0: %w", errors.New("no nodes available")), want: true}, + {name: "unrelated error", err: errors.New("connection refused"), want: false}, + {name: "nil error", err: nil, want: false}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert.Equal(t, tt.want, isNoNodes(tt.err)) + }) + } +} + +func TestIsTransientSend(t *testing.T) { + tests := []struct { + err error + name string + want bool + }{ + {name: "server busy sentinel", err: queue.ErrServerBusy, want: true}, + {name: "wrapped server busy", err: fmt.Errorf("receiver busy on node x: %w", queue.ErrServerBusy), want: true}, + {name: "plain non-transient", err: errors.New("permission denied"), want: false}, + {name: "nil error", err: nil, want: false}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert.Equal(t, tt.want, isTransientSend(tt.err)) + }) + } +} + +// fakePicker implements nodePicker: it returns "no nodes available" the first +// noNodes times, then failErr (if set), then node. +type fakePicker struct { + failErr error + node string + noNodes int + calls int +} + +func (f *fakePicker) Pick(_, _ string, _, _ uint32) (string, error) { + f.calls++ + if f.noNodes > 0 { + f.noNodes-- + return "", errors.New("no nodes available") + } + if f.failErr != nil { + return "", f.failErr + } + return f.node, nil +} + +func TestPickWithRetry(t *testing.T) { + l := logger.GetLogger("test") + + t.Run("success first try", func(t *testing.T) { + p := &fakePicker{node: "n1"} + got, err := pickWithRetry(l, p, "g", "n", 0) + assert.NoError(t, err) + assert.Equal(t, "n1", got) + assert.Equal(t, 1, p.calls) + }) + + t.Run("permanent error fails fast without retry", func(t *testing.T) { + boom := errors.New("unknown shard") + p := &fakePicker{failErr: boom} + got, err := pickWithRetry(l, p, "g", "n", 0) + assert.ErrorIs(t, err, boom) + assert.Empty(t, got) + assert.Equal(t, 1, p.calls) + }) + + t.Run("no nodes then recover retries", func(t *testing.T) { + p := &fakePicker{node: "n2", noNodes: 1} + got, err := pickWithRetry(l, p, "g", "n", 0) + assert.NoError(t, err) + assert.Equal(t, "n2", got) + assert.Equal(t, 2, p.calls) + }) +} diff --git a/banyand/backup/lifecycle/row_replay_measure.go b/banyand/backup/lifecycle/row_replay_measure.go index 6f34a8002..b0c72f46e 100644 --- a/banyand/backup/lifecycle/row_replay_measure.go +++ b/banyand/backup/lifecycle/row_replay_measure.go @@ -347,7 +347,7 @@ func (r *measureRowReplayer) routeColumnar( if rErr != nil { return 0, nil, "", fmt.Errorf("route %s: %w", bc.subject, rErr) } - pickedNode, pErr := r.selector.Pick(r.group, bc.subject, uint32(sid), 0) + pickedNode, pErr := pickWithRetry(r.logger, r.selector, r.group, bc.subject, uint32(sid)) if pErr != nil { return 0, nil, "", fmt.Errorf("pick target node for %s: %w", bc.subject, pErr) } @@ -363,7 +363,7 @@ func (r *measureRowReplayer) routeColumnar( if rErr != nil { return 0, nil, "", fmt.Errorf("route %s: %w", bc.subject, rErr) } - nodeID, pErr := r.selector.Pick(r.group, bc.subject, uint32(sid), 0) + nodeID, pErr := pickWithRetry(r.logger, r.selector, r.group, bc.subject, uint32(sid)) if pErr != nil { return 0, nil, "", fmt.Errorf("pick target node for %s: %w", bc.subject, pErr) } diff --git a/banyand/backup/lifecycle/row_replay_pipeline.go b/banyand/backup/lifecycle/row_replay_pipeline.go index 7b3fc3d94..591edc93a 100644 --- a/banyand/backup/lifecycle/row_replay_pipeline.go +++ b/banyand/backup/lifecycle/row_replay_pipeline.go @@ -302,10 +302,10 @@ func (s *batchSender) enqueueMarshaled(ctx context.Context, nodeID, group string // routeAndEnqueue picks the target node for iwr and enqueues it. The // proto-message replayers (stream/trace) share this Pick+build+enqueue tail; the // measure columnar path routes inline via routeColumnar + enqueueMarshaled. -func (s *batchSender) routeAndEnqueue(ctx context.Context, selector node.Selector, +func (s *batchSender) routeAndEnqueue(ctx context.Context, l *logger.Logger, selector node.Selector, group, name string, shardID uint32, iwr proto.Message, ) error { - nodeID, err := selector.Pick(group, name, shardID, 0) + nodeID, err := pickWithRetry(l, selector, group, name, shardID) if err != nil { return fmt.Errorf("pick target node for %s: %w", name, err) } diff --git a/banyand/backup/lifecycle/row_replay_stream.go b/banyand/backup/lifecycle/row_replay_stream.go index 471a18ba1..408e93c95 100644 --- a/banyand/backup/lifecycle/row_replay_stream.go +++ b/banyand/backup/lifecycle/row_replay_stream.go @@ -234,7 +234,7 @@ func (r *streamRowReplayer) publishRow(ctx context.Context, pw *orphanPartWriter if err != nil { return err } - return r.sender.routeAndEnqueue(ctx, r.selector, r.group, wr.Metadata.Name, iwr.ShardId, iwr) + return r.sender.routeAndEnqueue(ctx, r.logger, r.selector, r.group, wr.Metadata.Name, iwr.ShardId, iwr) } // archiveOrphanRow writes one orphan stream row to the part archive (a no-op diff --git a/banyand/backup/lifecycle/row_replay_trace.go b/banyand/backup/lifecycle/row_replay_trace.go index fd2d4dd52..919ffcc20 100644 --- a/banyand/backup/lifecycle/row_replay_trace.go +++ b/banyand/backup/lifecycle/row_replay_trace.go @@ -136,5 +136,5 @@ func (r *traceRowReplayer) buildWriteRequest(row dumptrace.Row) (*tracev1.WriteR func (r *traceRowReplayer) publishRow(ctx context.Context, row dumptrace.Row) error { _, iwr := r.buildWriteRequest(row) - return r.sender.routeAndEnqueue(ctx, r.selector, r.group, r.traceName, iwr.ShardId, iwr) + return r.sender.routeAndEnqueue(ctx, r.logger, r.selector, r.group, r.traceName, iwr.ShardId, iwr) } diff --git a/banyand/backup/lifecycle/service.go b/banyand/backup/lifecycle/service.go index 693723464..d142148a8 100644 --- a/banyand/backup/lifecycle/service.go +++ b/banyand/backup/lifecycle/service.go @@ -212,6 +212,8 @@ func (l *lifecycleService) FlagSet() *run.FlagSet { "Maximum rows per row-replay batch (row-replay is the fallback for parts spanning multiple target segments)") flagS.VarP(&rowReplayMaxBatchBytes, "row-replay-max-batch-bytes", "", "Maximum in-flight marshaled bytes per row-replay batch; caps peak marshal memory for large bodies (default: 32MB)") + flagS.DurationVar(&lifecycleSendRetryTimeout, "lifecycle-send-retry-timeout", defaultLifecycleSendRetryTimeout, + "Max total time to retry streaming a migration part to a target node on transient failures (target restarting, disconnect, receiver busy)") // Lifecycle server flags flagS.BoolVar(&l.lifecycleTLS, "lifecycle-tls", false, "connection uses TLS if true, else plain TCP") diff --git a/banyand/backup/lifecycle/stream_migration_visitor.go b/banyand/backup/lifecycle/stream_migration_visitor.go index 655583de6..87483893e 100644 --- a/banyand/backup/lifecycle/stream_migration_visitor.go +++ b/banyand/backup/lifecycle/stream_migration_visitor.go @@ -260,17 +260,11 @@ func (mv *streamMigrationVisitor) VisitSeries(segmentTR *timestamp.TimeRange, se segmentIDStr := getSegmentTimeRange(targetSegmentTime, mv.targetStageInterval).String() for _, shardID := range shardIDs { targetShardID := mv.calculateTargetShardID(uint32(shardID)) - ff := make([]queue.FileInfo, 0, len(files)) - for _, file := range files { - ff = append(ff, queue.FileInfo{ - Name: file.name, - Reader: file.file.SequentialRead(), - }) - } - partData := mv.createStreamingSegmentFromFiles(targetShardID, ff, segmentTR, data.TopicStreamSeriesSync.String()) - - // Stream segment to target shard replicas - if err := mv.streamPartToTargetShard(partData); err != nil { + // Stream segment to target shard replicas. The factory rebuilds the + // part on every retry so each attempt gets fresh offset-0 readers. + if err := mv.streamPartToTargetShard(targetShardID, func() queue.StreamingPartData { + return mv.createStreamingSegmentFromFiles(targetShardID, files, segmentTR, data.TopicStreamSeriesSync.String()) + }); err != nil { errorMsg := fmt.Sprintf("failed to stream segment to target shard %d: %v", targetShardID, err) mv.recordError(scopeSeries, segmentTR, shardID, nil, errorMsg) return fmt.Errorf("failed to stream segment to target shard %d: %w", targetShardID, err) @@ -347,19 +341,31 @@ func (mv *streamMigrationVisitor) VisitPart(segmentTR *timestamp.TimeRange, sour continue } - // Create file readers for this part - files, release := stream.CreatePartFileReaderFromPath(partPath, mv.lfs) - defer release() - - // Clone part data for this target segment - targetPartData := partData - targetPartData.Group = mv.group - targetPartData.ShardID = targetShardID - targetPartData.Topic = data.TopicStreamPartSync.String() - targetPartData.Files = files + // Reopen the part each attempt for fresh offset-0 readers, releasing the + // prior attempt's handles first; the deferred call frees the last set. + var prevRelease func() + defer func() { + if prevRelease != nil { + prevRelease() + } + }() + mk := func() queue.StreamingPartData { + if prevRelease != nil { + prevRelease() + prevRelease = nil + } + files, release := stream.CreatePartFileReaderFromPath(partPath, mv.lfs) + prevRelease = release + targetPartData := partData + targetPartData.Group = mv.group + targetPartData.ShardID = targetShardID + targetPartData.Topic = data.TopicStreamPartSync.String() + targetPartData.Files = files + return targetPartData + } // Stream part to target segment - if err := mv.streamPartToTargetShard(targetPartData); err != nil { + if err := mv.streamPartToTargetShard(targetShardID, mk); err != nil { errorMsg := fmt.Sprintf("failed to stream part to target segment %s: %v", targetSegmentTime.Format(time.RFC3339), err) mv.recordError(scopePart, segmentTR, sourceShardID, &partID, errorMsg) return fmt.Errorf("failed to stream part to target segment: %w", err) @@ -520,8 +526,9 @@ func (mv *streamMigrationVisitor) VisitElementIndex(segmentTR *timestamp.TimeRan Uint32("target_shard", targetShardID). Msg("found element index segment files for migration") - // Create FileInfo for this segment file - files := make([]queue.FileInfo, 0, len(segmentFiles)) + // Keep the open segment file handles so the send factory can rebuild + // fresh offset-0 readers on every retry via SequentialRead. + openFiles := make([]fileInfo, 0, len(segmentFiles)) // Process each segment file for _, segmentFileName := range segmentFiles { // Extract segment ID from filename (remove .seg extension) @@ -558,9 +565,9 @@ func (mv *streamMigrationVisitor) VisitElementIndex(segmentTR *timestamp.TimeRan // Close the file reader defer segmentFile.Close() - files = append(files, queue.FileInfo{ - Name: segmentFileName, - Reader: segmentFile.SequentialRead(), + openFiles = append(openFiles, fileInfo{ + file: segmentFile, + name: segmentFileName, }) mv.logger.Info(). @@ -571,10 +578,12 @@ func (mv *streamMigrationVisitor) VisitElementIndex(segmentTR *timestamp.TimeRan Int("total_segments", mv.progress.GetStreamElementIndexCount(mv.group)). Msg("element index segment migration completed successfully") } - partData := mv.createStreamingSegmentFromFiles(targetShardID, files, segmentTR, data.TopicStreamElementIndexSync.String()) - // Stream segment file to target shard replicas - if err := mv.streamPartToTargetShard(partData); err != nil { + // Stream segment file to target shard replicas. The factory rebuilds the + // part on every retry so each attempt gets fresh offset-0 readers. + if err := mv.streamPartToTargetShard(targetShardID, func() queue.StreamingPartData { + return mv.createStreamingSegmentFromFiles(targetShardID, openFiles, segmentTR, data.TopicStreamElementIndexSync.String()) + }); err != nil { errorMsg := fmt.Sprintf("failed to stream element index to target shard: %v", err) mv.recordError(scopeElementIndex, segmentTR, sourceShardID, nil, errorMsg) return fmt.Errorf("failed to stream element index to target shard: %w", err) @@ -592,22 +601,21 @@ func (mv *streamMigrationVisitor) calculateTargetShardID(sourceShardID uint32) u return calculateTargetShardID(sourceShardID, mv.targetShardNum) } -// streamPartToTargetShard sends part data to all replicas of the target shard. -func (mv *streamMigrationVisitor) streamPartToTargetShard(partData queue.StreamingPartData) error { - targetShardID := partData.ShardID +// streamPartToTargetShard sends the part to every replica with bounded +// exponential-backoff retry (transient: target restarting, disconnect, +// receiver SERVER_BUSY). streamPartToNode closes the part's readers after each +// send, so mk() is called per attempt to rebuild fresh offset-0 readers. +func (mv *streamMigrationVisitor) streamPartToTargetShard(targetShardID uint32, mk func() queue.StreamingPartData) error { copies := mv.replicas + 1 // Send to all replicas using the exact pattern from steps.go:219-236 for replicaID := uint32(0); replicaID < copies; replicaID++ { - // Use selector.Pick exactly like steps.go:220 - nodeID, err := mv.selector.Pick(mv.group, "", targetShardID, replicaID) + err := pickAndRun(mv.logger, mv.selector, mv.group, "", targetShardID, replicaID, func(nodeID string) error { + partData := mk() + return mv.streamPartToNode(nodeID, partData.ShardID, partData) + }) if err != nil { - return fmt.Errorf("failed to pick node for shard %d replica %d: %w", targetShardID, replicaID, err) - } - - // Stream part data to target node using chunked sync - if err := mv.streamPartToNode(nodeID, targetShardID, partData); err != nil { - return fmt.Errorf("failed to stream part to node %s: %w", nodeID, err) + return fmt.Errorf("failed to stream part to replica %d: %w", replicaID, err) } } @@ -686,15 +694,22 @@ func (mv *streamMigrationVisitor) Close() error { // createStreamingSegmentFromFiles creates StreamingPartData from segment files. func (mv *streamMigrationVisitor) createStreamingSegmentFromFiles( targetShardID uint32, - files []queue.FileInfo, + files []fileInfo, segmentTR *timestamp.TimeRange, topic string, ) queue.StreamingPartData { + filesInfo := make([]queue.FileInfo, 0, len(files)) + for _, file := range files { + filesInfo = append(filesInfo, queue.FileInfo{ + Name: file.name, + Reader: file.file.SequentialRead(), + }) + } segmentData := queue.StreamingPartData{ Group: mv.group, ShardID: targetShardID, // Use calculated target shard Topic: topic, // Use the new topic - Files: files, + Files: filesInfo, MinTimestamp: segmentTR.Start.UnixNano(), MaxTimestamp: segmentTR.End.UnixNano(), } diff --git a/banyand/backup/lifecycle/trace_migration_visitor.go b/banyand/backup/lifecycle/trace_migration_visitor.go index 4c05dab01..4ad0bf62c 100644 --- a/banyand/backup/lifecycle/trace_migration_visitor.go +++ b/banyand/backup/lifecycle/trace_migration_visitor.go @@ -262,17 +262,11 @@ func (mv *traceMigrationVisitor) VisitSeries(segmentTR *timestamp.TimeRange, ser segmentIDStr := getSegmentTimeRange(targetSegmentTime, mv.targetStageInterval).String() for _, shardID := range shardIDs { targetShardID := mv.calculateTargetShardID(uint32(shardID)) - ff := make([]queue.FileInfo, 0, len(files)) - for _, file := range files { - ff = append(ff, queue.FileInfo{ - Name: file.name, - Reader: file.file.SequentialRead(), - }) - } - partData := mv.createStreamingSegmentFromFiles(targetShardID, ff, segmentTR, data.TopicTraceSeriesSync.String()) - - // Stream segment to target shard replicas - if err := mv.streamPartToTargetShard(targetShardID, []queue.StreamingPartData{partData}); err != nil { + // Stream segment to target shard replicas. The factory rebuilds the + // part on every retry so each attempt gets fresh offset-0 readers. + if err := mv.streamPartToTargetShard(targetShardID, func() ([]queue.StreamingPartData, error) { + return []queue.StreamingPartData{mv.createStreamingSegmentFromFiles(targetShardID, files, segmentTR, data.TopicTraceSeriesSync.String())}, nil + }); err != nil { errorMsg := fmt.Sprintf("failed to stream trace segment to target shard %d: %v", targetShardID, err) mv.recordError(scopeSeries, segmentTR, shardID, errorMsg) return fmt.Errorf("failed to stream trace segment to target shard %d: %w", targetShardID, err) @@ -323,41 +317,55 @@ func (mv *traceMigrationVisitor) VisitShard(timestampTR *timestamp.TimeRange, so } atomic.AddUint64(&mv.partsCopiedSingleTarget, 1) mv.progress.AddTraceChunkSyncShard(mv.group) - allParts := make([]queue.StreamingPartData, 0) - sidxPartData, sidxReleases, err := mv.generateAllSidxPartData(timestampTR, sourceShardID, filepath.Join(shardPath, "sidx")) - if err != nil { - return fmt.Errorf("failed to generate sidx part data: %s: %w", shardPath, err) - } - defer func() { - for _, release := range sidxReleases { - release() - } - }() - allParts = append(allParts, sidxPartData...) + targetShardID := mv.calculateTargetShardID(uint32(sourceShardID)) - partDatas, partDataReleases, err := mv.generateAllPartData(timestampTR, sourceShardID, shardPath) - if err != nil { - return fmt.Errorf("failed to generate core par data: %s: %w", shardPath, err) - } + // Regenerate the sidx + core parts each attempt for fresh offset-0 readers, + // releasing the prior attempt's handles first; the deferred call frees the last set. + var prevRelease func() defer func() { - for _, release := range partDataReleases { - release() + if prevRelease != nil { + prevRelease() } }() - allParts = append(allParts, partDatas...) - - targetShardID := mv.calculateTargetShardID(uint32(sourceShardID)) + mk := func() ([]queue.StreamingPartData, error) { + if prevRelease != nil { + prevRelease() + } + var releases []func() + // Captures releases by reference, so it frees whatever this attempt has + // opened by call time — including the partial set on an early return. + prevRelease = func() { + for _, release := range releases { + release() + } + } + allParts := make([]queue.StreamingPartData, 0) + sidxPartData, sidxReleases, genErr := mv.generateAllSidxPartData(timestampTR, sourceShardID, filepath.Join(shardPath, "sidx")) + if genErr != nil { + return nil, fmt.Errorf("failed to generate sidx part data: %s: %w", shardPath, genErr) + } + releases = append(releases, sidxReleases...) + allParts = append(allParts, sidxPartData...) - sort.Slice(allParts, func(i, j int) bool { - if allParts[i].ID == allParts[j].ID { - return allParts[i].PartType < allParts[j].PartType + partDatas, partDataReleases, genErr := mv.generateAllPartData(timestampTR, sourceShardID, shardPath) + if genErr != nil { + return nil, fmt.Errorf("failed to generate core par data: %s: %w", shardPath, genErr) } - return allParts[i].ID < allParts[j].ID - }) + releases = append(releases, partDataReleases...) + allParts = append(allParts, partDatas...) + + sort.Slice(allParts, func(i, j int) bool { + if allParts[i].ID == allParts[j].ID { + return allParts[i].PartType < allParts[j].PartType + } + return allParts[i].ID < allParts[j].ID + }) + return allParts, nil + } // Stream part to target segment - if err := mv.streamPartToTargetShard(targetShardID, allParts); err != nil { + if err := mv.streamPartToTargetShard(targetShardID, mk); err != nil { errorMsg := fmt.Errorf("failed to stream to target shard %d: %w", targetShardID, err) mv.recordError(scopeShard, timestampTR, sourceShardID, errorMsg.Error()) return fmt.Errorf("failed to stream trace shard to target segment shard: %w", err) @@ -511,6 +519,12 @@ func (mv *traceMigrationVisitor) generateAllSidxPartData( // Create StreamingPartData with PartType = index name partData, err := sidx.ParsePartMetadata(mv.lfs, partPath) if err != nil { + // Release this part's just-opened handles and all earlier parts' + // so a mid-way metadata failure does not leak file descriptors. + release() + for _, r := range releases { + r() + } return nil, nil, fmt.Errorf("failed to parse sidx part metadata: %s: %w", partPath, err) } partData.Group = mv.group @@ -564,6 +578,10 @@ func (mv *traceMigrationVisitor) generateAllPartData( partPath := filepath.Join(shardPath, name) parts, releases, err := mv.generatePartData(segmentTR, sourceShardID, partPath) if err != nil { + // Release earlier parts' handles so a mid-loop failure does not leak. + for _, r := range allReleases { + r() + } return nil, nil, fmt.Errorf("failed to generate part data for path %s: %w", partPath, err) } allParts = append(allParts, parts...) @@ -636,21 +654,24 @@ func (mv *traceMigrationVisitor) calculateTargetShardID(sourceShardID uint32) ui return calculateTargetShardID(sourceShardID, mv.targetShardNum) } -// streamPartToTargetShard sends part data to all replicas of the target shard. -func (mv *traceMigrationVisitor) streamPartToTargetShard(targetShardID uint32, partData []queue.StreamingPartData) error { +// streamPartToTargetShard sends the parts to every replica with bounded +// exponential-backoff retry (transient: target restarting, disconnect, +// receiver SERVER_BUSY). streamPartToNode closes the parts' readers after each +// send, so mk() is called per attempt to rebuild fresh offset-0 readers. +func (mv *traceMigrationVisitor) streamPartToTargetShard(targetShardID uint32, mk func() ([]queue.StreamingPartData, error)) error { copies := mv.replicas + 1 // Send to all replicas using the exact pattern from steps.go:219-236 for replicaID := uint32(0); replicaID < copies; replicaID++ { - // Use selector.Pick exactly like steps.go:220 - nodeID, err := mv.selector.Pick(mv.group, "", targetShardID, replicaID) + err := pickAndRun(mv.logger, mv.selector, mv.group, "", targetShardID, replicaID, func(nodeID string) error { + partData, mkErr := mk() + if mkErr != nil { + return mkErr + } + return mv.streamPartToNode(nodeID, targetShardID, partData) + }) if err != nil { - return fmt.Errorf("failed to pick node for shard %d replica %d: %w", targetShardID, replicaID, err) - } - - // Stream part data to target node using chunked sync - if err := mv.streamPartToNode(nodeID, targetShardID, partData); err != nil { - return fmt.Errorf("failed to stream trace part to node %s: %w", nodeID, err) + return fmt.Errorf("failed to stream trace part to shard %d replica %d: %w", targetShardID, replicaID, err) } } @@ -726,15 +747,22 @@ func (mv *traceMigrationVisitor) Close() error { // createStreamingSegmentFromFiles creates StreamingPartData from segment files. func (mv *traceMigrationVisitor) createStreamingSegmentFromFiles( targetShardID uint32, - files []queue.FileInfo, + files []fileInfo, segmentTR *timestamp.TimeRange, topic string, ) queue.StreamingPartData { + filesInfo := make([]queue.FileInfo, 0, len(files)) + for _, file := range files { + filesInfo = append(filesInfo, queue.FileInfo{ + Name: file.name, + Reader: file.file.SequentialRead(), + }) + } segmentData := queue.StreamingPartData{ Group: mv.group, ShardID: targetShardID, // Use calculated target shard Topic: topic, // Use the new topic - Files: files, + Files: filesInfo, MinTimestamp: segmentTR.Start.UnixNano(), MaxTimestamp: segmentTR.End.UnixNano(), } diff --git a/banyand/measure/measure.go b/banyand/measure/measure.go index 76a8c1ae3..ae18e38ef 100644 --- a/banyand/measure/measure.go +++ b/banyand/measure/measure.go @@ -56,6 +56,7 @@ type option struct { seriesCacheMaxSize run.Bytes flushTimeout time.Duration syncInterval time.Duration + memWaitTimeout time.Duration failedPartsMaxTotalSizeBytes uint64 vectorized vmeasure.VectorizedConfig } diff --git a/banyand/measure/svc_data.go b/banyand/measure/svc_data.go index 24aeeb4f1..f69c19f2b 100644 --- a/banyand/measure/svc_data.go +++ b/banyand/measure/svc_data.go @@ -170,6 +170,8 @@ func (s *dataSVC) FlagSet() *run.FlagSet { flagS.StringVar(&s.root, "measure-root-path", "/tmp", "the root path of measure") flagS.StringVar(&s.dataPath, "measure-data-path", "", "the data directory path of measure. If not set, <measure-root-path>/measure/data will be used") flagS.DurationVar(&s.option.flushTimeout, "measure-flush-timeout", defaultFlushTimeout, "the memory data timeout of measure") + flagS.DurationVar(&s.option.memWaitTimeout, "measure-lifecycle-receive-mem-wait-timeout", 5*time.Minute, + "max time the migration receiver waits for memory to recover before introducing an external segment") s.option.mergePolicy = newDefaultMergePolicy() flagS.VarP(&s.option.mergePolicy.maxFanOutSize, "measure-max-fan-out-size", "", "the upper bound of a single file size after merge of measure") s.option.seriesCacheMaxSize = run.Bytes(32 << 20) @@ -310,7 +312,7 @@ func (s *dataSVC) PreRun(ctx context.Context) error { } s.pipeline.RegisterChunkedSyncHandler(data.TopicMeasurePartSync, setUpChunkedSyncCallback(s.l, s.schemaRepo)) - s.pipeline.RegisterChunkedSyncHandler(data.TopicMeasureSeriesSync, setUpSyncSeriesCallback(s.l, s.schemaRepo)) + s.pipeline.RegisterChunkedSyncHandler(data.TopicMeasureSeriesSync, setUpSyncSeriesCallback(s.l, s.schemaRepo, s.pm, s.option.memWaitTimeout)) err = s.pipeline.Subscribe(data.TopicMeasureSeriesIndexInsert, setUpIndexCallback(s.l, s.schemaRepo, data.TopicMeasureSeriesIndexInsert)) if err != nil { return err diff --git a/banyand/measure/svc_liaison.go b/banyand/measure/svc_liaison.go index d34752ed9..3e5449d8b 100644 --- a/banyand/measure/svc_liaison.go +++ b/banyand/measure/svc_liaison.go @@ -23,6 +23,7 @@ import ( "path" "path/filepath" "strings" + "time" "github.com/pkg/errors" @@ -109,6 +110,8 @@ func (s *liaison) FlagSet() *run.FlagSet { flagS.StringVar(&s.root, "measure-root-path", "/tmp", "the root path of measure") flagS.StringVar(&s.dataPath, "measure-data-path", "", "the data directory path of measure. If not set, <measure-root-path>/measure/data will be used") flagS.DurationVar(&s.option.flushTimeout, "measure-flush-timeout", defaultFlushTimeout, "the memory data timeout of measure") + flagS.DurationVar(&s.option.memWaitTimeout, "measure-lifecycle-receive-mem-wait-timeout", 5*time.Minute, + "max time the migration receiver waits for memory to recover before introducing an external segment") flagS.DurationVar(&s.option.syncInterval, "measure-sync-interval", defaultSyncInterval, "the periodic sync interval for measure data") flagS.IntVar(&s.maxDiskUsagePercent, "measure-max-disk-usage-percent", 95, "the maximum disk usage percentage allowed") flagS.IntVar(&s.failedPartsMaxSizePercent, "failed-parts-max-size-percent", 10, diff --git a/banyand/measure/svc_standalone.go b/banyand/measure/svc_standalone.go index 9e5eb6358..8be02a81c 100644 --- a/banyand/measure/svc_standalone.go +++ b/banyand/measure/svc_standalone.go @@ -177,6 +177,8 @@ func (s *standalone) FlagSet() *run.FlagSet { flagS.StringVar(&s.root, "measure-root-path", "/tmp", "the root path of measure") flagS.StringVar(&s.dataPath, "measure-data-path", "", "the data directory path of measure. If not set, <measure-root-path>/measure/data will be used") flagS.DurationVar(&s.option.flushTimeout, "measure-flush-timeout", defaultFlushTimeout, "the memory data timeout of measure") + flagS.DurationVar(&s.option.memWaitTimeout, "measure-lifecycle-receive-mem-wait-timeout", 5*time.Minute, + "max time the migration receiver waits for memory to recover before introducing an external segment") s.option.mergePolicy = newDefaultMergePolicy() flagS.VarP(&s.option.mergePolicy.maxFanOutSize, "measure-max-fan-out-size", "", "the upper bound of a single file size after merge of measure") s.option.seriesCacheMaxSize = run.Bytes(32 << 20) diff --git a/banyand/measure/write_data.go b/banyand/measure/write_data.go index c6079f365..301449a82 100644 --- a/banyand/measure/write_data.go +++ b/banyand/measure/write_data.go @@ -18,6 +18,7 @@ package measure import ( + "context" "fmt" "strings" "sync/atomic" @@ -26,6 +27,7 @@ import ( "github.com/apache/skywalking-banyandb/api/common" commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" "github.com/apache/skywalking-banyandb/banyand/internal/storage" + "github.com/apache/skywalking-banyandb/banyand/protector" "github.com/apache/skywalking-banyandb/banyand/queue" "github.com/apache/skywalking-banyandb/pkg/fs" "github.com/apache/skywalking-banyandb/pkg/index" @@ -158,6 +160,9 @@ func (s *syncCallback) HandleFileChunk(ctx *queue.ChunkedSyncPartContext, chunk return fmt.Errorf("part handler is nil") } partCtx := ctx.Handler.(*syncPartContext) + if partCtx.tsTable.pm.State() == protector.StateHigh { + return queue.ErrServerBusy + } // Select the appropriate writer based on the filename and write the chunk. fileName := ctx.FileName @@ -214,10 +219,13 @@ func releaseWriters(sw *writers) { var writersPool = pool.Register[*writers]("measure-writers") type syncSeriesContext struct { - streamer index.ExternalSegmentStreamer - segment storage.Segment[*tsTable, *commonv1.ResourceOpts] - l *logger.Logger - fileName string + streamer index.ExternalSegmentStreamer + segment storage.Segment[*tsTable, *commonv1.ResourceOpts] + l *logger.Logger + pm protector.Memory + introduceCtx context.Context + fileName string + memWaitTimeout time.Duration } func (s *syncSeriesContext) NewPartType(_ *queue.ChunkedSyncPartContext) error { @@ -227,6 +235,9 @@ func (s *syncSeriesContext) NewPartType(_ *queue.ChunkedSyncPartContext) error { func (s *syncSeriesContext) FinishSync() error { if s.streamer != nil { + if err := protector.WaitWhileHigh(s.introduceCtx, s.pm, s.memWaitTimeout, s.l, "measure-series-introduce"); err != nil { + return err + } if err := s.streamer.CompleteSegment(); err != nil { s.l.Error().Err(err).Msg("failed to complete external segment") return err @@ -246,14 +257,18 @@ func (s *syncSeriesContext) Close() error { } type syncSeriesCallback struct { - l *logger.Logger - schemaRepo *schemaRepo + l *logger.Logger + schemaRepo *schemaRepo + pm protector.Memory + memWaitTimeout time.Duration } -func setUpSyncSeriesCallback(l *logger.Logger, schemaRepo *schemaRepo) queue.ChunkedSyncHandler { +func setUpSyncSeriesCallback(l *logger.Logger, schemaRepo *schemaRepo, pm protector.Memory, memWaitTimeout time.Duration) queue.ChunkedSyncHandler { return &syncSeriesCallback{ - l: l, - schemaRepo: schemaRepo, + l: l, + schemaRepo: schemaRepo, + pm: pm, + memWaitTimeout: memWaitTimeout, } } @@ -278,8 +293,11 @@ func (s *syncSeriesCallback) CreatePartHandler(ctx *queue.ChunkedSyncPartContext return nil, err } return &syncSeriesContext{ - l: s.l, - segment: segment, + l: s.l, + segment: segment, + pm: s.pm, + memWaitTimeout: s.memWaitTimeout, + introduceCtx: ctx.RetrieveContext(), }, nil } @@ -289,12 +307,18 @@ func (s *syncSeriesCallback) HandleFileChunk(ctx *queue.ChunkedSyncPartContext, return fmt.Errorf("part handler is nil") } seriesCtx := ctx.Handler.(*syncSeriesContext) + if seriesCtx.pm.State() == protector.StateHigh { + return queue.ErrServerBusy + } if seriesCtx.segment == nil { return fmt.Errorf("segment is nil") } if seriesCtx.fileName != ctx.FileName { if seriesCtx.streamer != nil { + if err := protector.WaitWhileHigh(ctx.RetrieveContext(), seriesCtx.pm, seriesCtx.memWaitTimeout, s.l, "measure-series-introduce"); err != nil { + return err + } if err := seriesCtx.streamer.CompleteSegment(); err != nil { s.l.Error().Err(err).Str("group", ctx.Group).Msg("failed to complete external segment") return err diff --git a/banyand/measure/write_data_busy_test.go b/banyand/measure/write_data_busy_test.go new file mode 100644 index 000000000..7ffb1bdf4 --- /dev/null +++ b/banyand/measure/write_data_busy_test.go @@ -0,0 +1,67 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package measure + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/apache/skywalking-banyandb/banyand/protector" + "github.com/apache/skywalking-banyandb/banyand/queue" + "github.com/apache/skywalking-banyandb/pkg/logger" +) + +// highMem is a fake protector.Memory that always reports a configurable state. +type highMem struct { + protector.Nop + state protector.State +} + +// State returns the fixed scripted state. +func (h highMem) State() protector.State { + return h.state +} + +// TestSyncSeriesCallback_HandleFileChunkBusy verifies that the series chunk +// handler sheds load by returning queue.ErrServerBusy when memory is high, +// before touching any segment or streamer state. +func TestSyncSeriesCallback_HandleFileChunkBusy(t *testing.T) { + cb := &syncSeriesCallback{l: logger.GetLogger("test")} + // seriesCtx has a nil segment; the BUSY check must fire first so the + // nil-segment path is never reached. + seriesCtx := &syncSeriesContext{ + l: logger.GetLogger("test"), + pm: highMem{state: protector.StateHigh}, + } + ctx := &queue.ChunkedSyncPartContext{Handler: seriesCtx, FileName: "series.idx"} + err := cb.HandleFileChunk(ctx, []byte("payload")) + require.Error(t, err) + assert.ErrorIs(t, err, queue.ErrServerBusy) +} + +// TestSyncSeriesCallback_HandleFileChunkNilHandler verifies the early nil +// handler guard still applies. +func TestSyncSeriesCallback_HandleFileChunkNilHandler(t *testing.T) { + cb := &syncSeriesCallback{l: logger.GetLogger("test")} + ctx := &queue.ChunkedSyncPartContext{} + err := cb.HandleFileChunk(ctx, []byte("payload")) + require.Error(t, err) + assert.NotErrorIs(t, err, queue.ErrServerBusy) +} diff --git a/banyand/protector/wait.go b/banyand/protector/wait.go new file mode 100644 index 000000000..5d8357448 --- /dev/null +++ b/banyand/protector/wait.go @@ -0,0 +1,63 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package protector + +import ( + "context" + "time" + + "github.com/apache/skywalking-banyandb/pkg/logger" +) + +const ( + waitInitialBackoff = 200 * time.Millisecond + waitMaxBackoff = 2 * time.Second + waitHeartbeat = time.Minute +) + +// WaitWhileHigh blocks while memory pressure is high, bounded by maxWait. +// It only creates a timeout context when it actually has to wait. It returns +// the timeout/cancel error if ctx or maxWait elapses before recovery. +func WaitWhileHigh(ctx context.Context, pm Memory, maxWait time.Duration, l *logger.Logger, stage string) error { + if pm.State() != StateHigh { + return nil + } + waitCtx, cancel := context.WithTimeout(ctx, maxWait) + defer cancel() + start := time.Now() + l.Warn().Str("stage", stage).Dur("max_wait", maxWait).Msg("memory high, migration receive paused, waiting") + ticker := time.NewTicker(waitHeartbeat) + defer ticker.Stop() + backoff := waitInitialBackoff + for pm.State() == StateHigh { + select { + case <-time.After(backoff): + backoff *= 2 + if backoff > waitMaxBackoff { + backoff = waitMaxBackoff + } + case <-ticker.C: + l.Warn().Str("stage", stage).Dur("waited", time.Since(start)).Msg("still waiting for memory") + case <-waitCtx.Done(): + l.Warn().Str("stage", stage).Dur("waited", time.Since(start)).Msg("memory wait aborted (timeout/cancel)") + return waitCtx.Err() + } + } + l.Info().Str("stage", stage).Dur("waited", time.Since(start)).Msg("memory recovered, migration receive resumed") + return nil +} diff --git a/banyand/protector/wait_test.go b/banyand/protector/wait_test.go new file mode 100644 index 000000000..eec29127e --- /dev/null +++ b/banyand/protector/wait_test.go @@ -0,0 +1,72 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package protector + +import ( + "context" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/apache/skywalking-banyandb/pkg/logger" +) + +// fakeMem is a test double that implements the full Memory interface by +// embedding Nop and overriding State() to advance through a script of states. +type fakeMem struct { + Nop + states []State + idx atomic.Int32 +} + +// State returns the next scripted state, sticking on the last value. +func (f *fakeMem) State() State { + i := int(f.idx.Load()) + if i >= len(f.states) { + i = len(f.states) - 1 + } else { + f.idx.Add(1) + } + return f.states[i] +} + +func TestWaitWhileHigh_ReturnsWhenRecovered(t *testing.T) { + pm := &fakeMem{states: []State{StateHigh, StateHigh, StateLow}} + err := WaitWhileHigh(context.Background(), pm, time.Second, logger.GetLogger("test"), "test-stage") + assert.NoError(t, err) +} + +func TestWaitWhileHigh_TimesOut(t *testing.T) { + pm := &fakeMem{states: []State{StateHigh}} + start := time.Now() + err := WaitWhileHigh(context.Background(), pm, 50*time.Millisecond, logger.GetLogger("test"), "test-stage") + require.Error(t, err) + assert.ErrorIs(t, err, context.DeadlineExceeded) + assert.Less(t, time.Since(start), time.Second, "timeout test must complete quickly") +} + +func TestWaitWhileHigh_FastPathNoTimeoutCtx(t *testing.T) { + pm := &fakeMem{states: []State{StateLow}} + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + err := WaitWhileHigh(ctx, pm, time.Second, logger.GetLogger("test"), "test-stage") + assert.NoError(t, err) +} diff --git a/banyand/queue/pub/chunked_sync.go b/banyand/queue/pub/chunked_sync.go index 6c30fef0b..cc77e2a68 100644 --- a/banyand/queue/pub/chunked_sync.go +++ b/banyand/queue/pub/chunked_sync.go @@ -512,6 +512,9 @@ func (c *chunkedSyncClient) sendChunk( case clusterv1.SyncStatus_SYNC_STATUS_SESSION_NOT_FOUND: return fmt.Errorf("session %s not found on server for chunk %d: %s", sessionID, *chunkIndex, resp.Error) + case clusterv1.SyncStatus_SYNC_STATUS_SERVER_BUSY: + return fmt.Errorf("receiver busy for chunk %d: %w", *chunkIndex, queue.ErrServerBusy) + default: if resp.Error != "" { return fmt.Errorf("chunk %d sync failed: %s", *chunkIndex, resp.Error) @@ -572,6 +575,9 @@ func classifyChunkedSyncPubErr(err error) string { if err == nil { return "" } + if errors.Is(err, queue.ErrServerBusy) { + return "server_busy" + } msg := err.Error() switch { case strings.Contains(msg, "checksum mismatch"): diff --git a/banyand/queue/queue.go b/banyand/queue/queue.go index 62944bd49..fc0c03d61 100644 --- a/banyand/queue/queue.go +++ b/banyand/queue/queue.go @@ -40,6 +40,10 @@ import ( // self-probing without a special-cased branch in the caller. var ErrNotImplemented = errors.New("not implemented") +// ErrServerBusy signals the receiver is under memory pressure and the sender +// should back off and retry the whole part. +var ErrServerBusy = errors.New("receiver under memory pressure") + // Queue builds a data transmission tunnel between subscribers and publishers. // //go:generate mockgen -destination=./queue_mock.go -package=queue github.com/apache/skywalking-banyandb/pkg/bus MessageListener @@ -136,7 +140,10 @@ type ChunkedSyncClient interface { // ChunkedSyncPartContext represents the context for a chunked sync operation. type ChunkedSyncPartContext struct { - Handler PartHandler + Handler PartHandler + // Context carries the gRPC stream context for receive-side cancellation + // (memory wait upper bound / disconnect). + Context context.Context Group string FileName string PartType string @@ -152,6 +159,15 @@ type ChunkedSyncPartContext struct { ShardID uint32 } +// RetrieveContext returns the part's stream context, or context.Background() +// if none was set. Used by receive handlers to bound memory-pressure waits. +func (c *ChunkedSyncPartContext) RetrieveContext() context.Context { + if c.Context == nil { + return context.Background() + } + return c.Context +} + // Close releases resources associated with the context. func (c *ChunkedSyncPartContext) Close() error { defer func() { diff --git a/banyand/queue/queue_test.go b/banyand/queue/queue_test.go new file mode 100644 index 000000000..8030d9211 --- /dev/null +++ b/banyand/queue/queue_test.go @@ -0,0 +1,35 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package queue + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" +) + +// TestChunkedSyncPartContextRetrieveContext verifies the nil fallback and +// pass-through behavior of ChunkedSyncPartContext.RetrieveContext. +func TestChunkedSyncPartContextRetrieveContext(t *testing.T) { + assert.Equal(t, context.Background(), (&ChunkedSyncPartContext{}).RetrieveContext()) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + assert.Equal(t, ctx, (&ChunkedSyncPartContext{Context: ctx}).RetrieveContext()) +} diff --git a/banyand/queue/sub/chunked_sync.go b/banyand/queue/sub/chunked_sync.go index 1ed1be1fa..dedca8e3f 100644 --- a/banyand/queue/sub/chunked_sync.go +++ b/banyand/queue/sub/chunked_sync.go @@ -418,6 +418,7 @@ func (s *server) processExpectedChunk(stream clusterv1.ChunkedSyncService_SyncPa if createNewContext { session.partCtx = &queue.ChunkedSyncPartContext{ + Context: stream.Context(), ID: partInfo.Id, Group: session.metadata.Group, ShardID: session.metadata.ShardId, @@ -452,6 +453,10 @@ func (s *server) processExpectedChunk(stream clusterv1.ChunkedSyncService_SyncPa } if processErr := s.processPart(session, req, partInfo, partIndex, handler); processErr != nil { + if errors.Is(processErr, queue.ErrServerBusy) { + return s.sendResponse(stream, req, clusterv1.SyncStatus_SYNC_STATUS_SERVER_BUSY, + "receiver under memory pressure, retry later", nil) + } s.log.Error().Err(processErr). Str("session_id", req.SessionId). Str("topic", session.metadata.Topic). diff --git a/banyand/queue/sub/chunked_sync_test.go b/banyand/queue/sub/chunked_sync_test.go index 206f93b8c..f43c970bb 100644 --- a/banyand/queue/sub/chunked_sync_test.go +++ b/banyand/queue/sub/chunked_sync_test.go @@ -282,6 +282,68 @@ func TestChunkOrderingBufferFull(t *testing.T) { assert.True(t, found, "expected error_type=buffer_full to be recorded") } +// TestChunkedSyncServerBusy verifies that when the handler's HandleFileChunk reports +// memory pressure via queue.ErrServerBusy, the server replies with SYNC_STATUS_SERVER_BUSY +// instead of a generic stream error or a CHUNK_RECEIVED success. +func TestChunkedSyncServerBusy(t *testing.T) { + err := logger.Init(logger.Logging{Env: "dev", Level: "info"}) + require.NoError(t, err) + + s := &server{ //nolint:exhaustruct + log: logger.GetLogger("test-server-busy"), + chunkedSyncHandlers: make(map[bus.Topic]queue.ChunkedSyncHandler), + } + s.chunkedSyncHandlers[data.TopicStreamPartSync] = &busyChunkedSyncHandler{} + + mockStream := &MockSyncPartStream{} + session := &syncSession{ //nolint:exhaustruct + sessionID: "busy-session", + startTime: time.Now(), + partsProgress: make(map[int]*partProgress), + metadata: &clusterv1.SyncMetadata{ + Group: "test-group", + ShardId: 1, + Topic: data.TopicStreamPartSync.String(), + }, + } + + chunkData := []byte("busy-chunk-data") + req := &clusterv1.SyncPartRequest{ + SessionId: "busy-session", + ChunkIndex: 0, + ChunkData: chunkData, + ChunkChecksum: fmt.Sprintf("%x", crc32.ChecksumIEEE(chunkData)), + PartsInfo: []*clusterv1.PartInfo{ + { + Id: 1, + Files: []*clusterv1.FileInfo{ + {Name: "test-file.dat", Offset: 0, Size: uint32(len(chunkData))}, + }, + }, + }, + } + + require.NoError(t, s.processChunk(mockStream, session, req)) + + require.Len(t, mockStream.sentResponses, 1) + resp := mockStream.sentResponses[0] + assert.Equal(t, clusterv1.SyncStatus_SYNC_STATUS_SERVER_BUSY, resp.Status) + assert.NotEmpty(t, resp.Error) + assert.Equal(t, "busy-session", resp.SessionId) + assert.Equal(t, uint32(0), resp.ChunkIndex) +} + +// busyChunkedSyncHandler is a handler whose HandleFileChunk always reports memory pressure. +type busyChunkedSyncHandler struct{} + +func (m *busyChunkedSyncHandler) CreatePartHandler(_ *queue.ChunkedSyncPartContext) (queue.PartHandler, error) { + return &MockChunkedSyncPartHandler{}, nil +} + +func (m *busyChunkedSyncHandler) HandleFileChunk(_ *queue.ChunkedSyncPartContext, _ []byte) error { + return queue.ErrServerBusy +} + // MockChunkedSyncHandler implements queue.ChunkedSyncHandler for testing. type MockChunkedSyncHandler struct{} diff --git a/banyand/stream/stream.go b/banyand/stream/stream.go index a5b3873ac..7d47871f1 100644 --- a/banyand/stream/stream.go +++ b/banyand/stream/stream.go @@ -54,6 +54,7 @@ type option struct { flushTimeout time.Duration elementIndexFlushTimeout time.Duration syncInterval time.Duration + memWaitTimeout time.Duration failedPartsMaxTotalSizeBytes uint64 } diff --git a/banyand/stream/svc_liaison.go b/banyand/stream/svc_liaison.go index 78da92c9c..176b7f6ba 100644 --- a/banyand/stream/svc_liaison.go +++ b/banyand/stream/svc_liaison.go @@ -23,6 +23,7 @@ import ( "path" "path/filepath" "strings" + "time" "github.com/pkg/errors" @@ -116,6 +117,8 @@ func (s *liaison) FlagSet() *run.FlagSet { flagS.StringVar(&s.root, "stream-root-path", "/tmp", "the root path of stream") flagS.StringVar(&s.dataPath, "stream-data-path", "", "the data directory path of stream. If not set, <stream-root-path>/stream/data will be used") flagS.DurationVar(&s.option.flushTimeout, "stream-flush-timeout", defaultFlushTimeout, "the memory data timeout of stream") + flagS.DurationVar(&s.option.memWaitTimeout, "stream-lifecycle-receive-mem-wait-timeout", 5*time.Minute, + "max time the migration receiver waits for memory to recover before introducing an external segment") flagS.IntVar(&s.maxDiskUsagePercent, "stream-max-disk-usage-percent", 95, "the maximum disk usage percentage allowed") flagS.DurationVar(&s.option.syncInterval, "stream-sync-interval", defaultSyncInterval, "the periodic sync interval for stream data") flagS.IntVar(&s.failedPartsMaxSizePercent, "failed-parts-max-size-percent", 10, diff --git a/banyand/stream/svc_standalone.go b/banyand/stream/svc_standalone.go index 9dab8a867..86172aa1d 100644 --- a/banyand/stream/svc_standalone.go +++ b/banyand/stream/svc_standalone.go @@ -177,6 +177,8 @@ func (s *standalone) FlagSet() *run.FlagSet { flagS.StringVar(&s.dataPath, "stream-data-path", "", "the data directory path of stream. If not set, <stream-root-path>/stream/data will be used") flagS.DurationVar(&s.option.flushTimeout, "stream-flush-timeout", defaultFlushTimeout, "the memory data timeout of stream") flagS.DurationVar(&s.option.elementIndexFlushTimeout, "element-index-flush-timeout", defaultFlushTimeout, "the elementIndex timeout of stream") + flagS.DurationVar(&s.option.memWaitTimeout, "stream-lifecycle-receive-mem-wait-timeout", 5*time.Minute, + "max time the migration receiver waits for memory to recover before introducing an external segment") s.option.mergePolicy = newDefaultMergePolicy() flagS.VarP(&s.option.mergePolicy.maxFanOutSize, "stream-max-fan-out-size", "", "the upper bound of a single file size after merge of stream") s.option.seriesCacheMaxSize = run.Bytes(32 << 20) @@ -293,9 +295,9 @@ func (s *standalone) PreRun(ctx context.Context) error { } s.pipeline.RegisterChunkedSyncHandler(data.TopicStreamPartSync, setUpChunkedSyncCallback(s.l, &s.schemaRepo)) // Register chunked sync handler for stream series index - s.pipeline.RegisterChunkedSyncHandler(data.TopicStreamSeriesSync, setUpSyncSeriesCallback(s.l, &s.schemaRepo)) + s.pipeline.RegisterChunkedSyncHandler(data.TopicStreamSeriesSync, setUpSyncSeriesCallback(s.l, &s.schemaRepo, s.pm, s.option.memWaitTimeout)) // Register chunked sync handler for stream element index - s.pipeline.RegisterChunkedSyncHandler(data.TopicStreamElementIndexSync, setUpSyncElementIndexCallback(s.l, &s.schemaRepo)) + s.pipeline.RegisterChunkedSyncHandler(data.TopicStreamElementIndexSync, setUpSyncElementIndexCallback(s.l, &s.schemaRepo, s.pm, s.option.memWaitTimeout)) err = s.pipeline.Subscribe(data.TopicStreamSeriesIndexWrite, setUpSeriesIndexCallback(s.l, &s.schemaRepo)) if err != nil { diff --git a/banyand/stream/write_data.go b/banyand/stream/write_data.go index 8506a36e7..af360c9c4 100644 --- a/banyand/stream/write_data.go +++ b/banyand/stream/write_data.go @@ -18,6 +18,7 @@ package stream import ( + "context" "fmt" "strings" "sync/atomic" @@ -26,6 +27,7 @@ import ( "github.com/apache/skywalking-banyandb/api/common" commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" "github.com/apache/skywalking-banyandb/banyand/internal/storage" + "github.com/apache/skywalking-banyandb/banyand/protector" "github.com/apache/skywalking-banyandb/banyand/queue" "github.com/apache/skywalking-banyandb/pkg/fs" "github.com/apache/skywalking-banyandb/pkg/index" @@ -157,6 +159,9 @@ func (s *syncCallback) HandleFileChunk(ctx *queue.ChunkedSyncPartContext, chunk return fmt.Errorf("part handler is nil") } partCtx := ctx.Handler.(*syncPartContext) + if partCtx.tsTable.pm.State() == protector.StateHigh { + return queue.ErrServerBusy + } // Select the appropriate writer based on the filename and write the chunk. fileName := ctx.FileName @@ -188,10 +193,13 @@ func (s *syncCallback) HandleFileChunk(ctx *queue.ChunkedSyncPartContext, chunk } type syncSeriesContext struct { - streamer index.ExternalSegmentStreamer - segment storage.Segment[*tsTable, *commonv1.ResourceOpts] - l *logger.Logger - fileName string + streamer index.ExternalSegmentStreamer + segment storage.Segment[*tsTable, *commonv1.ResourceOpts] + l *logger.Logger + pm protector.Memory + introduceCtx context.Context + fileName string + memWaitTimeout time.Duration } func (s *syncSeriesContext) NewPartType(_ *queue.ChunkedSyncPartContext) error { @@ -201,6 +209,9 @@ func (s *syncSeriesContext) NewPartType(_ *queue.ChunkedSyncPartContext) error { func (s *syncSeriesContext) FinishSync() error { if s.streamer != nil { + if err := protector.WaitWhileHigh(s.introduceCtx, s.pm, s.memWaitTimeout, s.l, "stream-series-introduce"); err != nil { + return err + } if err := s.streamer.CompleteSegment(); err != nil { s.l.Error().Err(err).Msg("failed to complete external segment") return err @@ -220,14 +231,18 @@ func (s *syncSeriesContext) Close() error { } type syncSeriesCallback struct { - l *logger.Logger - schemaRepo *schemaRepo + l *logger.Logger + schemaRepo *schemaRepo + pm protector.Memory + memWaitTimeout time.Duration } -func setUpSyncSeriesCallback(l *logger.Logger, schemaRepo *schemaRepo) queue.ChunkedSyncHandler { +func setUpSyncSeriesCallback(l *logger.Logger, schemaRepo *schemaRepo, pm protector.Memory, memWaitTimeout time.Duration) queue.ChunkedSyncHandler { return &syncSeriesCallback{ - l: l, - schemaRepo: schemaRepo, + l: l, + schemaRepo: schemaRepo, + pm: pm, + memWaitTimeout: memWaitTimeout, } } @@ -252,8 +267,11 @@ func (s *syncSeriesCallback) CreatePartHandler(ctx *queue.ChunkedSyncPartContext return nil, err } return &syncSeriesContext{ - l: s.l, - segment: segment, + l: s.l, + segment: segment, + pm: s.pm, + memWaitTimeout: s.memWaitTimeout, + introduceCtx: ctx.RetrieveContext(), }, nil } @@ -263,12 +281,18 @@ func (s *syncSeriesCallback) HandleFileChunk(ctx *queue.ChunkedSyncPartContext, return fmt.Errorf("part handler is nil") } seriesCtx := ctx.Handler.(*syncSeriesContext) + if seriesCtx.pm.State() == protector.StateHigh { + return queue.ErrServerBusy + } if seriesCtx.segment == nil { return fmt.Errorf("segment is nil") } if seriesCtx.fileName != ctx.FileName { if seriesCtx.streamer != nil { + if err := protector.WaitWhileHigh(ctx.RetrieveContext(), seriesCtx.pm, seriesCtx.memWaitTimeout, s.l, "stream-series-introduce"); err != nil { + return err + } if err := seriesCtx.streamer.CompleteSegment(); err != nil { s.l.Error().Err(err).Str("group", ctx.Group).Msg("failed to complete external segment") return err @@ -294,11 +318,14 @@ func (s *syncSeriesCallback) HandleFileChunk(ctx *queue.ChunkedSyncPartContext, } type syncElementIndexContext struct { - streamer index.ExternalSegmentStreamer - segment storage.Segment[*tsTable, *commonv1.ResourceOpts] - tsTable *tsTable - l *logger.Logger - fileName string + streamer index.ExternalSegmentStreamer + segment storage.Segment[*tsTable, *commonv1.ResourceOpts] + tsTable *tsTable + l *logger.Logger + pm protector.Memory + introduceCtx context.Context + fileName string + memWaitTimeout time.Duration } func (s *syncElementIndexContext) NewPartType(_ *queue.ChunkedSyncPartContext) error { @@ -308,6 +335,9 @@ func (s *syncElementIndexContext) NewPartType(_ *queue.ChunkedSyncPartContext) e func (s *syncElementIndexContext) FinishSync() error { if s.streamer != nil { + if err := protector.WaitWhileHigh(s.introduceCtx, s.pm, s.memWaitTimeout, s.l, "stream-element-index-introduce"); err != nil { + return err + } if err := s.streamer.CompleteSegment(); err != nil { s.l.Error().Err(err).Msg("failed to complete external segment for element index") return err @@ -328,14 +358,18 @@ func (s *syncElementIndexContext) Close() error { } type syncElementIndexCallback struct { - l *logger.Logger - schemaRepo *schemaRepo + l *logger.Logger + schemaRepo *schemaRepo + pm protector.Memory + memWaitTimeout time.Duration } -func setUpSyncElementIndexCallback(l *logger.Logger, schemaRepo *schemaRepo) queue.ChunkedSyncHandler { +func setUpSyncElementIndexCallback(l *logger.Logger, schemaRepo *schemaRepo, pm protector.Memory, memWaitTimeout time.Duration) queue.ChunkedSyncHandler { return &syncElementIndexCallback{ - l: l, - schemaRepo: schemaRepo, + l: l, + schemaRepo: schemaRepo, + pm: pm, + memWaitTimeout: memWaitTimeout, } } @@ -366,9 +400,12 @@ func (s *syncElementIndexCallback) CreatePartHandler(ctx *queue.ChunkedSyncPartC } return &syncElementIndexContext{ - l: s.l, - tsTable: tsTable, - segment: segment, + l: s.l, + tsTable: tsTable, + segment: segment, + pm: s.pm, + memWaitTimeout: s.memWaitTimeout, + introduceCtx: ctx.RetrieveContext(), }, nil } @@ -378,12 +415,18 @@ func (s *syncElementIndexCallback) HandleFileChunk(ctx *queue.ChunkedSyncPartCon return fmt.Errorf("part handler is nil") } elementCtx := ctx.Handler.(*syncElementIndexContext) + if elementCtx.pm.State() == protector.StateHigh { + return queue.ErrServerBusy + } if elementCtx.tsTable == nil { return fmt.Errorf("ts table is nil") } if elementCtx.fileName != ctx.FileName { if elementCtx.streamer != nil { + if err := protector.WaitWhileHigh(ctx.RetrieveContext(), elementCtx.pm, elementCtx.memWaitTimeout, s.l, "stream-element-index-introduce"); err != nil { + return err + } if err := elementCtx.streamer.CompleteSegment(); err != nil { s.l.Error().Err(err).Str("group", ctx.Group).Msg("failed to complete external segment for element index") return err diff --git a/banyand/stream/write_data_busy_test.go b/banyand/stream/write_data_busy_test.go new file mode 100644 index 000000000..de05c7068 --- /dev/null +++ b/banyand/stream/write_data_busy_test.go @@ -0,0 +1,94 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package stream + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/apache/skywalking-banyandb/banyand/protector" + "github.com/apache/skywalking-banyandb/banyand/queue" + "github.com/apache/skywalking-banyandb/pkg/logger" +) + +// highMem is a fake protector.Memory that always reports a configurable state. +type highMem struct { + protector.Nop + state protector.State +} + +// State returns the fixed scripted state. +func (h highMem) State() protector.State { + return h.state +} + +// TestSyncSeriesCallback_HandleFileChunkBusy verifies that the series chunk +// handler sheds load by returning queue.ErrServerBusy when memory is high, +// before touching any segment or streamer state. +func TestSyncSeriesCallback_HandleFileChunkBusy(t *testing.T) { + cb := &syncSeriesCallback{l: logger.GetLogger("test")} + // seriesCtx has a nil segment; the BUSY check must fire first so the + // nil-segment path is never reached. + seriesCtx := &syncSeriesContext{ + l: logger.GetLogger("test"), + pm: highMem{state: protector.StateHigh}, + } + ctx := &queue.ChunkedSyncPartContext{Handler: seriesCtx, FileName: "series.idx"} + err := cb.HandleFileChunk(ctx, []byte("payload")) + require.Error(t, err) + assert.ErrorIs(t, err, queue.ErrServerBusy) +} + +// TestSyncSeriesCallback_HandleFileChunkNilHandler verifies the early nil +// handler guard still applies. +func TestSyncSeriesCallback_HandleFileChunkNilHandler(t *testing.T) { + cb := &syncSeriesCallback{l: logger.GetLogger("test")} + ctx := &queue.ChunkedSyncPartContext{} + err := cb.HandleFileChunk(ctx, []byte("payload")) + require.Error(t, err) + assert.NotErrorIs(t, err, queue.ErrServerBusy) +} + +// TestSyncElementIndexCallback_HandleFileChunkBusy verifies that the element +// index chunk handler sheds load by returning queue.ErrServerBusy when memory +// is high, before touching any ts table or streamer state. +func TestSyncElementIndexCallback_HandleFileChunkBusy(t *testing.T) { + cb := &syncElementIndexCallback{l: logger.GetLogger("test")} + // elementCtx has a nil tsTable; the BUSY check must fire first so the + // nil-tsTable path is never reached. + elementCtx := &syncElementIndexContext{ + l: logger.GetLogger("test"), + pm: highMem{state: protector.StateHigh}, + } + ctx := &queue.ChunkedSyncPartContext{Handler: elementCtx, FileName: "element.idx"} + err := cb.HandleFileChunk(ctx, []byte("payload")) + require.Error(t, err) + assert.ErrorIs(t, err, queue.ErrServerBusy) +} + +// TestSyncElementIndexCallback_HandleFileChunkNilHandler verifies the early nil +// handler guard still applies. +func TestSyncElementIndexCallback_HandleFileChunkNilHandler(t *testing.T) { + cb := &syncElementIndexCallback{l: logger.GetLogger("test")} + ctx := &queue.ChunkedSyncPartContext{} + err := cb.HandleFileChunk(ctx, []byte("payload")) + require.Error(t, err) + assert.NotErrorIs(t, err, queue.ErrServerBusy) +} diff --git a/banyand/trace/svc_liaison.go b/banyand/trace/svc_liaison.go index 92a2418f1..053c9c657 100644 --- a/banyand/trace/svc_liaison.go +++ b/banyand/trace/svc_liaison.go @@ -129,6 +129,8 @@ func (l *liaison) FlagSet() *run.FlagSet { fs.StringVar(&l.root, "trace-root-path", "/tmp", "the root path for trace data") fs.StringVar(&l.dataPath, "trace-data-path", "", "the path for trace data (optional)") fs.DurationVar(&l.option.flushTimeout, "trace-flush-timeout", 3*time.Second, "the timeout for trace data flush") + fs.DurationVar(&l.option.memWaitTimeout, "trace-lifecycle-receive-mem-wait-timeout", 5*time.Minute, + "max time the migration receiver waits for memory to recover before introducing an external segment") fs.IntVar(&l.maxDiskUsagePercent, "trace-max-disk-usage-percent", 95, "the maximum disk usage percentage") fs.DurationVar(&l.option.syncInterval, "trace-sync-interval", defaultSyncInterval, "the periodic sync interval for trace data") fs.StringSliceVar(&l.dataNodeList, "data-node-list", nil, "comma-separated list of data node names to monitor for handoff") diff --git a/banyand/trace/svc_standalone.go b/banyand/trace/svc_standalone.go index 4f0f657a6..a2df01723 100644 --- a/banyand/trace/svc_standalone.go +++ b/banyand/trace/svc_standalone.go @@ -83,6 +83,8 @@ func (s *standalone) FlagSet() *run.FlagSet { fs.StringVar(&s.root, "trace-root-path", "/tmp", "the root path for trace data") fs.StringVar(&s.dataPath, "trace-data-path", "", "the path for trace data (optional)") fs.DurationVar(&s.option.flushTimeout, "trace-flush-timeout", defaultFlushTimeout, "the timeout for trace data flush") + fs.DurationVar(&s.option.memWaitTimeout, "trace-lifecycle-receive-mem-wait-timeout", 5*time.Minute, + "max time the migration receiver waits for memory to recover before introducing an external segment") // Retention configuration flags fs.Float64Var(&s.retentionConfig.HighWatermark, "trace-retention-high-watermark", 95.0, "disk usage high watermark percentage that triggers forced retention cleanup") @@ -200,7 +202,7 @@ func (s *standalone) PreRun(ctx context.Context) error { return err } s.pipeline.RegisterChunkedSyncHandler(data.TopicTracePartSync, setUpChunkedSyncCallback(s.l, &s.schemaRepo)) - s.pipeline.RegisterChunkedSyncHandler(data.TopicTraceSeriesSync, setUpSeriesSyncCallback(s.l, &s.schemaRepo)) + s.pipeline.RegisterChunkedSyncHandler(data.TopicTraceSeriesSync, setUpSeriesSyncCallback(s.l, &s.schemaRepo, s.pm, s.option.memWaitTimeout)) err = s.pipeline.Subscribe(data.TopicTraceSidxSeriesWrite, setUpSidxSeriesIndexCallback(s.l, &s.schemaRepo)) if err != nil { return err diff --git a/banyand/trace/trace.go b/banyand/trace/trace.go index 884972740..d27a324e4 100644 --- a/banyand/trace/trace.go +++ b/banyand/trace/trace.go @@ -58,6 +58,7 @@ type option struct { seriesCacheMaxSize run.Bytes flushTimeout time.Duration syncInterval time.Duration + memWaitTimeout time.Duration failedPartsMaxTotalSizeBytes uint64 vectorized vtrace.VectorizedConfig } diff --git a/banyand/trace/write_data.go b/banyand/trace/write_data.go index 7a57ba4ce..e28155235 100644 --- a/banyand/trace/write_data.go +++ b/banyand/trace/write_data.go @@ -18,6 +18,7 @@ package trace import ( + "context" "fmt" "path/filepath" "strings" @@ -28,6 +29,7 @@ import ( commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" "github.com/apache/skywalking-banyandb/banyand/internal/sidx" "github.com/apache/skywalking-banyandb/banyand/internal/storage" + "github.com/apache/skywalking-banyandb/banyand/protector" "github.com/apache/skywalking-banyandb/banyand/queue" "github.com/apache/skywalking-banyandb/pkg/fs" "github.com/apache/skywalking-banyandb/pkg/index" @@ -158,10 +160,13 @@ func (s *syncPartContext) Close() error { } type syncSeriesContext struct { - streamer index.ExternalSegmentStreamer - segment storage.Segment[*tsTable, *commonv1.ResourceOpts] - l *logger.Logger - fileName string + streamer index.ExternalSegmentStreamer + segment storage.Segment[*tsTable, *commonv1.ResourceOpts] + l *logger.Logger + pm protector.Memory + introduceCtx context.Context + fileName string + memWaitTimeout time.Duration } func (s *syncSeriesContext) NewPartType(_ *queue.ChunkedSyncPartContext) error { @@ -171,6 +176,9 @@ func (s *syncSeriesContext) NewPartType(_ *queue.ChunkedSyncPartContext) error { func (s *syncSeriesContext) FinishSync() error { if s.streamer != nil { + if err := protector.WaitWhileHigh(s.introduceCtx, s.pm, s.memWaitTimeout, s.l, "trace-series-introduce"); err != nil { + return err + } if err := s.streamer.CompleteSegment(); err != nil { s.l.Error().Err(err).Msg("failed to complete external segment") return err @@ -190,14 +198,18 @@ func (s *syncSeriesContext) Close() error { } type syncSeriesCallback struct { - l *logger.Logger - schemaRepo *schemaRepo + l *logger.Logger + schemaRepo *schemaRepo + pm protector.Memory + memWaitTimeout time.Duration } -func setUpSeriesSyncCallback(l *logger.Logger, s *schemaRepo) queue.ChunkedSyncHandler { +func setUpSeriesSyncCallback(l *logger.Logger, s *schemaRepo, pm protector.Memory, memWaitTimeout time.Duration) queue.ChunkedSyncHandler { return &syncSeriesCallback{ - l: l, - schemaRepo: s, + l: l, + schemaRepo: s, + pm: pm, + memWaitTimeout: memWaitTimeout, } } @@ -221,8 +233,11 @@ func (s *syncSeriesCallback) CreatePartHandler(ctx *queue.ChunkedSyncPartContext return nil, err } return &syncSeriesContext{ - l: s.l, - segment: segment, + l: s.l, + segment: segment, + pm: s.pm, + memWaitTimeout: s.memWaitTimeout, + introduceCtx: ctx.RetrieveContext(), }, nil } @@ -232,12 +247,18 @@ func (s *syncSeriesCallback) HandleFileChunk(ctx *queue.ChunkedSyncPartContext, return fmt.Errorf("part handler is nil") } seriesCtx := ctx.Handler.(*syncSeriesContext) + if seriesCtx.pm.State() == protector.StateHigh { + return queue.ErrServerBusy + } if seriesCtx.segment == nil { return fmt.Errorf("segment is nil") } if seriesCtx.fileName != ctx.FileName { if seriesCtx.streamer != nil { + if err := protector.WaitWhileHigh(ctx.RetrieveContext(), seriesCtx.pm, seriesCtx.memWaitTimeout, s.l, "trace-series-introduce"); err != nil { + return err + } if err := seriesCtx.streamer.CompleteSegment(); err != nil { s.l.Error().Err(err).Str("group", ctx.Group).Msg("failed to complete external segment") return err @@ -322,6 +343,10 @@ func (s *syncChunkCallback) HandleFileChunk(ctx *queue.ChunkedSyncPartContext, c if ctx.Handler == nil { return fmt.Errorf("part handler is nil") } + partCtx := ctx.Handler.(*syncPartContext) + if partCtx.tsTable.pm.State() == protector.StateHigh { + return queue.ErrServerBusy + } if ctx.PartType != PartTypeCore { return s.handleSidxFileChunk(ctx, chunk) } diff --git a/banyand/trace/write_data_busy_test.go b/banyand/trace/write_data_busy_test.go new file mode 100644 index 000000000..837db3a3b --- /dev/null +++ b/banyand/trace/write_data_busy_test.go @@ -0,0 +1,67 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package trace + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/apache/skywalking-banyandb/banyand/protector" + "github.com/apache/skywalking-banyandb/banyand/queue" + "github.com/apache/skywalking-banyandb/pkg/logger" +) + +// highMem is a fake protector.Memory that always reports a configurable state. +type highMem struct { + protector.Nop + state protector.State +} + +// State returns the fixed scripted state. +func (h highMem) State() protector.State { + return h.state +} + +// TestSyncSeriesCallback_HandleFileChunkBusy verifies that the series chunk +// handler sheds load by returning queue.ErrServerBusy when memory is high, +// before touching any segment or streamer state. +func TestSyncSeriesCallback_HandleFileChunkBusy(t *testing.T) { + cb := &syncSeriesCallback{l: logger.GetLogger("test")} + // seriesCtx has a nil segment; the BUSY check must fire first so the + // nil-segment path is never reached. + seriesCtx := &syncSeriesContext{ + l: logger.GetLogger("test"), + pm: highMem{state: protector.StateHigh}, + } + ctx := &queue.ChunkedSyncPartContext{Handler: seriesCtx, FileName: "series.idx"} + err := cb.HandleFileChunk(ctx, []byte("payload")) + require.Error(t, err) + assert.ErrorIs(t, err, queue.ErrServerBusy) +} + +// TestSyncSeriesCallback_HandleFileChunkNilHandler verifies the early nil +// handler guard still applies. +func TestSyncSeriesCallback_HandleFileChunkNilHandler(t *testing.T) { + cb := &syncSeriesCallback{l: logger.GetLogger("test")} + ctx := &queue.ChunkedSyncPartContext{} + err := cb.HandleFileChunk(ctx, []byte("payload")) + require.Error(t, err) + assert.NotErrorIs(t, err, queue.ErrServerBusy) +} diff --git a/docs/api-reference.md b/docs/api-reference.md index 4026f3260..e9a706dfa 100644 --- a/docs/api-reference.md +++ b/docs/api-reference.md @@ -2693,6 +2693,7 @@ SyncStatus represents the status of a sync operation. | SYNC_STATUS_SYNC_COMPLETE | 5 | Entire sync operation completed successfully. | | SYNC_STATUS_VERSION_UNSUPPORTED | 6 | Version not supported for sync operations. | | SYNC_STATUS_FORMAT_VERSION_MISMATCH | 7 | File format version incompatible. | +| SYNC_STATUS_SERVER_BUSY | 8 | Receiver under memory pressure; sender should back off and retry the whole part. | diff --git a/docs/operation/configuration.md b/docs/operation/configuration.md index 6b6a39890..599b34c07 100644 --- a/docs/operation/configuration.md +++ b/docs/operation/configuration.md @@ -121,6 +121,7 @@ The following flags are used to configure the measure storage engine: - `--measure-root-path string`: The root path of the measure database (default: "/tmp"). - `--measure-data-path string`: The data directory path of measure. If not set, `<measure-root-path>/measure/data` is used. - `--measure-max-fan-out-size bytes`: the upper bound of a single file size after merge of measure (default 8.00EiB) +- `--measure-lifecycle-receive-mem-wait-timeout duration`: Max time a lifecycle-migration receiver waits for memory to recover before introducing an external segment (default: 5m). The following flags are used to configure the stream storage engine: @@ -129,12 +130,14 @@ The following flags are used to configure the stream storage engine: - `--stream-data-path string`: The data directory path of stream. If not set, `<stream-root-path>/stream/data` is used. - `--stream-max-fan-out-size bytes`: the upper bound of a single file size after merge of stream (default 8.00EiB) - `--element-index-flush-timeout duration`: The element index timeout of stream (default: 1s). +- `--stream-lifecycle-receive-mem-wait-timeout duration`: Max time a lifecycle-migration receiver waits for memory to recover before introducing an external segment (default: 5m). The following flags are used to configure the trace storage engine: - `--trace-flush-timeout duration`: The memory data timeout of trace (default: 1s). - `--trace-root-path string`: The root path of the database (default: "/tmp"). - `--trace-max-fan-out-size bytes`: the upper bound of a single file size after merge of trace (default 8.00EiB) +- `--trace-lifecycle-receive-mem-wait-timeout duration`: Max time a lifecycle-migration receiver waits for memory to recover before introducing an external segment (default: 5m). The following flags configure the remaining per-catalog storage roots: diff --git a/docs/operation/lifecycle.md b/docs/operation/lifecycle.md index a04153a27..7a1d66566 100644 --- a/docs/operation/lifecycle.md +++ b/docs/operation/lifecycle.md @@ -135,6 +135,7 @@ lifecycle \ | `--schedule` | Schedule for periodic backup (e.g., @yearly, @monthly, @weekly, @daily, etc.) | `""` | | `--migration-orphan-policy` | What to do with rows whose measure/stream schema was deleted from the registry: `archive` or `discard` | `archive` | | `--migration-orphan-archive-subdir` | Relative subdirectory, under each catalog's root path, for archived orphan rows when policy is `archive` | `archive` | +| `--lifecycle-send-retry-timeout` | Maximum total time to retry streaming a migration part to a target node on transient failures (target node restarting, disconnect, receiver busy) | `15m` | ## Handling Orphan (Deleted-Schema) Data diff --git a/go.mod b/go.mod index e4fe08b6a..70af9b785 100644 --- a/go.mod +++ b/go.mod @@ -208,7 +208,7 @@ require ( replace ( github.com/benbjohnson/clock v1.3.0 => github.com/SkyAPM/clock v1.3.1-0.20220809233656-dc7607c94a97 - github.com/blugelabs/bluge => github.com/SkyAPM/bluge v0.0.0-20250804100126-cccf29a55f01 + github.com/blugelabs/bluge => github.com/SkyAPM/bluge v0.0.0-20260625022800-42385daf66b8 github.com/blugelabs/bluge_segment_api => github.com/zinclabs/bluge_segment_api v1.0.0 github.com/blugelabs/ice => github.com/SkyAPM/ice v0.0.0-20250619023539-b5173603b0b3 ) diff --git a/go.sum b/go.sum index 4007af83a..f063e6d05 100644 --- a/go.sum +++ b/go.sum @@ -55,8 +55,8 @@ github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5/go.mod h1:lmUJ/7eu/Q8 github.com/RoaringBitmap/roaring v0.9.4/go.mod h1:icnadbWcNyfEHlYdr+tDlOTih1Bf/h+rzPpv4sbomAA= github.com/RoaringBitmap/roaring v1.9.4 h1:yhEIoH4YezLYT04s1nHehNO64EKFTop/wBhxv2QzDdQ= github.com/RoaringBitmap/roaring v1.9.4/go.mod h1:6AXUsoIEzDTFFQCe1RbGA6uFONMhvejWj5rqITANK90= -github.com/SkyAPM/bluge v0.0.0-20250804100126-cccf29a55f01 h1:f56Ybipl9Yh9wFCZc3EoB5OmbW2UJfIpe6e/gsE/DuA= -github.com/SkyAPM/bluge v0.0.0-20250804100126-cccf29a55f01/go.mod h1:VfQRtJQEwpooobYuSBadb8QYjTUICH+CX42IJYrfSiM= +github.com/SkyAPM/bluge v0.0.0-20260625022800-42385daf66b8 h1:kNqj2T61hvMFOblOmkADOf4UWpJPh55jQ4WX2U31zaw= +github.com/SkyAPM/bluge v0.0.0-20260625022800-42385daf66b8/go.mod h1:VfQRtJQEwpooobYuSBadb8QYjTUICH+CX42IJYrfSiM= github.com/SkyAPM/ice v0.0.0-20250619023539-b5173603b0b3 h1:bZvLPihpC1q1AJkmR6ere/aJOr3ev+JCytvjKuL+gE8= github.com/SkyAPM/ice v0.0.0-20250619023539-b5173603b0b3/go.mod h1:DoQeb0Ee86LyruZSL77Ddscfk/THJ38x453CRCnGEPI= github.com/SkyAPM/ktm-ebpf v0.0.0-20260228024820-81a19d950bff h1:pjcdkEAe+NL2bA0J/mu/Vs+eG+w+KN/BX9yGkOC1f60=
