This is an automated email from the ASF dual-hosted git repository.

mrproliu pushed a commit to branch lifecycle-oom
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 2474a0a7b90e5b7f576bea05806dd02915558050
Author: mrproliu <[email protected]>
AuthorDate: Fri Jun 26 12:58:14 2026 +0800

    feat(lifecycle): throttle migration under receiver memory pressure; fix bluge dedup OOM
---
 CHANGES.md                                         |   2 +
 api/proto/banyandb/cluster/v1/rpc.proto            |   1 +
 .../backup/lifecycle/measure_migration_visitor.go  |  63 ++++++----
 banyand/backup/lifecycle/retry.go                  | 133 +++++++++++++++++++++
 banyand/backup/lifecycle/retry_test.go             | 118 ++++++++++++++++++
 banyand/backup/lifecycle/row_replay_measure.go     |   4 +-
 banyand/backup/lifecycle/row_replay_pipeline.go    |   4 +-
 banyand/backup/lifecycle/row_replay_stream.go      |   2 +-
 banyand/backup/lifecycle/row_replay_trace.go       |   2 +-
 banyand/backup/lifecycle/service.go                |   2 +
 .../backup/lifecycle/stream_migration_visitor.go   | 101 +++++++++-------
 .../backup/lifecycle/trace_migration_visitor.go    | 126 +++++++++++--------
 banyand/measure/measure.go                         |   1 +
 banyand/measure/svc_data.go                        |   4 +-
 banyand/measure/svc_liaison.go                     |   3 +
 banyand/measure/svc_standalone.go                  |   2 +
 banyand/measure/write_data.go                      |  46 +++++--
 banyand/measure/write_data_busy_test.go            |  67 +++++++++++
 banyand/protector/wait.go                          |  63 ++++++++++
 banyand/protector/wait_test.go                     |  72 +++++++++++
 banyand/queue/pub/chunked_sync.go                  |   6 +
 banyand/queue/queue.go                             |  18 ++-
 banyand/queue/queue_test.go                        |  35 ++++++
 banyand/queue/sub/chunked_sync.go                  |   5 +
 banyand/queue/sub/chunked_sync_test.go             |  62 ++++++++++
 banyand/stream/stream.go                           |   1 +
 banyand/stream/svc_liaison.go                      |   3 +
 banyand/stream/svc_standalone.go                   |   6 +-
 banyand/stream/write_data.go                       |  91 ++++++++++----
 banyand/stream/write_data_busy_test.go             |  94 +++++++++++++++
 banyand/trace/svc_liaison.go                       |   2 +
 banyand/trace/svc_standalone.go                    |   4 +-
 banyand/trace/trace.go                             |   1 +
 banyand/trace/write_data.go                        |  47 ++++++--
 banyand/trace/write_data_busy_test.go              |  67 +++++++++++
 docs/api-reference.md                              |   1 +
 docs/operation/configuration.md                    |   3 +
 docs/operation/lifecycle.md                        |   1 +
 go.mod                                             |   2 +-
 go.sum                                             |   4 +-
 40 files changed, 1092 insertions(+), 177 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index a254f3fc2..34e7672b4 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -68,6 +68,8 @@ Release Notes.
 - Enhance segment lifecycle: `refCount` now counts only active users, 
decoupled from "open" (`index != nil`), adding a "dormant" state (open, 
`refCount == 0`). A `DecRef` to zero no longer closes a segment; idle reclaim 
and retention delete act only at `refCount == 0`, so an in-flight 
snapshot/inspect is no longer torn down mid-operation (fixing the cold-node 
nil-index panic and bluge lock churn) while idle segments still release their 
bluge writers.
 - Speed up GCS backup uploads: write each object and its checksum metadata in 
one request, dropping the per-object `Update` round-trip.
 - Lifecycle migration now archives rows whose measure/stream schema was 
deleted from the registry, instead of aborting the group.
+- Throttle lifecycle migration under receiver memory pressure so it no longer 
OOM-kills the destination data node, via three always-on mechanisms: per-chunk 
`SYNC_STATUS_SERVER_BUSY` load-shed (the sender backs off and retries); a 
bounded memory wait before introducing an external series-index segment 
(`{measure,stream,trace}-lifecycle-receive-mem-wait-timeout`, default 5m); and 
sender-side bounded-backoff retry of transient send failures — target node 
restarting/`no nodes available`, di [...]
+- Cut the lifecycle external series-index deduplicator's memory from ~1.8 GB 
to near-zero per segment by bumping bluge to a memory-efficient implementation: 
duplicates are now resolved via `_id` term-dictionary point lookups 
(`Dictionary.Contains`) instead of decompressing and retaining every segment's 
stored fields, removing the dominant heap source during external-segment 
introduction. No public API change and identical dedup semantics (verified by a 
differential test); this is the roo [...]
 
 ### Bug Fixes
 
diff --git a/api/proto/banyandb/cluster/v1/rpc.proto b/api/proto/banyandb/cluster/v1/rpc.proto
index aec1b9963..a763acb8d 100644
--- a/api/proto/banyandb/cluster/v1/rpc.proto
+++ b/api/proto/banyandb/cluster/v1/rpc.proto
@@ -182,6 +182,7 @@ enum SyncStatus {
   SYNC_STATUS_SYNC_COMPLETE = 5; // Entire sync operation completed successfully.
   SYNC_STATUS_VERSION_UNSUPPORTED = 6; // Version not supported for sync operations.
   SYNC_STATUS_FORMAT_VERSION_MISMATCH = 7; // File format version incompatible.
+  SYNC_STATUS_SERVER_BUSY = 8; // Receiver under memory pressure; sender should back off and retry the whole part.
 }
 
 service Service {
diff --git a/banyand/backup/lifecycle/measure_migration_visitor.go b/banyand/backup/lifecycle/measure_migration_visitor.go
index 2059922b9..1c6a2f20d 100644
--- a/banyand/backup/lifecycle/measure_migration_visitor.go
+++ b/banyand/backup/lifecycle/measure_migration_visitor.go
@@ -284,10 +284,12 @@ func (mv *measureMigrationVisitor) VisitSeries(segmentTR 
*timestamp.TimeRange, s
                segmentIDStr := getSegmentTimeRange(targetSegmentTime, 
mv.targetStageInterval).String()
                for _, shardID := range shardIDs {
                        targetShardID := 
mv.calculateTargetShardID(uint32(shardID))
-                       partData := 
mv.createStreamingSegmentFromFiles(targetShardID, files, segmentTR, 
data.TopicMeasureSeriesSync.String())
 
-                       // Stream segment to target shard replicas
-                       if err := mv.streamPartToTargetShard(partData); err != 
nil {
+                       // Stream segment to target shard replicas. The factory 
rebuilds the
+                       // part on every retry so each attempt gets fresh 
offset-0 readers.
+                       if err := mv.streamPartToTargetShard(targetShardID, 
func() queue.StreamingPartData {
+                               return 
mv.createStreamingSegmentFromFiles(targetShardID, files, segmentTR, 
data.TopicMeasureSeriesSync.String())
+                       }); err != nil {
                                errorMsg := fmt.Sprintf("failed to stream 
measure segment to target shard %d: %v", targetShardID, err)
                                mv.recordError(scopeSeries, segmentTR, shardID, 
nil, errorMsg)
                                return fmt.Errorf("failed to stream measure 
segment to target shard %d: %w", targetShardID, err)
@@ -370,19 +372,31 @@ func (mv *measureMigrationVisitor) VisitPart(segmentTR 
*timestamp.TimeRange, sou
                        continue
                }
 
-               // Create file readers for this part
-               files, release := 
measure.CreatePartFileReaderFromPath(partPath, mv.lfs)
-               defer release()
-
-               // Clone part data for this target segment
-               targetPartData := partData
-               targetPartData.Group = mv.group
-               targetPartData.ShardID = targetShardID
-               targetPartData.Topic = data.TopicMeasurePartSync.String()
-               targetPartData.Files = files
+               // Reopen the part each attempt for fresh offset-0 readers, 
releasing the
+               // prior attempt's handles first; the deferred call frees the 
last set.
+               var prevRelease func()
+               defer func() {
+                       if prevRelease != nil {
+                               prevRelease()
+                       }
+               }()
+               mk := func() queue.StreamingPartData {
+                       if prevRelease != nil {
+                               prevRelease()
+                               prevRelease = nil
+                       }
+                       files, release := 
measure.CreatePartFileReaderFromPath(partPath, mv.lfs)
+                       prevRelease = release
+                       targetPartData := partData
+                       targetPartData.Group = mv.group
+                       targetPartData.ShardID = targetShardID
+                       targetPartData.Topic = 
data.TopicMeasurePartSync.String()
+                       targetPartData.Files = files
+                       return targetPartData
+               }
 
                // Stream part to target segment
-               if err := mv.streamPartToTargetShard(targetPartData); err != 
nil {
+               if err := mv.streamPartToTargetShard(targetShardID, mk); err != 
nil {
                        errorMsg := fmt.Sprintf("failed to stream measure part 
to target segment %s: %v", targetSegmentTime.Format(time.RFC3339), err)
                        mv.recordError(scopePart, segmentTR, sourceShardID, 
&partID, errorMsg)
                        return fmt.Errorf("failed to stream measure part to 
target segment: %w", err)
@@ -491,22 +505,21 @@ func (mv *measureMigrationVisitor) 
calculateTargetShardID(sourceShardID uint32)
        return calculateTargetShardID(sourceShardID, mv.targetShardNum)
 }
 
-// streamPartToTargetShard sends part data to all replicas of the target shard.
-func (mv *measureMigrationVisitor) streamPartToTargetShard(partData 
queue.StreamingPartData) error {
-       targetShardID := partData.ShardID
+// streamPartToTargetShard sends the part to every replica with bounded
+// exponential-backoff retry (transient: target restarting, disconnect,
+// receiver SERVER_BUSY). streamPartToNode closes the part's readers after each
+// send, so mk() is called per attempt to rebuild fresh offset-0 readers.
+func (mv *measureMigrationVisitor) streamPartToTargetShard(targetShardID 
uint32, mk func() queue.StreamingPartData) error {
        copies := mv.replicas + 1
 
        // Send to all replicas using the exact pattern from steps.go:219-236
        for replicaID := uint32(0); replicaID < copies; replicaID++ {
-               // Use selector.Pick exactly like steps.go:220
-               nodeID, err := mv.selector.Pick(mv.group, "", targetShardID, 
replicaID)
+               err := pickAndRun(mv.logger, mv.selector, mv.group, "", 
targetShardID, replicaID, func(nodeID string) error {
+                       partData := mk()
+                       return mv.streamPartToNode(nodeID, partData.ShardID, 
partData)
+               })
                if err != nil {
-                       return fmt.Errorf("failed to pick node for shard %d 
replica %d: %w", targetShardID, replicaID, err)
-               }
-
-               // Stream part data to target node using chunked sync
-               if err := mv.streamPartToNode(nodeID, targetShardID, partData); 
err != nil {
-                       return fmt.Errorf("failed to stream measure part to 
node %s: %w", nodeID, err)
+                       return fmt.Errorf("failed to stream measure part to 
replica %d: %w", replicaID, err)
                }
        }
 
diff --git a/banyand/backup/lifecycle/retry.go b/banyand/backup/lifecycle/retry.go
new file mode 100644
index 000000000..7346962d7
--- /dev/null
+++ b/banyand/backup/lifecycle/retry.go
@@ -0,0 +1,133 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package lifecycle
+
+import (
+       "errors"
+       "fmt"
+       "strings"
+       "time"
+
+       "github.com/cenkalti/backoff/v4"
+
+       "github.com/apache/skywalking-banyandb/banyand/queue"
+       "github.com/apache/skywalking-banyandb/pkg/grpchelper"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/node"
+)
+
+// defaultLifecycleSendRetryTimeout bounds the total wall-clock spent retrying
+// transient failures when streaming a part to a target node (e.g. while the
+// target node restarts). Configurable via --lifecycle-send-retry-timeout.
+const defaultLifecycleSendRetryTimeout = 15 * time.Minute
+
+// lifecycleSendRetryTimeout is the active bound, set by the lifecycle flag.
+var lifecycleSendRetryTimeout = defaultLifecycleSendRetryTimeout
+
+// isNoNodes reports whether err means the target node could not be picked
+// because no node is currently available (e.g. the target is restarting).
+// It matches both the typed sentinel and the plain round-robin error string.
+func isNoNodes(err error) bool {
+       if errors.Is(err, node.ErrNoAvailableNode) {
+               return true
+       }
+       return err != nil && strings.Contains(err.Error(), "no nodes available")
+}
+
+// isTransientSend reports whether err from streaming a part to a node is
+// transient and worth retrying: receiver back-pressure (SERVER_BUSY) or a
+// transient/failover gRPC error such as Unavailable, ResourceExhausted, or a
+// dropped connection.
+func isTransientSend(err error) bool {
+       return errors.Is(err, queue.ErrServerBusy) || 
grpchelper.IsTransientError(err) || grpchelper.IsFailoverError(err)
+}
+
+// retryLogInterval throttles the "still retrying" heartbeat so a long wait for
+// a restarting node logs at most once per minute instead of on every attempt.
+const retryLogInterval = time.Minute
+
+// nodePicker is the one method of node.Selector that the send-retry path needs;
+// narrowing it keeps runWithRetry/pickWithRetry decoupled and easy to test.
+type nodePicker interface {
+       Pick(group, name string, shardID, replicaID uint32) (string, error)
+}
+
+// runWithRetry runs op under bounded exponential backoff capped by
+// lifecycleSendRetryTimeout. op returns a transient error to retry or
+// backoff.Permanent(err) to stop immediately. While retrying it logs a
+// heartbeat at most once per minute so a long wait for a restarting node is not
+// mistaken for a hang.
+func runWithRetry(l *logger.Logger, stage string, op func() error) error {
+       bo := backoff.NewExponentialBackOff()
+       bo.MaxElapsedTime = lifecycleSendRetryTimeout
+       start := time.Now()
+       var lastLog time.Time
+       return backoff.RetryNotify(op, bo, func(err error, _ time.Duration) {
+               now := time.Now()
+               if !lastLog.IsZero() && now.Sub(lastLog) < retryLogInterval {
+                       return
+               }
+               lastLog = now
+               l.Warn().Err(err).Str("stage", stage).Dur("elapsed", 
now.Sub(start)).
+                       Msg("transient lifecycle send failure, still retrying")
+       })
+}
+
+// pickWithRetry picks a target node, retrying on "no nodes available" (target
+// restarting) under runWithRetry's bounded backoff and heartbeat logging.
+func pickWithRetry(l *logger.Logger, p nodePicker, group, name string, shardID 
uint32) (string, error) {
+       var nodeID string
+       err := runWithRetry(l, fmt.Sprintf("pick group=%s name=%s shard=%d", 
group, name, shardID), func() error {
+               var pErr error
+               nodeID, pErr = p.Pick(group, name, shardID, 0)
+               switch {
+               case pErr == nil:
+                       return nil
+               case isNoNodes(pErr):
+                       return pErr
+               default:
+                       return backoff.Permanent(pErr)
+               }
+       })
+       return nodeID, err
+}
+
+// pickAndRun picks a target node and runs run against it inside one bounded
+// backoff loop: a "no nodes" pick error or a transient run error retries the
+// whole pick+run (so a failed send re-picks, possibly a healthier node); any
+// other error stops immediately. Heartbeat-logged like runWithRetry.
+func pickAndRun(l *logger.Logger, p nodePicker, group, name string, shardID, 
replicaID uint32,
+       run func(nodeID string) error,
+) error {
+       return runWithRetry(l, fmt.Sprintf("send group=%s name=%s shard=%d 
replica=%d", group, name, shardID, replicaID), func() error {
+               nodeID, pickErr := p.Pick(group, name, shardID, replicaID)
+               if pickErr != nil {
+                       if isNoNodes(pickErr) {
+                               return pickErr
+                       }
+                       return backoff.Permanent(pickErr)
+               }
+               if runErr := run(nodeID); runErr != nil {
+                       if isTransientSend(runErr) {
+                               return runErr
+                       }
+                       return backoff.Permanent(runErr)
+               }
+               return nil
+       })
+}
diff --git a/banyand/backup/lifecycle/retry_test.go b/banyand/backup/lifecycle/retry_test.go
new file mode 100644
index 000000000..684bba16a
--- /dev/null
+++ b/banyand/backup/lifecycle/retry_test.go
@@ -0,0 +1,118 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package lifecycle
+
+import (
+       "errors"
+       "fmt"
+       "testing"
+
+       "github.com/stretchr/testify/assert"
+
+       "github.com/apache/skywalking-banyandb/banyand/queue"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/node"
+)
+
+func TestIsNoNodes(t *testing.T) {
+       tests := []struct {
+               err  error
+               name string
+               want bool
+       }{
+               {name: "typed sentinel", err: node.ErrNoAvailableNode, want: 
true},
+               {name: "wrapped sentinel", err: fmt.Errorf("pick failed: %w", 
node.ErrNoAvailableNode), want: true},
+               {name: "plain round-robin string", err: errors.New("no nodes 
available"), want: true},
+               {name: "wrapped round-robin string", err: fmt.Errorf("replica 
0: %w", errors.New("no nodes available")), want: true},
+               {name: "unrelated error", err: errors.New("connection 
refused"), want: false},
+               {name: "nil error", err: nil, want: false},
+       }
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       assert.Equal(t, tt.want, isNoNodes(tt.err))
+               })
+       }
+}
+
+func TestIsTransientSend(t *testing.T) {
+       tests := []struct {
+               err  error
+               name string
+               want bool
+       }{
+               {name: "server busy sentinel", err: queue.ErrServerBusy, want: 
true},
+               {name: "wrapped server busy", err: fmt.Errorf("receiver busy on 
node x: %w", queue.ErrServerBusy), want: true},
+               {name: "plain non-transient", err: errors.New("permission 
denied"), want: false},
+               {name: "nil error", err: nil, want: false},
+       }
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       assert.Equal(t, tt.want, isTransientSend(tt.err))
+               })
+       }
+}
+
+// fakePicker implements nodePicker: it returns "no nodes available" the first
+// noNodes times, then failErr (if set), then node.
+type fakePicker struct {
+       failErr error
+       node    string
+       noNodes int
+       calls   int
+}
+
+func (f *fakePicker) Pick(_, _ string, _, _ uint32) (string, error) {
+       f.calls++
+       if f.noNodes > 0 {
+               f.noNodes--
+               return "", errors.New("no nodes available")
+       }
+       if f.failErr != nil {
+               return "", f.failErr
+       }
+       return f.node, nil
+}
+
+func TestPickWithRetry(t *testing.T) {
+       l := logger.GetLogger("test")
+
+       t.Run("success first try", func(t *testing.T) {
+               p := &fakePicker{node: "n1"}
+               got, err := pickWithRetry(l, p, "g", "n", 0)
+               assert.NoError(t, err)
+               assert.Equal(t, "n1", got)
+               assert.Equal(t, 1, p.calls)
+       })
+
+       t.Run("permanent error fails fast without retry", func(t *testing.T) {
+               boom := errors.New("unknown shard")
+               p := &fakePicker{failErr: boom}
+               got, err := pickWithRetry(l, p, "g", "n", 0)
+               assert.ErrorIs(t, err, boom)
+               assert.Empty(t, got)
+               assert.Equal(t, 1, p.calls)
+       })
+
+       t.Run("no nodes then recover retries", func(t *testing.T) {
+               p := &fakePicker{node: "n2", noNodes: 1}
+               got, err := pickWithRetry(l, p, "g", "n", 0)
+               assert.NoError(t, err)
+               assert.Equal(t, "n2", got)
+               assert.Equal(t, 2, p.calls)
+       })
+}
diff --git a/banyand/backup/lifecycle/row_replay_measure.go b/banyand/backup/lifecycle/row_replay_measure.go
index 6f34a8002..b0c72f46e 100644
--- a/banyand/backup/lifecycle/row_replay_measure.go
+++ b/banyand/backup/lifecycle/row_replay_measure.go
@@ -347,7 +347,7 @@ func (r *measureRowReplayer) routeColumnar(
                        if rErr != nil {
                                return 0, nil, "", fmt.Errorf("route %s: %w", 
bc.subject, rErr)
                        }
-                       pickedNode, pErr := r.selector.Pick(r.group, 
bc.subject, uint32(sid), 0)
+                       pickedNode, pErr := pickWithRetry(r.logger, r.selector, 
r.group, bc.subject, uint32(sid))
                        if pErr != nil {
                                return 0, nil, "", fmt.Errorf("pick target node 
for %s: %w", bc.subject, pErr)
                        }
@@ -363,7 +363,7 @@ func (r *measureRowReplayer) routeColumnar(
        if rErr != nil {
                return 0, nil, "", fmt.Errorf("route %s: %w", bc.subject, rErr)
        }
-       nodeID, pErr := r.selector.Pick(r.group, bc.subject, uint32(sid), 0)
+       nodeID, pErr := pickWithRetry(r.logger, r.selector, r.group, 
bc.subject, uint32(sid))
        if pErr != nil {
                return 0, nil, "", fmt.Errorf("pick target node for %s: %w", 
bc.subject, pErr)
        }
diff --git a/banyand/backup/lifecycle/row_replay_pipeline.go b/banyand/backup/lifecycle/row_replay_pipeline.go
index 7b3fc3d94..591edc93a 100644
--- a/banyand/backup/lifecycle/row_replay_pipeline.go
+++ b/banyand/backup/lifecycle/row_replay_pipeline.go
@@ -302,10 +302,10 @@ func (s *batchSender) enqueueMarshaled(ctx 
context.Context, nodeID, group string
 // routeAndEnqueue picks the target node for iwr and enqueues it. The
 // proto-message replayers (stream/trace) share this Pick+build+enqueue tail; 
the
 // measure columnar path routes inline via routeColumnar + enqueueMarshaled.
-func (s *batchSender) routeAndEnqueue(ctx context.Context, selector 
node.Selector,
+func (s *batchSender) routeAndEnqueue(ctx context.Context, l *logger.Logger, 
selector node.Selector,
        group, name string, shardID uint32, iwr proto.Message,
 ) error {
-       nodeID, err := selector.Pick(group, name, shardID, 0)
+       nodeID, err := pickWithRetry(l, selector, group, name, shardID)
        if err != nil {
                return fmt.Errorf("pick target node for %s: %w", name, err)
        }
diff --git a/banyand/backup/lifecycle/row_replay_stream.go b/banyand/backup/lifecycle/row_replay_stream.go
index 471a18ba1..408e93c95 100644
--- a/banyand/backup/lifecycle/row_replay_stream.go
+++ b/banyand/backup/lifecycle/row_replay_stream.go
@@ -234,7 +234,7 @@ func (r *streamRowReplayer) publishRow(ctx context.Context, 
pw *orphanPartWriter
        if err != nil {
                return err
        }
-       return r.sender.routeAndEnqueue(ctx, r.selector, r.group, 
wr.Metadata.Name, iwr.ShardId, iwr)
+       return r.sender.routeAndEnqueue(ctx, r.logger, r.selector, r.group, 
wr.Metadata.Name, iwr.ShardId, iwr)
 }
 
 // archiveOrphanRow writes one orphan stream row to the part archive (a no-op
diff --git a/banyand/backup/lifecycle/row_replay_trace.go b/banyand/backup/lifecycle/row_replay_trace.go
index fd2d4dd52..919ffcc20 100644
--- a/banyand/backup/lifecycle/row_replay_trace.go
+++ b/banyand/backup/lifecycle/row_replay_trace.go
@@ -136,5 +136,5 @@ func (r *traceRowReplayer) buildWriteRequest(row 
dumptrace.Row) (*tracev1.WriteR
 
 func (r *traceRowReplayer) publishRow(ctx context.Context, row dumptrace.Row) 
error {
        _, iwr := r.buildWriteRequest(row)
-       return r.sender.routeAndEnqueue(ctx, r.selector, r.group, r.traceName, 
iwr.ShardId, iwr)
+       return r.sender.routeAndEnqueue(ctx, r.logger, r.selector, r.group, 
r.traceName, iwr.ShardId, iwr)
 }
diff --git a/banyand/backup/lifecycle/service.go b/banyand/backup/lifecycle/service.go
index 693723464..d142148a8 100644
--- a/banyand/backup/lifecycle/service.go
+++ b/banyand/backup/lifecycle/service.go
@@ -212,6 +212,8 @@ func (l *lifecycleService) FlagSet() *run.FlagSet {
                "Maximum rows per row-replay batch (row-replay is the fallback 
for parts spanning multiple target segments)")
        flagS.VarP(&rowReplayMaxBatchBytes, "row-replay-max-batch-bytes", "",
                "Maximum in-flight marshaled bytes per row-replay batch; caps 
peak marshal memory for large bodies (default: 32MB)")
+       flagS.DurationVar(&lifecycleSendRetryTimeout, 
"lifecycle-send-retry-timeout", defaultLifecycleSendRetryTimeout,
+               "Max total time to retry streaming a migration part to a target 
node on transient failures (target restarting, disconnect, receiver busy)")
 
        // Lifecycle server flags
        flagS.BoolVar(&l.lifecycleTLS, "lifecycle-tls", false, "connection uses 
TLS if true, else plain TCP")
diff --git a/banyand/backup/lifecycle/stream_migration_visitor.go b/banyand/backup/lifecycle/stream_migration_visitor.go
index 655583de6..87483893e 100644
--- a/banyand/backup/lifecycle/stream_migration_visitor.go
+++ b/banyand/backup/lifecycle/stream_migration_visitor.go
@@ -260,17 +260,11 @@ func (mv *streamMigrationVisitor) VisitSeries(segmentTR 
*timestamp.TimeRange, se
                segmentIDStr := getSegmentTimeRange(targetSegmentTime, 
mv.targetStageInterval).String()
                for _, shardID := range shardIDs {
                        targetShardID := 
mv.calculateTargetShardID(uint32(shardID))
-                       ff := make([]queue.FileInfo, 0, len(files))
-                       for _, file := range files {
-                               ff = append(ff, queue.FileInfo{
-                                       Name:   file.name,
-                                       Reader: file.file.SequentialRead(),
-                               })
-                       }
-                       partData := 
mv.createStreamingSegmentFromFiles(targetShardID, ff, segmentTR, 
data.TopicStreamSeriesSync.String())
-
-                       // Stream segment to target shard replicas
-                       if err := mv.streamPartToTargetShard(partData); err != 
nil {
+                       // Stream segment to target shard replicas. The factory 
rebuilds the
+                       // part on every retry so each attempt gets fresh 
offset-0 readers.
+                       if err := mv.streamPartToTargetShard(targetShardID, 
func() queue.StreamingPartData {
+                               return 
mv.createStreamingSegmentFromFiles(targetShardID, files, segmentTR, 
data.TopicStreamSeriesSync.String())
+                       }); err != nil {
                                errorMsg := fmt.Sprintf("failed to stream 
segment to target shard %d: %v", targetShardID, err)
                                mv.recordError(scopeSeries, segmentTR, shardID, 
nil, errorMsg)
                                return fmt.Errorf("failed to stream segment to 
target shard %d: %w", targetShardID, err)
@@ -347,19 +341,31 @@ func (mv *streamMigrationVisitor) VisitPart(segmentTR 
*timestamp.TimeRange, sour
                        continue
                }
 
-               // Create file readers for this part
-               files, release := stream.CreatePartFileReaderFromPath(partPath, 
mv.lfs)
-               defer release()
-
-               // Clone part data for this target segment
-               targetPartData := partData
-               targetPartData.Group = mv.group
-               targetPartData.ShardID = targetShardID
-               targetPartData.Topic = data.TopicStreamPartSync.String()
-               targetPartData.Files = files
+               // Reopen the part each attempt for fresh offset-0 readers, 
releasing the
+               // prior attempt's handles first; the deferred call frees the 
last set.
+               var prevRelease func()
+               defer func() {
+                       if prevRelease != nil {
+                               prevRelease()
+                       }
+               }()
+               mk := func() queue.StreamingPartData {
+                       if prevRelease != nil {
+                               prevRelease()
+                               prevRelease = nil
+                       }
+                       files, release := 
stream.CreatePartFileReaderFromPath(partPath, mv.lfs)
+                       prevRelease = release
+                       targetPartData := partData
+                       targetPartData.Group = mv.group
+                       targetPartData.ShardID = targetShardID
+                       targetPartData.Topic = data.TopicStreamPartSync.String()
+                       targetPartData.Files = files
+                       return targetPartData
+               }
 
                // Stream part to target segment
-               if err := mv.streamPartToTargetShard(targetPartData); err != 
nil {
+               if err := mv.streamPartToTargetShard(targetShardID, mk); err != 
nil {
                        errorMsg := fmt.Sprintf("failed to stream part to 
target segment %s: %v", targetSegmentTime.Format(time.RFC3339), err)
                        mv.recordError(scopePart, segmentTR, sourceShardID, 
&partID, errorMsg)
                        return fmt.Errorf("failed to stream part to target 
segment: %w", err)
@@ -520,8 +526,9 @@ func (mv *streamMigrationVisitor) 
VisitElementIndex(segmentTR *timestamp.TimeRan
                Uint32("target_shard", targetShardID).
                Msg("found element index segment files for migration")
 
-       // Create FileInfo for this segment file
-       files := make([]queue.FileInfo, 0, len(segmentFiles))
+       // Keep the open segment file handles so the send factory can rebuild
+       // fresh offset-0 readers on every retry via SequentialRead.
+       openFiles := make([]fileInfo, 0, len(segmentFiles))
        // Process each segment file
        for _, segmentFileName := range segmentFiles {
                // Extract segment ID from filename (remove .seg extension)
@@ -558,9 +565,9 @@ func (mv *streamMigrationVisitor) 
VisitElementIndex(segmentTR *timestamp.TimeRan
                // Close the file reader
                defer segmentFile.Close()
 
-               files = append(files, queue.FileInfo{
-                       Name:   segmentFileName,
-                       Reader: segmentFile.SequentialRead(),
+               openFiles = append(openFiles, fileInfo{
+                       file: segmentFile,
+                       name: segmentFileName,
                })
 
                mv.logger.Info().
@@ -571,10 +578,12 @@ func (mv *streamMigrationVisitor) 
VisitElementIndex(segmentTR *timestamp.TimeRan
                        Int("total_segments", 
mv.progress.GetStreamElementIndexCount(mv.group)).
                        Msg("element index segment migration completed 
successfully")
        }
-       partData := mv.createStreamingSegmentFromFiles(targetShardID, files, 
segmentTR, data.TopicStreamElementIndexSync.String())
 
-       // Stream segment file to target shard replicas
-       if err := mv.streamPartToTargetShard(partData); err != nil {
+       // Stream segment file to target shard replicas. The factory rebuilds 
the
+       // part on every retry so each attempt gets fresh offset-0 readers.
+       if err := mv.streamPartToTargetShard(targetShardID, func() 
queue.StreamingPartData {
+               return mv.createStreamingSegmentFromFiles(targetShardID, 
openFiles, segmentTR, data.TopicStreamElementIndexSync.String())
+       }); err != nil {
                errorMsg := fmt.Sprintf("failed to stream element index to 
target shard: %v", err)
                mv.recordError(scopeElementIndex, segmentTR, sourceShardID, 
nil, errorMsg)
                return fmt.Errorf("failed to stream element index to target 
shard: %w", err)
@@ -592,22 +601,21 @@ func (mv *streamMigrationVisitor) 
calculateTargetShardID(sourceShardID uint32) u
        return calculateTargetShardID(sourceShardID, mv.targetShardNum)
 }
 
-// streamPartToTargetShard sends part data to all replicas of the target shard.
-func (mv *streamMigrationVisitor) streamPartToTargetShard(partData 
queue.StreamingPartData) error {
-       targetShardID := partData.ShardID
+// streamPartToTargetShard sends the part to every replica with bounded
+// exponential-backoff retry (transient: target restarting, disconnect,
+// receiver SERVER_BUSY). streamPartToNode closes the part's readers after each
+// send, so mk() is called per attempt to rebuild fresh offset-0 readers.
+func (mv *streamMigrationVisitor) streamPartToTargetShard(targetShardID 
uint32, mk func() queue.StreamingPartData) error {
        copies := mv.replicas + 1
 
        // Send to all replicas using the exact pattern from steps.go:219-236
        for replicaID := uint32(0); replicaID < copies; replicaID++ {
-               // Use selector.Pick exactly like steps.go:220
-               nodeID, err := mv.selector.Pick(mv.group, "", targetShardID, 
replicaID)
+               err := pickAndRun(mv.logger, mv.selector, mv.group, "", 
targetShardID, replicaID, func(nodeID string) error {
+                       partData := mk()
+                       return mv.streamPartToNode(nodeID, partData.ShardID, 
partData)
+               })
                if err != nil {
-                       return fmt.Errorf("failed to pick node for shard %d 
replica %d: %w", targetShardID, replicaID, err)
-               }
-
-               // Stream part data to target node using chunked sync
-               if err := mv.streamPartToNode(nodeID, targetShardID, partData); 
err != nil {
-                       return fmt.Errorf("failed to stream part to node %s: 
%w", nodeID, err)
+                       return fmt.Errorf("failed to stream part to replica %d: 
%w", replicaID, err)
                }
        }
 
@@ -686,15 +694,22 @@ func (mv *streamMigrationVisitor) Close() error {
 // createStreamingSegmentFromFiles creates StreamingPartData from segment 
files.
 func (mv *streamMigrationVisitor) createStreamingSegmentFromFiles(
        targetShardID uint32,
-       files []queue.FileInfo,
+       files []fileInfo,
        segmentTR *timestamp.TimeRange,
        topic string,
 ) queue.StreamingPartData {
+       filesInfo := make([]queue.FileInfo, 0, len(files))
+       for _, file := range files {
+               filesInfo = append(filesInfo, queue.FileInfo{
+                       Name:   file.name,
+                       Reader: file.file.SequentialRead(),
+               })
+       }
        segmentData := queue.StreamingPartData{
                Group:        mv.group,
                ShardID:      targetShardID, // Use calculated target shard
                Topic:        topic,         // Use the new topic
-               Files:        files,
+               Files:        filesInfo,
                MinTimestamp: segmentTR.Start.UnixNano(),
                MaxTimestamp: segmentTR.End.UnixNano(),
        }
diff --git a/banyand/backup/lifecycle/trace_migration_visitor.go 
b/banyand/backup/lifecycle/trace_migration_visitor.go
index 4c05dab01..4ad0bf62c 100644
--- a/banyand/backup/lifecycle/trace_migration_visitor.go
+++ b/banyand/backup/lifecycle/trace_migration_visitor.go
@@ -262,17 +262,11 @@ func (mv *traceMigrationVisitor) VisitSeries(segmentTR 
*timestamp.TimeRange, ser
                segmentIDStr := getSegmentTimeRange(targetSegmentTime, 
mv.targetStageInterval).String()
                for _, shardID := range shardIDs {
                        targetShardID := 
mv.calculateTargetShardID(uint32(shardID))
-                       ff := make([]queue.FileInfo, 0, len(files))
-                       for _, file := range files {
-                               ff = append(ff, queue.FileInfo{
-                                       Name:   file.name,
-                                       Reader: file.file.SequentialRead(),
-                               })
-                       }
-                       partData := 
mv.createStreamingSegmentFromFiles(targetShardID, ff, segmentTR, 
data.TopicTraceSeriesSync.String())
-
-                       // Stream segment to target shard replicas
-                       if err := mv.streamPartToTargetShard(targetShardID, 
[]queue.StreamingPartData{partData}); err != nil {
+                       // Stream segment to target shard replicas. The factory 
rebuilds the
+                       // part on every retry so each attempt gets fresh 
offset-0 readers.
+                       if err := mv.streamPartToTargetShard(targetShardID, 
func() ([]queue.StreamingPartData, error) {
+                               return 
[]queue.StreamingPartData{mv.createStreamingSegmentFromFiles(targetShardID, 
files, segmentTR, data.TopicTraceSeriesSync.String())}, nil
+                       }); err != nil {
                                errorMsg := fmt.Sprintf("failed to stream trace 
segment to target shard %d: %v", targetShardID, err)
                                mv.recordError(scopeSeries, segmentTR, shardID, 
errorMsg)
                                return fmt.Errorf("failed to stream trace 
segment to target shard %d: %w", targetShardID, err)
@@ -323,41 +317,55 @@ func (mv *traceMigrationVisitor) VisitShard(timestampTR 
*timestamp.TimeRange, so
        }
        atomic.AddUint64(&mv.partsCopiedSingleTarget, 1)
        mv.progress.AddTraceChunkSyncShard(mv.group)
-       allParts := make([]queue.StreamingPartData, 0)
 
-       sidxPartData, sidxReleases, err := 
mv.generateAllSidxPartData(timestampTR, sourceShardID, filepath.Join(shardPath, 
"sidx"))
-       if err != nil {
-               return fmt.Errorf("failed to generate sidx part data: %s: %w", 
shardPath, err)
-       }
-       defer func() {
-               for _, release := range sidxReleases {
-                       release()
-               }
-       }()
-       allParts = append(allParts, sidxPartData...)
+       targetShardID := mv.calculateTargetShardID(uint32(sourceShardID))
 
-       partDatas, partDataReleases, err := mv.generateAllPartData(timestampTR, 
sourceShardID, shardPath)
-       if err != nil {
-               return fmt.Errorf("failed to generate core par data: %s: %w", 
shardPath, err)
-       }
+       // Regenerate the sidx + core parts each attempt for fresh offset-0 
readers,
+       // releasing the prior attempt's handles first; the deferred call frees 
the last set.
+       var prevRelease func()
        defer func() {
-               for _, release := range partDataReleases {
-                       release()
+               if prevRelease != nil {
+                       prevRelease()
                }
        }()
-       allParts = append(allParts, partDatas...)
-
-       targetShardID := mv.calculateTargetShardID(uint32(sourceShardID))
+       mk := func() ([]queue.StreamingPartData, error) {
+               if prevRelease != nil {
+                       prevRelease()
+               }
+               var releases []func()
+               // Captures releases by reference, so it frees whatever this 
attempt has
+               // opened by call time — including the partial set on an early 
return.
+               prevRelease = func() {
+                       for _, release := range releases {
+                               release()
+                       }
+               }
+               allParts := make([]queue.StreamingPartData, 0)
+               sidxPartData, sidxReleases, genErr := 
mv.generateAllSidxPartData(timestampTR, sourceShardID, filepath.Join(shardPath, 
"sidx"))
+               if genErr != nil {
+                       return nil, fmt.Errorf("failed to generate sidx part 
data: %s: %w", shardPath, genErr)
+               }
+               releases = append(releases, sidxReleases...)
+               allParts = append(allParts, sidxPartData...)
 
-       sort.Slice(allParts, func(i, j int) bool {
-               if allParts[i].ID == allParts[j].ID {
-                       return allParts[i].PartType < allParts[j].PartType
+               partDatas, partDataReleases, genErr := 
mv.generateAllPartData(timestampTR, sourceShardID, shardPath)
+               if genErr != nil {
+                       return nil, fmt.Errorf("failed to generate core part 
data: %s: %w", shardPath, genErr)
                }
-               return allParts[i].ID < allParts[j].ID
-       })
+               releases = append(releases, partDataReleases...)
+               allParts = append(allParts, partDatas...)
+
+               sort.Slice(allParts, func(i, j int) bool {
+                       if allParts[i].ID == allParts[j].ID {
+                               return allParts[i].PartType < 
allParts[j].PartType
+                       }
+                       return allParts[i].ID < allParts[j].ID
+               })
+               return allParts, nil
+       }
 
        // Stream part to target segment
-       if err := mv.streamPartToTargetShard(targetShardID, allParts); err != 
nil {
+       if err := mv.streamPartToTargetShard(targetShardID, mk); err != nil {
                errorMsg := fmt.Errorf("failed to stream to target shard %d: 
%w", targetShardID, err)
                mv.recordError(scopeShard, timestampTR, sourceShardID, 
errorMsg.Error())
                return fmt.Errorf("failed to stream trace shard to target 
segment shard: %w", err)
@@ -511,6 +519,12 @@ func (mv *traceMigrationVisitor) generateAllSidxPartData(
                        // Create StreamingPartData with PartType = index name
                        partData, err := sidx.ParsePartMetadata(mv.lfs, 
partPath)
                        if err != nil {
+                               // Release this part's just-opened handles and 
all earlier parts'
+                               // so a mid-way metadata failure does not leak 
file descriptors.
+                               release()
+                               for _, r := range releases {
+                                       r()
+                               }
                                return nil, nil, fmt.Errorf("failed to parse 
sidx part metadata: %s: %w", partPath, err)
                        }
                        partData.Group = mv.group
@@ -564,6 +578,10 @@ func (mv *traceMigrationVisitor) generateAllPartData(
                partPath := filepath.Join(shardPath, name)
                parts, releases, err := mv.generatePartData(segmentTR, 
sourceShardID, partPath)
                if err != nil {
+                       // Release earlier parts' handles so a mid-loop failure 
does not leak.
+                       for _, r := range allReleases {
+                               r()
+                       }
                        return nil, nil, fmt.Errorf("failed to generate part 
data for path %s: %w", partPath, err)
                }
                allParts = append(allParts, parts...)
@@ -636,21 +654,24 @@ func (mv *traceMigrationVisitor) 
calculateTargetShardID(sourceShardID uint32) ui
        return calculateTargetShardID(sourceShardID, mv.targetShardNum)
 }
 
-// streamPartToTargetShard sends part data to all replicas of the target shard.
-func (mv *traceMigrationVisitor) streamPartToTargetShard(targetShardID uint32, 
partData []queue.StreamingPartData) error {
+// streamPartToTargetShard sends the parts to every replica with bounded
+// exponential-backoff retry (transient: target restarting, disconnect,
+// receiver SERVER_BUSY). streamPartToNode closes the parts' readers after each
+// send, so mk() is called per attempt to rebuild fresh offset-0 readers.
+func (mv *traceMigrationVisitor) streamPartToTargetShard(targetShardID uint32, 
mk func() ([]queue.StreamingPartData, error)) error {
        copies := mv.replicas + 1
 
        // Send to all replicas using the exact pattern from steps.go:219-236
        for replicaID := uint32(0); replicaID < copies; replicaID++ {
-               // Use selector.Pick exactly like steps.go:220
-               nodeID, err := mv.selector.Pick(mv.group, "", targetShardID, 
replicaID)
+               err := pickAndRun(mv.logger, mv.selector, mv.group, "", 
targetShardID, replicaID, func(nodeID string) error {
+                       partData, mkErr := mk()
+                       if mkErr != nil {
+                               return mkErr
+                       }
+                       return mv.streamPartToNode(nodeID, targetShardID, 
partData)
+               })
                if err != nil {
-                       return fmt.Errorf("failed to pick node for shard %d 
replica %d: %w", targetShardID, replicaID, err)
-               }
-
-               // Stream part data to target node using chunked sync
-               if err := mv.streamPartToNode(nodeID, targetShardID, partData); 
err != nil {
-                       return fmt.Errorf("failed to stream trace part to node 
%s: %w", nodeID, err)
+                       return fmt.Errorf("failed to stream trace part to shard 
%d replica %d: %w", targetShardID, replicaID, err)
                }
        }
 
@@ -726,15 +747,22 @@ func (mv *traceMigrationVisitor) Close() error {
 // createStreamingSegmentFromFiles creates StreamingPartData from segment 
files.
 func (mv *traceMigrationVisitor) createStreamingSegmentFromFiles(
        targetShardID uint32,
-       files []queue.FileInfo,
+       files []fileInfo,
        segmentTR *timestamp.TimeRange,
        topic string,
 ) queue.StreamingPartData {
+       filesInfo := make([]queue.FileInfo, 0, len(files))
+       for _, file := range files {
+               filesInfo = append(filesInfo, queue.FileInfo{
+                       Name:   file.name,
+                       Reader: file.file.SequentialRead(),
+               })
+       }
        segmentData := queue.StreamingPartData{
                Group:        mv.group,
                ShardID:      targetShardID, // Use calculated target shard
                Topic:        topic,         // Use the new topic
-               Files:        files,
+               Files:        filesInfo,
                MinTimestamp: segmentTR.Start.UnixNano(),
                MaxTimestamp: segmentTR.End.UnixNano(),
        }
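
Note on the factory-per-attempt pattern used by streamPartToTargetShard in the
stream and trace visitors above: a sequential reader that a failed send has
already drained cannot be replayed, so each retry has to build a new one. The
sketch below illustrates the idea in isolation. It is not the code in retry.go,
which is outside this excerpt; sendWithRetry, errBusy and demo are hypothetical
names, and the retry policy (doubling delay, fixed attempt cap) is an assumption.

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"strings"
	"time"
)

// errBusy stands in for a transient receiver error (hypothetical name).
var errBusy = errors.New("server busy")

// sendWithRetry calls mk before every attempt so each send starts from an
// unread reader. The delay doubles after each transient failure and the
// loop stops after maxAttempts.
func sendWithRetry(maxAttempts int, delay time.Duration, mk func() io.Reader, send func(io.Reader) error) error {
	var err error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		if err = send(mk()); err == nil {
			return nil
		}
		if !errors.Is(err, errBusy) {
			return err // not transient: give up immediately
		}
		if attempt < maxAttempts {
			time.Sleep(delay)
			delay *= 2
		}
	}
	return fmt.Errorf("gave up after %d attempts: %w", maxAttempts, err)
}

// demo sends "payload" to a receiver that drains the reader and then rejects
// the first two attempts. It returns what the successful attempt read.
func demo() (string, int, error) {
	calls := 0
	var got string
	err := sendWithRetry(5, time.Millisecond,
		func() io.Reader { return strings.NewReader("payload") },
		func(r io.Reader) error {
			calls++
			b, readErr := io.ReadAll(r)
			if readErr != nil {
				return readErr
			}
			if calls < 3 {
				return errBusy
			}
			got = string(b)
			return nil
		})
	return got, calls, err
}

func main() {
	got, calls, err := demo()
	fmt.Println(got, calls, err)
}
```

If mk were replaced by a single reader shared across attempts, the third
attempt in demo would read an empty payload, because the first attempt had
already consumed it.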
diff --git a/banyand/measure/measure.go b/banyand/measure/measure.go
index 76a8c1ae3..ae18e38ef 100644
--- a/banyand/measure/measure.go
+++ b/banyand/measure/measure.go
@@ -56,6 +56,7 @@ type option struct {
        seriesCacheMaxSize           run.Bytes
        flushTimeout                 time.Duration
        syncInterval                 time.Duration
+       memWaitTimeout               time.Duration
        failedPartsMaxTotalSizeBytes uint64
        vectorized                   vmeasure.VectorizedConfig
 }
diff --git a/banyand/measure/svc_data.go b/banyand/measure/svc_data.go
index 24aeeb4f1..f69c19f2b 100644
--- a/banyand/measure/svc_data.go
+++ b/banyand/measure/svc_data.go
@@ -170,6 +170,8 @@ func (s *dataSVC) FlagSet() *run.FlagSet {
        flagS.StringVar(&s.root, "measure-root-path", "/tmp", "the root path of 
measure")
        flagS.StringVar(&s.dataPath, "measure-data-path", "", "the data 
directory path of measure. If not set, <measure-root-path>/measure/data will be 
used")
        flagS.DurationVar(&s.option.flushTimeout, "measure-flush-timeout", 
defaultFlushTimeout, "the memory data timeout of measure")
+       flagS.DurationVar(&s.option.memWaitTimeout, 
"measure-lifecycle-receive-mem-wait-timeout", 5*time.Minute,
+               "max time the migration receiver waits for memory to recover 
before introducing an external segment")
        s.option.mergePolicy = newDefaultMergePolicy()
        flagS.VarP(&s.option.mergePolicy.maxFanOutSize, 
"measure-max-fan-out-size", "", "the upper bound of a single file size after 
merge of measure")
        s.option.seriesCacheMaxSize = run.Bytes(32 << 20)
@@ -310,7 +312,7 @@ func (s *dataSVC) PreRun(ctx context.Context) error {
        }
 
        s.pipeline.RegisterChunkedSyncHandler(data.TopicMeasurePartSync, 
setUpChunkedSyncCallback(s.l, s.schemaRepo))
-       s.pipeline.RegisterChunkedSyncHandler(data.TopicMeasureSeriesSync, 
setUpSyncSeriesCallback(s.l, s.schemaRepo))
+       s.pipeline.RegisterChunkedSyncHandler(data.TopicMeasureSeriesSync, 
setUpSyncSeriesCallback(s.l, s.schemaRepo, s.pm, s.option.memWaitTimeout))
        err = s.pipeline.Subscribe(data.TopicMeasureSeriesIndexInsert, 
setUpIndexCallback(s.l, s.schemaRepo, data.TopicMeasureSeriesIndexInsert))
        if err != nil {
                return err
diff --git a/banyand/measure/svc_liaison.go b/banyand/measure/svc_liaison.go
index d34752ed9..3e5449d8b 100644
--- a/banyand/measure/svc_liaison.go
+++ b/banyand/measure/svc_liaison.go
@@ -23,6 +23,7 @@ import (
        "path"
        "path/filepath"
        "strings"
+       "time"
 
        "github.com/pkg/errors"
 
@@ -109,6 +110,8 @@ func (s *liaison) FlagSet() *run.FlagSet {
        flagS.StringVar(&s.root, "measure-root-path", "/tmp", "the root path of 
measure")
        flagS.StringVar(&s.dataPath, "measure-data-path", "", "the data 
directory path of measure. If not set, <measure-root-path>/measure/data will be 
used")
        flagS.DurationVar(&s.option.flushTimeout, "measure-flush-timeout", 
defaultFlushTimeout, "the memory data timeout of measure")
+       flagS.DurationVar(&s.option.memWaitTimeout, 
"measure-lifecycle-receive-mem-wait-timeout", 5*time.Minute,
+               "max time the migration receiver waits for memory to recover 
before introducing an external segment")
        flagS.DurationVar(&s.option.syncInterval, "measure-sync-interval", 
defaultSyncInterval, "the periodic sync interval for measure data")
        flagS.IntVar(&s.maxDiskUsagePercent, "measure-max-disk-usage-percent", 
95, "the maximum disk usage percentage allowed")
        flagS.IntVar(&s.failedPartsMaxSizePercent, 
"failed-parts-max-size-percent", 10,
diff --git a/banyand/measure/svc_standalone.go 
b/banyand/measure/svc_standalone.go
index 9e5eb6358..8be02a81c 100644
--- a/banyand/measure/svc_standalone.go
+++ b/banyand/measure/svc_standalone.go
@@ -177,6 +177,8 @@ func (s *standalone) FlagSet() *run.FlagSet {
        flagS.StringVar(&s.root, "measure-root-path", "/tmp", "the root path of 
measure")
        flagS.StringVar(&s.dataPath, "measure-data-path", "", "the data 
directory path of measure. If not set, <measure-root-path>/measure/data will be 
used")
        flagS.DurationVar(&s.option.flushTimeout, "measure-flush-timeout", 
defaultFlushTimeout, "the memory data timeout of measure")
+       flagS.DurationVar(&s.option.memWaitTimeout, 
"measure-lifecycle-receive-mem-wait-timeout", 5*time.Minute,
+               "max time the migration receiver waits for memory to recover 
before introducing an external segment")
        s.option.mergePolicy = newDefaultMergePolicy()
        flagS.VarP(&s.option.mergePolicy.maxFanOutSize, 
"measure-max-fan-out-size", "", "the upper bound of a single file size after 
merge of measure")
        s.option.seriesCacheMaxSize = run.Bytes(32 << 20)
diff --git a/banyand/measure/write_data.go b/banyand/measure/write_data.go
index c6079f365..301449a82 100644
--- a/banyand/measure/write_data.go
+++ b/banyand/measure/write_data.go
@@ -18,6 +18,7 @@
 package measure
 
 import (
+       "context"
        "fmt"
        "strings"
        "sync/atomic"
@@ -26,6 +27,7 @@ import (
        "github.com/apache/skywalking-banyandb/api/common"
        commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
        "github.com/apache/skywalking-banyandb/banyand/internal/storage"
+       "github.com/apache/skywalking-banyandb/banyand/protector"
        "github.com/apache/skywalking-banyandb/banyand/queue"
        "github.com/apache/skywalking-banyandb/pkg/fs"
        "github.com/apache/skywalking-banyandb/pkg/index"
@@ -158,6 +160,9 @@ func (s *syncCallback) HandleFileChunk(ctx 
*queue.ChunkedSyncPartContext, chunk
                return fmt.Errorf("part handler is nil")
        }
        partCtx := ctx.Handler.(*syncPartContext)
+       if partCtx.tsTable.pm.State() == protector.StateHigh {
+               return queue.ErrServerBusy
+       }
 
        // Select the appropriate writer based on the filename and write the 
chunk.
        fileName := ctx.FileName
@@ -214,10 +219,13 @@ func releaseWriters(sw *writers) {
 var writersPool = pool.Register[*writers]("measure-writers")
 
 type syncSeriesContext struct {
-       streamer index.ExternalSegmentStreamer
-       segment  storage.Segment[*tsTable, *commonv1.ResourceOpts]
-       l        *logger.Logger
-       fileName string
+       streamer       index.ExternalSegmentStreamer
+       segment        storage.Segment[*tsTable, *commonv1.ResourceOpts]
+       l              *logger.Logger
+       pm             protector.Memory
+       introduceCtx   context.Context
+       fileName       string
+       memWaitTimeout time.Duration
 }
 
 func (s *syncSeriesContext) NewPartType(_ *queue.ChunkedSyncPartContext) error 
{
@@ -227,6 +235,9 @@ func (s *syncSeriesContext) NewPartType(_ 
*queue.ChunkedSyncPartContext) error {
 
 func (s *syncSeriesContext) FinishSync() error {
        if s.streamer != nil {
+               if err := protector.WaitWhileHigh(s.introduceCtx, s.pm, 
s.memWaitTimeout, s.l, "measure-series-introduce"); err != nil {
+                       return err
+               }
                if err := s.streamer.CompleteSegment(); err != nil {
                        s.l.Error().Err(err).Msg("failed to complete external 
segment")
                        return err
@@ -246,14 +257,18 @@ func (s *syncSeriesContext) Close() error {
 }
 
 type syncSeriesCallback struct {
-       l          *logger.Logger
-       schemaRepo *schemaRepo
+       l              *logger.Logger
+       schemaRepo     *schemaRepo
+       pm             protector.Memory
+       memWaitTimeout time.Duration
 }
 
-func setUpSyncSeriesCallback(l *logger.Logger, schemaRepo *schemaRepo) 
queue.ChunkedSyncHandler {
+func setUpSyncSeriesCallback(l *logger.Logger, schemaRepo *schemaRepo, pm 
protector.Memory, memWaitTimeout time.Duration) queue.ChunkedSyncHandler {
        return &syncSeriesCallback{
-               l:          l,
-               schemaRepo: schemaRepo,
+               l:              l,
+               schemaRepo:     schemaRepo,
+               pm:             pm,
+               memWaitTimeout: memWaitTimeout,
        }
 }
 
@@ -278,8 +293,11 @@ func (s *syncSeriesCallback) CreatePartHandler(ctx 
*queue.ChunkedSyncPartContext
                return nil, err
        }
        return &syncSeriesContext{
-               l:       s.l,
-               segment: segment,
+               l:              s.l,
+               segment:        segment,
+               pm:             s.pm,
+               memWaitTimeout: s.memWaitTimeout,
+               introduceCtx:   ctx.RetrieveContext(),
        }, nil
 }
 
@@ -289,12 +307,18 @@ func (s *syncSeriesCallback) HandleFileChunk(ctx 
*queue.ChunkedSyncPartContext,
                return fmt.Errorf("part handler is nil")
        }
        seriesCtx := ctx.Handler.(*syncSeriesContext)
+       if seriesCtx.pm.State() == protector.StateHigh {
+               return queue.ErrServerBusy
+       }
 
        if seriesCtx.segment == nil {
                return fmt.Errorf("segment is nil")
        }
        if seriesCtx.fileName != ctx.FileName {
                if seriesCtx.streamer != nil {
+                       if err := 
protector.WaitWhileHigh(ctx.RetrieveContext(), seriesCtx.pm, 
seriesCtx.memWaitTimeout, s.l, "measure-series-introduce"); err != nil {
+                               return err
+                       }
                        if err := seriesCtx.streamer.CompleteSegment(); err != 
nil {
                                s.l.Error().Err(err).Str("group", 
ctx.Group).Msg("failed to complete external segment")
                                return err
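
Note on the SERVER_BUSY check added to HandleFileChunk above: the handler
consults the memory state before it touches any writer or segment, so a
rejected chunk leaves no partial state behind and the sender can retry later.
The sketch below shows that ordering with stand-in types. state, memory,
fixedMem, chunkHandler and errServerBusy are hypothetical and only mirror the
shape of protector.Memory and queue.ErrServerBusy; they are not the real
definitions.

```go
package main

import (
	"errors"
	"fmt"
)

// state is a stand-in for the protector's memory state (hypothetical).
type state int

const (
	stateLow state = iota
	stateHigh
)

// errServerBusy is a stand-in sentinel for the receiver's busy error.
var errServerBusy = errors.New("server busy")

// memory is the single method this sketch needs from a memory protector.
type memory interface {
	State() state
}

// fixedMem always reports the state it was created with.
type fixedMem state

func (f fixedMem) State() state { return state(f) }

// chunkHandler counts the bytes it accepted.
type chunkHandler struct {
	pm      memory
	written int
}

// handleChunk rejects the chunk before doing any work when memory is high,
// so a busy response never leaves a partially written chunk behind.
func (h *chunkHandler) handleChunk(chunk []byte) error {
	if h.pm.State() == stateHigh {
		return errServerBusy
	}
	h.written += len(chunk)
	return nil
}

func main() {
	busy := &chunkHandler{pm: fixedMem(stateHigh)}
	fmt.Println(busy.handleChunk([]byte("payload")), busy.written)

	idle := &chunkHandler{pm: fixedMem(stateLow)}
	fmt.Println(idle.handleChunk([]byte("payload")), idle.written)
}
```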
diff --git a/banyand/measure/write_data_busy_test.go 
b/banyand/measure/write_data_busy_test.go
new file mode 100644
index 000000000..7ffb1bdf4
--- /dev/null
+++ b/banyand/measure/write_data_busy_test.go
@@ -0,0 +1,67 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package measure
+
+import (
+       "testing"
+
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+
+       "github.com/apache/skywalking-banyandb/banyand/protector"
+       "github.com/apache/skywalking-banyandb/banyand/queue"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+// highMem is a fake protector.Memory that always reports a configurable state.
+type highMem struct {
+       protector.Nop
+       state protector.State
+}
+
+// State returns the fixed scripted state.
+func (h highMem) State() protector.State {
+       return h.state
+}
+
+// TestSyncSeriesCallback_HandleFileChunkBusy verifies that the series chunk
+// handler sheds load by returning queue.ErrServerBusy when memory is high,
+// before touching any segment or streamer state.
+func TestSyncSeriesCallback_HandleFileChunkBusy(t *testing.T) {
+       cb := &syncSeriesCallback{l: logger.GetLogger("test")}
+       // seriesCtx has a nil segment; the BUSY check must fire first so the
+       // nil-segment path is never reached.
+       seriesCtx := &syncSeriesContext{
+               l:  logger.GetLogger("test"),
+               pm: highMem{state: protector.StateHigh},
+       }
+       ctx := &queue.ChunkedSyncPartContext{Handler: seriesCtx, FileName: 
"series.idx"}
+       err := cb.HandleFileChunk(ctx, []byte("payload"))
+       require.Error(t, err)
+       assert.ErrorIs(t, err, queue.ErrServerBusy)
+}
+
+// TestSyncSeriesCallback_HandleFileChunkNilHandler verifies the early nil
+// handler guard still applies.
+func TestSyncSeriesCallback_HandleFileChunkNilHandler(t *testing.T) {
+       cb := &syncSeriesCallback{l: logger.GetLogger("test")}
+       ctx := &queue.ChunkedSyncPartContext{}
+       err := cb.HandleFileChunk(ctx, []byte("payload"))
+       require.Error(t, err)
+       assert.NotErrorIs(t, err, queue.ErrServerBusy)
+}
diff --git a/banyand/protector/wait.go b/banyand/protector/wait.go
new file mode 100644
index 000000000..5d8357448
--- /dev/null
+++ b/banyand/protector/wait.go
@@ -0,0 +1,63 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package protector
+
+import (
+       "context"
+       "time"
+
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+const (
+       waitInitialBackoff = 200 * time.Millisecond
+       waitMaxBackoff     = 2 * time.Second
+       waitHeartbeat      = time.Minute
+)
+
+// WaitWhileHigh blocks while memory pressure is high, bounded by maxWait.
+// It only creates a timeout context when it actually has to wait. It returns
+// the context error if ctx is canceled or maxWait elapses before recovery.
+func WaitWhileHigh(ctx context.Context, pm Memory, maxWait time.Duration, l 
*logger.Logger, stage string) error {
+       if pm.State() != StateHigh {
+               return nil
+       }
+       waitCtx, cancel := context.WithTimeout(ctx, maxWait)
+       defer cancel()
+       start := time.Now()
+       l.Warn().Str("stage", stage).Dur("max_wait", maxWait).Msg("memory high, 
migration receive paused, waiting")
+       ticker := time.NewTicker(waitHeartbeat)
+       defer ticker.Stop()
+       backoff := waitInitialBackoff
+       for pm.State() == StateHigh {
+               select {
+               case <-time.After(backoff):
+                       backoff *= 2
+                       if backoff > waitMaxBackoff {
+                               backoff = waitMaxBackoff
+                       }
+               case <-ticker.C:
+                       l.Warn().Str("stage", stage).Dur("waited", time.Since(start)).Msg("still waiting for memory")
+               case <-waitCtx.Done():
+                       l.Warn().Str("stage", stage).Dur("waited", time.Since(start)).Msg("memory wait aborted (timeout/cancel)")
+                       return waitCtx.Err()
+               }
+       }
+       l.Info().Str("stage", stage).Dur("waited", time.Since(start)).Msg("memory recovered, migration receive resumed")
+       return nil
+}
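
The wait loop above can be sketched as a standalone program, assuming a plain
function in place of protector.Memory and omitting the logger and heartbeat.
The names waitWhile, pressureFn, recoversAfter and demo are hypothetical, and
the sketch uses one reusable time.Timer where the patch calls time.After on
each iteration; that substitution is a choice of this sketch, not part of the
commit.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync/atomic"
	"time"
)

// pressureFn is a hypothetical stand-in for pm.State() == StateHigh.
type pressureFn func() bool

// waitWhile polls high() with capped exponential backoff until it reports
// false, ctx is canceled, or maxWait elapses.
func waitWhile(ctx context.Context, high pressureFn, maxWait, initial, maxBackoff time.Duration) error {
	if !high() {
		return nil // fast path: no timeout context is created
	}
	waitCtx, cancel := context.WithTimeout(ctx, maxWait)
	defer cancel()
	timer := time.NewTimer(initial)
	defer timer.Stop()
	backoff := initial
	for high() {
		select {
		case <-timer.C:
			// The timer channel is drained here, so Reset is safe.
			backoff *= 2
			if backoff > maxBackoff {
				backoff = maxBackoff
			}
			timer.Reset(backoff)
		case <-waitCtx.Done():
			return waitCtx.Err()
		}
	}
	return nil
}

// recoversAfter returns a pressureFn that reports high for the first n calls.
func recoversAfter(n int32) pressureFn {
	var calls atomic.Int32
	return func() bool { return calls.Add(1) <= n }
}

// demo runs one wait that recovers and one that never does.
func demo() (recovered, timedOut bool) {
	ctx := context.Background()
	recovered = waitWhile(ctx, recoversAfter(2), time.Second, 5*time.Millisecond, 20*time.Millisecond) == nil
	err := waitWhile(ctx, func() bool { return true }, 30*time.Millisecond, 5*time.Millisecond, 20*time.Millisecond)
	timedOut = errors.Is(err, context.DeadlineExceeded)
	return recovered, timedOut
}

func main() {
	fmt.Println(demo())
}
```

Callers that need to tell a timeout from a caller-side cancel can compare the
returned error against context.DeadlineExceeded and context.Canceled with
errors.Is.
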
diff --git a/banyand/protector/wait_test.go b/banyand/protector/wait_test.go
new file mode 100644
index 000000000..eec29127e
--- /dev/null
+++ b/banyand/protector/wait_test.go
@@ -0,0 +1,72 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package protector
+
+import (
+       "context"
+       "sync/atomic"
+       "testing"
+       "time"
+
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+// fakeMem is a test double that implements the full Memory interface by
+// embedding Nop and overriding State() to advance through a script of states.
+type fakeMem struct {
+       Nop
+       states []State
+       idx    atomic.Int32
+}
+
+// State returns the next scripted state, sticking on the last value.
+func (f *fakeMem) State() State {
+       i := int(f.idx.Load())
+       if i >= len(f.states) {
+               i = len(f.states) - 1
+       } else {
+               f.idx.Add(1)
+       }
+       return f.states[i]
+}
+
+func TestWaitWhileHigh_ReturnsWhenRecovered(t *testing.T) {
+       pm := &fakeMem{states: []State{StateHigh, StateHigh, StateLow}}
+       err := WaitWhileHigh(context.Background(), pm, time.Second, logger.GetLogger("test"), "test-stage")
+       assert.NoError(t, err)
+}
+
+func TestWaitWhileHigh_TimesOut(t *testing.T) {
+       pm := &fakeMem{states: []State{StateHigh}}
+       start := time.Now()
+       err := WaitWhileHigh(context.Background(), pm, 50*time.Millisecond, logger.GetLogger("test"), "test-stage")
+       require.Error(t, err)
+       assert.ErrorIs(t, err, context.DeadlineExceeded)
+       assert.Less(t, time.Since(start), time.Second, "timeout test must complete quickly")
+}
+
+func TestWaitWhileHigh_FastPathNoTimeoutCtx(t *testing.T) {
+       pm := &fakeMem{states: []State{StateLow}}
+       ctx, cancel := context.WithCancel(context.Background())
+       defer cancel()
+       err := WaitWhileHigh(ctx, pm, time.Second, logger.GetLogger("test"), "test-stage")
+       assert.NoError(t, err)
+}
diff --git a/banyand/queue/pub/chunked_sync.go b/banyand/queue/pub/chunked_sync.go
index 6c30fef0b..cc77e2a68 100644
--- a/banyand/queue/pub/chunked_sync.go
+++ b/banyand/queue/pub/chunked_sync.go
@@ -512,6 +512,9 @@ func (c *chunkedSyncClient) sendChunk(
                case clusterv1.SyncStatus_SYNC_STATUS_SESSION_NOT_FOUND:
                        return fmt.Errorf("session %s not found on server for chunk %d: %s", sessionID, *chunkIndex, resp.Error)
 
+               case clusterv1.SyncStatus_SYNC_STATUS_SERVER_BUSY:
+                       return fmt.Errorf("receiver busy for chunk %d: %w", *chunkIndex, queue.ErrServerBusy)
+
                default:
                        if resp.Error != "" {
                                return fmt.Errorf("chunk %d sync failed: %s", *chunkIndex, resp.Error)
@@ -572,6 +575,9 @@ func classifyChunkedSyncPubErr(err error) string {
        if err == nil {
                return ""
        }
+       if errors.Is(err, queue.ErrServerBusy) {
+               return "server_busy"
+       }
        msg := err.Error()
        switch {
        case strings.Contains(msg, "checksum mismatch"):
diff --git a/banyand/queue/queue.go b/banyand/queue/queue.go
index 62944bd49..fc0c03d61 100644
--- a/banyand/queue/queue.go
+++ b/banyand/queue/queue.go
@@ -40,6 +40,10 @@ import (
 // self-probing without a special-cased branch in the caller.
 var ErrNotImplemented = errors.New("not implemented")
 
+// ErrServerBusy signals the receiver is under memory pressure and the sender
+// should back off and retry the whole part.
+var ErrServerBusy = errors.New("receiver under memory pressure")
+
 // Queue builds a data transmission tunnel between subscribers and publishers.
 //
 //go:generate mockgen -destination=./queue_mock.go -package=queue github.com/apache/skywalking-banyandb/pkg/bus MessageListener
@@ -136,7 +140,10 @@ type ChunkedSyncClient interface {
 
 // ChunkedSyncPartContext represents the context for a chunked sync operation.
 type ChunkedSyncPartContext struct {
-       Handler               PartHandler
+       Handler PartHandler
+       // Context carries the gRPC stream context for receive-side cancellation
+       // (memory wait upper bound / disconnect).
+       Context               context.Context
        Group                 string
        FileName              string
        PartType              string
@@ -152,6 +159,15 @@ type ChunkedSyncPartContext struct {
        ShardID               uint32
 }
 
+// RetrieveContext returns the part's stream context, or context.Background()
+// if none was set. Used by receive handlers to bound memory-pressure waits.
+func (c *ChunkedSyncPartContext) RetrieveContext() context.Context {
+       if c.Context == nil {
+               return context.Background()
+       }
+       return c.Context
+}
+
 // Close releases resources associated with the context.
 func (c *ChunkedSyncPartContext) Close() error {
        defer func() {
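
The ErrServerBusy comment above says the sender should back off and retry the
whole part. A minimal sender-side sketch of that idea follows; retryWhileBusy,
demoAttempts and the local errServerBusy are hypothetical names for this
standalone example, and the retry helper this commit adds in
banyand/backup/lifecycle/retry.go may be shaped differently.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errServerBusy mirrors queue.ErrServerBusy for this standalone sketch.
var errServerBusy = errors.New("receiver under memory pressure")

// retryWhileBusy is a hypothetical helper. It re-runs sendPart while the
// error chain contains errServerBusy, doubling the delay up to maxDelay,
// and returns the number of attempts made together with the last error.
func retryWhileBusy(sendPart func() error, attempts int, delay, maxDelay time.Duration) (int, error) {
	var err error
	for i := 1; i <= attempts; i++ {
		err = sendPart()
		if !errors.Is(err, errServerBusy) {
			return i, err // success or a failure that is not worth retrying
		}
		if i < attempts {
			time.Sleep(delay)
			delay *= 2
			if delay > maxDelay {
				delay = maxDelay
			}
		}
	}
	return attempts, err
}

// demoAttempts simulates a receiver that is busy for the first two sends.
func demoAttempts() int {
	calls := 0
	n, err := retryWhileBusy(func() error {
		calls++
		if calls <= 2 {
			// Wrapped with %w, as sendChunk does, so errors.Is still matches.
			return fmt.Errorf("receiver busy for chunk %d: %w", 0, errServerBusy)
		}
		return nil
	}, 5, time.Millisecond, 4*time.Millisecond)
	if err != nil {
		return -1
	}
	return n
}

func main() {
	fmt.Println(demoAttempts())
}
```

Matching with errors.Is rather than on the message text keeps the retry
decision stable even when callers wrap the error with extra context.
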
diff --git a/banyand/queue/queue_test.go b/banyand/queue/queue_test.go
new file mode 100644
index 000000000..8030d9211
--- /dev/null
+++ b/banyand/queue/queue_test.go
@@ -0,0 +1,35 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package queue
+
+import (
+       "context"
+       "testing"
+
+       "github.com/stretchr/testify/assert"
+)
+
+// TestChunkedSyncPartContextRetrieveContext verifies the nil fallback and
+// pass-through behavior of ChunkedSyncPartContext.RetrieveContext.
+func TestChunkedSyncPartContextRetrieveContext(t *testing.T) {
+       assert.Equal(t, context.Background(), (&ChunkedSyncPartContext{}).RetrieveContext())
+
+       ctx, cancel := context.WithCancel(context.Background())
+       defer cancel()
+       assert.Equal(t, ctx, (&ChunkedSyncPartContext{Context: ctx}).RetrieveContext())
+}
diff --git a/banyand/queue/sub/chunked_sync.go b/banyand/queue/sub/chunked_sync.go
index 1ed1be1fa..dedca8e3f 100644
--- a/banyand/queue/sub/chunked_sync.go
+++ b/banyand/queue/sub/chunked_sync.go
@@ -418,6 +418,7 @@ func (s *server) processExpectedChunk(stream clusterv1.ChunkedSyncService_SyncPa
 
                if createNewContext {
                        session.partCtx = &queue.ChunkedSyncPartContext{
+                               Context:               stream.Context(),
                                ID:                    partInfo.Id,
                                Group:                 session.metadata.Group,
                                ShardID:               session.metadata.ShardId,
@@ -452,6 +453,10 @@ func (s *server) processExpectedChunk(stream clusterv1.ChunkedSyncService_SyncPa
                }
 
                if processErr := s.processPart(session, req, partInfo, partIndex, handler); processErr != nil {
+                       if errors.Is(processErr, queue.ErrServerBusy) {
+                               return s.sendResponse(stream, req, clusterv1.SyncStatus_SYNC_STATUS_SERVER_BUSY,
+                                       "receiver under memory pressure, retry later", nil)
+                       }
                        s.log.Error().Err(processErr).
                                Str("session_id", req.SessionId).
                                Str("topic", session.metadata.Topic).
diff --git a/banyand/queue/sub/chunked_sync_test.go b/banyand/queue/sub/chunked_sync_test.go
index 206f93b8c..f43c970bb 100644
--- a/banyand/queue/sub/chunked_sync_test.go
+++ b/banyand/queue/sub/chunked_sync_test.go
@@ -282,6 +282,68 @@ func TestChunkOrderingBufferFull(t *testing.T) {
        assert.True(t, found, "expected error_type=buffer_full to be recorded")
 }
 
+// TestChunkedSyncServerBusy verifies that when the handler's HandleFileChunk reports
+// memory pressure via queue.ErrServerBusy, the server replies with SYNC_STATUS_SERVER_BUSY
+// instead of a generic stream error or a CHUNK_RECEIVED success.
+func TestChunkedSyncServerBusy(t *testing.T) {
+       err := logger.Init(logger.Logging{Env: "dev", Level: "info"})
+       require.NoError(t, err)
+
+       s := &server{ //nolint:exhaustruct
+               log:                 logger.GetLogger("test-server-busy"),
+               chunkedSyncHandlers: make(map[bus.Topic]queue.ChunkedSyncHandler),
+       }
+       s.chunkedSyncHandlers[data.TopicStreamPartSync] = &busyChunkedSyncHandler{}
+
+       mockStream := &MockSyncPartStream{}
+       session := &syncSession{ //nolint:exhaustruct
+               sessionID:     "busy-session",
+               startTime:     time.Now(),
+               partsProgress: make(map[int]*partProgress),
+               metadata: &clusterv1.SyncMetadata{
+                       Group:   "test-group",
+                       ShardId: 1,
+                       Topic:   data.TopicStreamPartSync.String(),
+               },
+       }
+
+       chunkData := []byte("busy-chunk-data")
+       req := &clusterv1.SyncPartRequest{
+               SessionId:     "busy-session",
+               ChunkIndex:    0,
+               ChunkData:     chunkData,
+               ChunkChecksum: fmt.Sprintf("%x", crc32.ChecksumIEEE(chunkData)),
+               PartsInfo: []*clusterv1.PartInfo{
+                       {
+                               Id: 1,
+                               Files: []*clusterv1.FileInfo{
+                                       {Name: "test-file.dat", Offset: 0, Size: uint32(len(chunkData))},
+                               },
+                       },
+               },
+       }
+
+       require.NoError(t, s.processChunk(mockStream, session, req))
+
+       require.Len(t, mockStream.sentResponses, 1)
+       resp := mockStream.sentResponses[0]
+       assert.Equal(t, clusterv1.SyncStatus_SYNC_STATUS_SERVER_BUSY, resp.Status)
+       assert.NotEmpty(t, resp.Error)
+       assert.Equal(t, "busy-session", resp.SessionId)
+       assert.Equal(t, uint32(0), resp.ChunkIndex)
+}
+
+// busyChunkedSyncHandler is a handler whose HandleFileChunk always reports memory pressure.
+type busyChunkedSyncHandler struct{}
+
+func (m *busyChunkedSyncHandler) CreatePartHandler(_ *queue.ChunkedSyncPartContext) (queue.PartHandler, error) {
+       return &MockChunkedSyncPartHandler{}, nil
+}
+
+func (m *busyChunkedSyncHandler) HandleFileChunk(_ *queue.ChunkedSyncPartContext, _ []byte) error {
+       return queue.ErrServerBusy
+}
+
 // MockChunkedSyncHandler implements queue.ChunkedSyncHandler for testing.
 type MockChunkedSyncHandler struct{}
 
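
The test above checks that a busy handler produces SYNC_STATUS_SERVER_BUSY
rather than a generic failure. The mapping it exercises can be sketched on its
own, assuming plain strings in place of the generated clusterv1.SyncStatus
enum. statusFor and demoStatuses are hypothetical names, and the "FAILED"
status is a placeholder, since the real name of the generic failure status is
not shown in this patch.

```go
package main

import (
	"errors"
	"fmt"
)

// errServerBusy mirrors queue.ErrServerBusy for this standalone sketch.
var errServerBusy = errors.New("receiver under memory pressure")

// Illustrative status strings, not the generated enum values.
// statusFailed in particular is a hypothetical placeholder.
const (
	statusReceived = "CHUNK_RECEIVED"
	statusBusy     = "SERVER_BUSY"
	statusFailed   = "FAILED"
)

// statusFor maps a handler error to a reply status. The busy check uses
// errors.Is so that wrapped errors are still recognized.
func statusFor(err error) string {
	switch {
	case err == nil:
		return statusReceived
	case errors.Is(err, errServerBusy):
		return statusBusy
	default:
		return statusFailed
	}
}

// demoStatuses covers the success, wrapped-busy and generic-failure cases.
func demoStatuses() [3]string {
	wrapped := fmt.Errorf("write chunk: %w", errServerBusy)
	return [3]string{
		statusFor(nil),
		statusFor(wrapped),
		statusFor(errors.New("disk full")),
	}
}

func main() {
	fmt.Println(demoStatuses())
}
```
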
diff --git a/banyand/stream/stream.go b/banyand/stream/stream.go
index a5b3873ac..7d47871f1 100644
--- a/banyand/stream/stream.go
+++ b/banyand/stream/stream.go
@@ -54,6 +54,7 @@ type option struct {
        flushTimeout                 time.Duration
        elementIndexFlushTimeout     time.Duration
        syncInterval                 time.Duration
+       memWaitTimeout               time.Duration
        failedPartsMaxTotalSizeBytes uint64
 }
 
diff --git a/banyand/stream/svc_liaison.go b/banyand/stream/svc_liaison.go
index 78da92c9c..176b7f6ba 100644
--- a/banyand/stream/svc_liaison.go
+++ b/banyand/stream/svc_liaison.go
@@ -23,6 +23,7 @@ import (
        "path"
        "path/filepath"
        "strings"
+       "time"
 
        "github.com/pkg/errors"
 
@@ -116,6 +117,8 @@ func (s *liaison) FlagSet() *run.FlagSet {
        flagS.StringVar(&s.root, "stream-root-path", "/tmp", "the root path of stream")
        flagS.StringVar(&s.dataPath, "stream-data-path", "", "the data directory path of stream. If not set, <stream-root-path>/stream/data will be used")
        flagS.DurationVar(&s.option.flushTimeout, "stream-flush-timeout", defaultFlushTimeout, "the memory data timeout of stream")
+       flagS.DurationVar(&s.option.memWaitTimeout, "stream-lifecycle-receive-mem-wait-timeout", 5*time.Minute,
+               "max time the migration receiver waits for memory to recover before introducing an external segment")
        flagS.IntVar(&s.maxDiskUsagePercent, "stream-max-disk-usage-percent", 95, "the maximum disk usage percentage allowed")
        flagS.DurationVar(&s.option.syncInterval, "stream-sync-interval", defaultSyncInterval, "the periodic sync interval for stream data")
        flagS.IntVar(&s.failedPartsMaxSizePercent, "failed-parts-max-size-percent", 10,
diff --git a/banyand/stream/svc_standalone.go b/banyand/stream/svc_standalone.go
index 9dab8a867..86172aa1d 100644
--- a/banyand/stream/svc_standalone.go
+++ b/banyand/stream/svc_standalone.go
@@ -177,6 +177,8 @@ func (s *standalone) FlagSet() *run.FlagSet {
        flagS.StringVar(&s.dataPath, "stream-data-path", "", "the data directory path of stream. If not set, <stream-root-path>/stream/data will be used")
        flagS.DurationVar(&s.option.flushTimeout, "stream-flush-timeout", defaultFlushTimeout, "the memory data timeout of stream")
        flagS.DurationVar(&s.option.elementIndexFlushTimeout, "element-index-flush-timeout", defaultFlushTimeout, "the elementIndex timeout of stream")
+       flagS.DurationVar(&s.option.memWaitTimeout, "stream-lifecycle-receive-mem-wait-timeout", 5*time.Minute,
+               "max time the migration receiver waits for memory to recover before introducing an external segment")
        s.option.mergePolicy = newDefaultMergePolicy()
        flagS.VarP(&s.option.mergePolicy.maxFanOutSize, "stream-max-fan-out-size", "", "the upper bound of a single file size after merge of stream")
        s.option.seriesCacheMaxSize = run.Bytes(32 << 20)
@@ -293,9 +295,9 @@ func (s *standalone) PreRun(ctx context.Context) error {
        }
        s.pipeline.RegisterChunkedSyncHandler(data.TopicStreamPartSync, setUpChunkedSyncCallback(s.l, &s.schemaRepo))
        // Register chunked sync handler for stream series index
-       s.pipeline.RegisterChunkedSyncHandler(data.TopicStreamSeriesSync, setUpSyncSeriesCallback(s.l, &s.schemaRepo))
+       s.pipeline.RegisterChunkedSyncHandler(data.TopicStreamSeriesSync, setUpSyncSeriesCallback(s.l, &s.schemaRepo, s.pm, s.option.memWaitTimeout))
        // Register chunked sync handler for stream element index
-       s.pipeline.RegisterChunkedSyncHandler(data.TopicStreamElementIndexSync, setUpSyncElementIndexCallback(s.l, &s.schemaRepo))
+       s.pipeline.RegisterChunkedSyncHandler(data.TopicStreamElementIndexSync, setUpSyncElementIndexCallback(s.l, &s.schemaRepo, s.pm, s.option.memWaitTimeout))
 
        err = s.pipeline.Subscribe(data.TopicStreamSeriesIndexWrite, setUpSeriesIndexCallback(s.l, &s.schemaRepo))
        if err != nil {
diff --git a/banyand/stream/write_data.go b/banyand/stream/write_data.go
index 8506a36e7..af360c9c4 100644
--- a/banyand/stream/write_data.go
+++ b/banyand/stream/write_data.go
@@ -18,6 +18,7 @@
 package stream
 
 import (
+       "context"
        "fmt"
        "strings"
        "sync/atomic"
@@ -26,6 +27,7 @@ import (
        "github.com/apache/skywalking-banyandb/api/common"
        commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
        "github.com/apache/skywalking-banyandb/banyand/internal/storage"
+       "github.com/apache/skywalking-banyandb/banyand/protector"
        "github.com/apache/skywalking-banyandb/banyand/queue"
        "github.com/apache/skywalking-banyandb/pkg/fs"
        "github.com/apache/skywalking-banyandb/pkg/index"
@@ -157,6 +159,9 @@ func (s *syncCallback) HandleFileChunk(ctx *queue.ChunkedSyncPartContext, chunk
                return fmt.Errorf("part handler is nil")
        }
        partCtx := ctx.Handler.(*syncPartContext)
+       if partCtx.tsTable.pm.State() == protector.StateHigh {
+               return queue.ErrServerBusy
+       }
 
        // Select the appropriate writer based on the filename and write the chunk.
        fileName := ctx.FileName
@@ -188,10 +193,13 @@ func (s *syncCallback) HandleFileChunk(ctx *queue.ChunkedSyncPartContext, chunk
 }
 
 type syncSeriesContext struct {
-       streamer index.ExternalSegmentStreamer
-       segment  storage.Segment[*tsTable, *commonv1.ResourceOpts]
-       l        *logger.Logger
-       fileName string
+       streamer       index.ExternalSegmentStreamer
+       segment        storage.Segment[*tsTable, *commonv1.ResourceOpts]
+       l              *logger.Logger
+       pm             protector.Memory
+       introduceCtx   context.Context
+       fileName       string
+       memWaitTimeout time.Duration
 }
 
 func (s *syncSeriesContext) NewPartType(_ *queue.ChunkedSyncPartContext) error {
@@ -201,6 +209,9 @@ func (s *syncSeriesContext) NewPartType(_ *queue.ChunkedSyncPartContext) error {
 
 func (s *syncSeriesContext) FinishSync() error {
        if s.streamer != nil {
+               if err := protector.WaitWhileHigh(s.introduceCtx, s.pm, s.memWaitTimeout, s.l, "stream-series-introduce"); err != nil {
+                       return err
+               }
                if err := s.streamer.CompleteSegment(); err != nil {
                        s.l.Error().Err(err).Msg("failed to complete external segment")
                        return err
@@ -220,14 +231,18 @@ func (s *syncSeriesContext) Close() error {
 }
 
 type syncSeriesCallback struct {
-       l          *logger.Logger
-       schemaRepo *schemaRepo
+       l              *logger.Logger
+       schemaRepo     *schemaRepo
+       pm             protector.Memory
+       memWaitTimeout time.Duration
 }
 
-func setUpSyncSeriesCallback(l *logger.Logger, schemaRepo *schemaRepo) queue.ChunkedSyncHandler {
+func setUpSyncSeriesCallback(l *logger.Logger, schemaRepo *schemaRepo, pm protector.Memory, memWaitTimeout time.Duration) queue.ChunkedSyncHandler {
        return &syncSeriesCallback{
-               l:          l,
-               schemaRepo: schemaRepo,
+               l:              l,
+               schemaRepo:     schemaRepo,
+               pm:             pm,
+               memWaitTimeout: memWaitTimeout,
        }
 }
 
@@ -252,8 +267,11 @@ func (s *syncSeriesCallback) CreatePartHandler(ctx *queue.ChunkedSyncPartContext
                return nil, err
        }
        return &syncSeriesContext{
-               l:       s.l,
-               segment: segment,
+               l:              s.l,
+               segment:        segment,
+               pm:             s.pm,
+               memWaitTimeout: s.memWaitTimeout,
+               introduceCtx:   ctx.RetrieveContext(),
        }, nil
 }
 
@@ -263,12 +281,18 @@ func (s *syncSeriesCallback) HandleFileChunk(ctx *queue.ChunkedSyncPartContext,
                return fmt.Errorf("part handler is nil")
        }
        seriesCtx := ctx.Handler.(*syncSeriesContext)
+       if seriesCtx.pm.State() == protector.StateHigh {
+               return queue.ErrServerBusy
+       }
 
        if seriesCtx.segment == nil {
                return fmt.Errorf("segment is nil")
        }
        if seriesCtx.fileName != ctx.FileName {
                if seriesCtx.streamer != nil {
+                       if err := protector.WaitWhileHigh(ctx.RetrieveContext(), seriesCtx.pm, seriesCtx.memWaitTimeout, s.l, "stream-series-introduce"); err != nil {
+                               return err
+                       }
                        if err := seriesCtx.streamer.CompleteSegment(); err != nil {
                                s.l.Error().Err(err).Str("group", ctx.Group).Msg("failed to complete external segment")
                                return err
@@ -294,11 +318,14 @@ func (s *syncSeriesCallback) HandleFileChunk(ctx *queue.ChunkedSyncPartContext,
 }
 
 type syncElementIndexContext struct {
-       streamer index.ExternalSegmentStreamer
-       segment  storage.Segment[*tsTable, *commonv1.ResourceOpts]
-       tsTable  *tsTable
-       l        *logger.Logger
-       fileName string
+       streamer       index.ExternalSegmentStreamer
+       segment        storage.Segment[*tsTable, *commonv1.ResourceOpts]
+       tsTable        *tsTable
+       l              *logger.Logger
+       pm             protector.Memory
+       introduceCtx   context.Context
+       fileName       string
+       memWaitTimeout time.Duration
 }
 
 func (s *syncElementIndexContext) NewPartType(_ *queue.ChunkedSyncPartContext) error {
@@ -308,6 +335,9 @@ func (s *syncElementIndexContext) NewPartType(_ *queue.ChunkedSyncPartContext) e
 
 func (s *syncElementIndexContext) FinishSync() error {
        if s.streamer != nil {
+               if err := protector.WaitWhileHigh(s.introduceCtx, s.pm, s.memWaitTimeout, s.l, "stream-element-index-introduce"); err != nil {
+                       return err
+               }
                if err := s.streamer.CompleteSegment(); err != nil {
                        s.l.Error().Err(err).Msg("failed to complete external segment for element index")
                        return err
@@ -328,14 +358,18 @@ func (s *syncElementIndexContext) Close() error {
 }
 
 type syncElementIndexCallback struct {
-       l          *logger.Logger
-       schemaRepo *schemaRepo
+       l              *logger.Logger
+       schemaRepo     *schemaRepo
+       pm             protector.Memory
+       memWaitTimeout time.Duration
 }
 
-func setUpSyncElementIndexCallback(l *logger.Logger, schemaRepo *schemaRepo) queue.ChunkedSyncHandler {
+func setUpSyncElementIndexCallback(l *logger.Logger, schemaRepo *schemaRepo, pm protector.Memory, memWaitTimeout time.Duration) queue.ChunkedSyncHandler {
        return &syncElementIndexCallback{
-               l:          l,
-               schemaRepo: schemaRepo,
+               l:              l,
+               schemaRepo:     schemaRepo,
+               pm:             pm,
+               memWaitTimeout: memWaitTimeout,
        }
 }
 
@@ -366,9 +400,12 @@ func (s *syncElementIndexCallback) CreatePartHandler(ctx *queue.ChunkedSyncPartC
        }
 
        return &syncElementIndexContext{
-               l:       s.l,
-               tsTable: tsTable,
-               segment: segment,
+               l:              s.l,
+               tsTable:        tsTable,
+               segment:        segment,
+               pm:             s.pm,
+               memWaitTimeout: s.memWaitTimeout,
+               introduceCtx:   ctx.RetrieveContext(),
        }, nil
 }
 
@@ -378,12 +415,18 @@ func (s *syncElementIndexCallback) HandleFileChunk(ctx *queue.ChunkedSyncPartCon
                return fmt.Errorf("part handler is nil")
        }
        elementCtx := ctx.Handler.(*syncElementIndexContext)
+       if elementCtx.pm.State() == protector.StateHigh {
+               return queue.ErrServerBusy
+       }
 
        if elementCtx.tsTable == nil {
                return fmt.Errorf("ts table is nil")
        }
        if elementCtx.fileName != ctx.FileName {
                if elementCtx.streamer != nil {
+                       if err := protector.WaitWhileHigh(ctx.RetrieveContext(), elementCtx.pm, elementCtx.memWaitTimeout, s.l, "stream-element-index-introduce"); err != nil {
+                               return err
+                       }
                        if err := elementCtx.streamer.CompleteSegment(); err != nil {
                                s.l.Error().Err(err).Str("group", ctx.Group).Msg("failed to complete external segment for element index")
                                return err
diff --git a/banyand/stream/write_data_busy_test.go b/banyand/stream/write_data_busy_test.go
new file mode 100644
index 000000000..de05c7068
--- /dev/null
+++ b/banyand/stream/write_data_busy_test.go
@@ -0,0 +1,94 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package stream
+
+import (
+       "testing"
+
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+
+       "github.com/apache/skywalking-banyandb/banyand/protector"
+       "github.com/apache/skywalking-banyandb/banyand/queue"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+// highMem is a fake protector.Memory that always reports the state it was built with.
+type highMem struct {
+       protector.Nop
+       state protector.State
+}
+
+// State returns the configured state.
+func (h highMem) State() protector.State {
+       return h.state
+}
+
+// TestSyncSeriesCallback_HandleFileChunkBusy verifies that the series chunk
+// handler sheds load by returning queue.ErrServerBusy when memory is high,
+// before touching any segment or streamer state.
+func TestSyncSeriesCallback_HandleFileChunkBusy(t *testing.T) {
+       cb := &syncSeriesCallback{l: logger.GetLogger("test")}
+       // seriesCtx has a nil segment; the BUSY check must fire first so the
+       // nil-segment path is never reached.
+       seriesCtx := &syncSeriesContext{
+               l:  logger.GetLogger("test"),
+               pm: highMem{state: protector.StateHigh},
+       }
+       ctx := &queue.ChunkedSyncPartContext{Handler: seriesCtx, FileName: "series.idx"}
+       err := cb.HandleFileChunk(ctx, []byte("payload"))
+       require.Error(t, err)
+       assert.ErrorIs(t, err, queue.ErrServerBusy)
+}
+
+// TestSyncSeriesCallback_HandleFileChunkNilHandler verifies the early nil
+// handler guard still applies.
+func TestSyncSeriesCallback_HandleFileChunkNilHandler(t *testing.T) {
+       cb := &syncSeriesCallback{l: logger.GetLogger("test")}
+       ctx := &queue.ChunkedSyncPartContext{}
+       err := cb.HandleFileChunk(ctx, []byte("payload"))
+       require.Error(t, err)
+       assert.NotErrorIs(t, err, queue.ErrServerBusy)
+}
+
+// TestSyncElementIndexCallback_HandleFileChunkBusy verifies that the element
+// index chunk handler sheds load by returning queue.ErrServerBusy when memory
+// is high, before touching any ts table or streamer state.
+func TestSyncElementIndexCallback_HandleFileChunkBusy(t *testing.T) {
+       cb := &syncElementIndexCallback{l: logger.GetLogger("test")}
+       // elementCtx has a nil tsTable; the BUSY check must fire first so the
+       // nil-tsTable path is never reached.
+       elementCtx := &syncElementIndexContext{
+               l:  logger.GetLogger("test"),
+               pm: highMem{state: protector.StateHigh},
+       }
+       ctx := &queue.ChunkedSyncPartContext{Handler: elementCtx, FileName: "element.idx"}
+       err := cb.HandleFileChunk(ctx, []byte("payload"))
+       require.Error(t, err)
+       assert.ErrorIs(t, err, queue.ErrServerBusy)
+}
+
+// TestSyncElementIndexCallback_HandleFileChunkNilHandler verifies the early nil
+// handler guard still applies.
+func TestSyncElementIndexCallback_HandleFileChunkNilHandler(t *testing.T) {
+       cb := &syncElementIndexCallback{l: logger.GetLogger("test")}
+       ctx := &queue.ChunkedSyncPartContext{}
+       err := cb.HandleFileChunk(ctx, []byte("payload"))
+       require.Error(t, err)
+       assert.NotErrorIs(t, err, queue.ErrServerBusy)
+}
diff --git a/banyand/trace/svc_liaison.go b/banyand/trace/svc_liaison.go
index 92a2418f1..053c9c657 100644
--- a/banyand/trace/svc_liaison.go
+++ b/banyand/trace/svc_liaison.go
@@ -129,6 +129,8 @@ func (l *liaison) FlagSet() *run.FlagSet {
        fs.StringVar(&l.root, "trace-root-path", "/tmp", "the root path for trace data")
        fs.StringVar(&l.dataPath, "trace-data-path", "", "the path for trace data (optional)")
        fs.DurationVar(&l.option.flushTimeout, "trace-flush-timeout", 3*time.Second, "the timeout for trace data flush")
+       fs.DurationVar(&l.option.memWaitTimeout, "trace-lifecycle-receive-mem-wait-timeout", 5*time.Minute,
+               "max time the migration receiver waits for memory to recover before introducing an external segment")
        fs.IntVar(&l.maxDiskUsagePercent, "trace-max-disk-usage-percent", 95, "the maximum disk usage percentage")
        fs.DurationVar(&l.option.syncInterval, "trace-sync-interval", defaultSyncInterval, "the periodic sync interval for trace data")
        fs.StringSliceVar(&l.dataNodeList, "data-node-list", nil, "comma-separated list of data node names to monitor for handoff")
diff --git a/banyand/trace/svc_standalone.go b/banyand/trace/svc_standalone.go
index 4f0f657a6..a2df01723 100644
--- a/banyand/trace/svc_standalone.go
+++ b/banyand/trace/svc_standalone.go
@@ -83,6 +83,8 @@ func (s *standalone) FlagSet() *run.FlagSet {
        fs.StringVar(&s.root, "trace-root-path", "/tmp", "the root path for trace data")
        fs.StringVar(&s.dataPath, "trace-data-path", "", "the path for trace data (optional)")
        fs.DurationVar(&s.option.flushTimeout, "trace-flush-timeout", defaultFlushTimeout, "the timeout for trace data flush")
+       fs.DurationVar(&s.option.memWaitTimeout, "trace-lifecycle-receive-mem-wait-timeout", 5*time.Minute,
+               "max time the migration receiver waits for memory to recover before introducing an external segment")
 
        // Retention configuration flags
        fs.Float64Var(&s.retentionConfig.HighWatermark, "trace-retention-high-watermark", 95.0, "disk usage high watermark percentage that triggers forced retention cleanup")
@@ -200,7 +202,7 @@ func (s *standalone) PreRun(ctx context.Context) error {
                return err
        }
        s.pipeline.RegisterChunkedSyncHandler(data.TopicTracePartSync, setUpChunkedSyncCallback(s.l, &s.schemaRepo))
-       s.pipeline.RegisterChunkedSyncHandler(data.TopicTraceSeriesSync, setUpSeriesSyncCallback(s.l, &s.schemaRepo))
+       s.pipeline.RegisterChunkedSyncHandler(data.TopicTraceSeriesSync, setUpSeriesSyncCallback(s.l, &s.schemaRepo, s.pm, s.option.memWaitTimeout))
        err = s.pipeline.Subscribe(data.TopicTraceSidxSeriesWrite, setUpSidxSeriesIndexCallback(s.l, &s.schemaRepo))
        if err != nil {
                return err
diff --git a/banyand/trace/trace.go b/banyand/trace/trace.go
index 884972740..d27a324e4 100644
--- a/banyand/trace/trace.go
+++ b/banyand/trace/trace.go
@@ -58,6 +58,7 @@ type option struct {
        seriesCacheMaxSize           run.Bytes
        flushTimeout                 time.Duration
        syncInterval                 time.Duration
+       memWaitTimeout               time.Duration
        failedPartsMaxTotalSizeBytes uint64
        vectorized                   vtrace.VectorizedConfig
 }
diff --git a/banyand/trace/write_data.go b/banyand/trace/write_data.go
index 7a57ba4ce..e28155235 100644
--- a/banyand/trace/write_data.go
+++ b/banyand/trace/write_data.go
@@ -18,6 +18,7 @@
 package trace
 
 import (
+       "context"
        "fmt"
        "path/filepath"
        "strings"
@@ -28,6 +29,7 @@ import (
        commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
        "github.com/apache/skywalking-banyandb/banyand/internal/sidx"
        "github.com/apache/skywalking-banyandb/banyand/internal/storage"
+       "github.com/apache/skywalking-banyandb/banyand/protector"
        "github.com/apache/skywalking-banyandb/banyand/queue"
        "github.com/apache/skywalking-banyandb/pkg/fs"
        "github.com/apache/skywalking-banyandb/pkg/index"
@@ -158,10 +160,13 @@ func (s *syncPartContext) Close() error {
 }
 
 type syncSeriesContext struct {
-       streamer index.ExternalSegmentStreamer
-       segment  storage.Segment[*tsTable, *commonv1.ResourceOpts]
-       l        *logger.Logger
-       fileName string
+       streamer       index.ExternalSegmentStreamer
+       segment        storage.Segment[*tsTable, *commonv1.ResourceOpts]
+       l              *logger.Logger
+       pm             protector.Memory
+       introduceCtx   context.Context
+       fileName       string
+       memWaitTimeout time.Duration
 }
 
 func (s *syncSeriesContext) NewPartType(_ *queue.ChunkedSyncPartContext) error 
{
@@ -171,6 +176,9 @@ func (s *syncSeriesContext) NewPartType(_ 
*queue.ChunkedSyncPartContext) error {
 
 func (s *syncSeriesContext) FinishSync() error {
        if s.streamer != nil {
+               if err := protector.WaitWhileHigh(s.introduceCtx, s.pm, 
s.memWaitTimeout, s.l, "trace-series-introduce"); err != nil {
+                       return err
+               }
                if err := s.streamer.CompleteSegment(); err != nil {
                        s.l.Error().Err(err).Msg("failed to complete external 
segment")
                        return err
@@ -190,14 +198,18 @@ func (s *syncSeriesContext) Close() error {
 }
 
 type syncSeriesCallback struct {
-       l          *logger.Logger
-       schemaRepo *schemaRepo
+       l              *logger.Logger
+       schemaRepo     *schemaRepo
+       pm             protector.Memory
+       memWaitTimeout time.Duration
 }
 
-func setUpSeriesSyncCallback(l *logger.Logger, s *schemaRepo) 
queue.ChunkedSyncHandler {
+func setUpSeriesSyncCallback(l *logger.Logger, s *schemaRepo, pm 
protector.Memory, memWaitTimeout time.Duration) queue.ChunkedSyncHandler {
        return &syncSeriesCallback{
-               l:          l,
-               schemaRepo: s,
+               l:              l,
+               schemaRepo:     s,
+               pm:             pm,
+               memWaitTimeout: memWaitTimeout,
        }
 }
 
@@ -221,8 +233,11 @@ func (s *syncSeriesCallback) CreatePartHandler(ctx 
*queue.ChunkedSyncPartContext
                return nil, err
        }
        return &syncSeriesContext{
-               l:       s.l,
-               segment: segment,
+               l:              s.l,
+               segment:        segment,
+               pm:             s.pm,
+               memWaitTimeout: s.memWaitTimeout,
+               introduceCtx:   ctx.RetrieveContext(),
        }, nil
 }
 
@@ -232,12 +247,18 @@ func (s *syncSeriesCallback) HandleFileChunk(ctx 
*queue.ChunkedSyncPartContext,
                return fmt.Errorf("part handler is nil")
        }
        seriesCtx := ctx.Handler.(*syncSeriesContext)
+       if seriesCtx.pm.State() == protector.StateHigh {
+               return queue.ErrServerBusy
+       }
 
        if seriesCtx.segment == nil {
                return fmt.Errorf("segment is nil")
        }
        if seriesCtx.fileName != ctx.FileName {
                if seriesCtx.streamer != nil {
+                       if err := 
protector.WaitWhileHigh(ctx.RetrieveContext(), seriesCtx.pm, 
seriesCtx.memWaitTimeout, s.l, "trace-series-introduce"); err != nil {
+                               return err
+                       }
                        if err := seriesCtx.streamer.CompleteSegment(); err != 
nil {
                                s.l.Error().Err(err).Str("group", 
ctx.Group).Msg("failed to complete external segment")
                                return err
@@ -322,6 +343,10 @@ func (s *syncChunkCallback) HandleFileChunk(ctx 
*queue.ChunkedSyncPartContext, c
        if ctx.Handler == nil {
                return fmt.Errorf("part handler is nil")
        }
+       partCtx := ctx.Handler.(*syncPartContext)
+       if partCtx.tsTable.pm.State() == protector.StateHigh {
+               return queue.ErrServerBusy
+       }
        if ctx.PartType != PartTypeCore {
                return s.handleSidxFileChunk(ctx, chunk)
        }
diff --git a/banyand/trace/write_data_busy_test.go 
b/banyand/trace/write_data_busy_test.go
new file mode 100644
index 000000000..837db3a3b
--- /dev/null
+++ b/banyand/trace/write_data_busy_test.go
@@ -0,0 +1,67 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package trace
+
+import (
+       "testing"
+
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+
+       "github.com/apache/skywalking-banyandb/banyand/protector"
+       "github.com/apache/skywalking-banyandb/banyand/queue"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+// highMem is a fake protector.Memory that always reports a configurable state.
+type highMem struct {
+       protector.Nop
+       state protector.State
+}
+
+// State returns the fixed scripted state.
+func (h highMem) State() protector.State {
+       return h.state
+}
+
+// TestSyncSeriesCallback_HandleFileChunkBusy verifies that the series chunk
+// handler sheds load by returning queue.ErrServerBusy when memory is high,
+// before touching any segment or streamer state.
+func TestSyncSeriesCallback_HandleFileChunkBusy(t *testing.T) {
+       cb := &syncSeriesCallback{l: logger.GetLogger("test")}
+       // seriesCtx has a nil segment; the BUSY check must fire first so the
+       // nil-segment path is never reached.
+       seriesCtx := &syncSeriesContext{
+               l:  logger.GetLogger("test"),
+               pm: highMem{state: protector.StateHigh},
+       }
+       ctx := &queue.ChunkedSyncPartContext{Handler: seriesCtx, FileName: 
"series.idx"}
+       err := cb.HandleFileChunk(ctx, []byte("payload"))
+       require.Error(t, err)
+       assert.ErrorIs(t, err, queue.ErrServerBusy)
+}
+
+// TestSyncSeriesCallback_HandleFileChunkNilHandler verifies the early nil
+// handler guard still applies.
+func TestSyncSeriesCallback_HandleFileChunkNilHandler(t *testing.T) {
+       cb := &syncSeriesCallback{l: logger.GetLogger("test")}
+       ctx := &queue.ChunkedSyncPartContext{}
+       err := cb.HandleFileChunk(ctx, []byte("payload"))
+       require.Error(t, err)
+       assert.NotErrorIs(t, err, queue.ErrServerBusy)
+}
diff --git a/docs/api-reference.md b/docs/api-reference.md
index 4026f3260..e9a706dfa 100644
--- a/docs/api-reference.md
+++ b/docs/api-reference.md
@@ -2693,6 +2693,7 @@ SyncStatus represents the status of a sync operation.
 | SYNC_STATUS_SYNC_COMPLETE | 5 | Entire sync operation completed 
successfully. |
 | SYNC_STATUS_VERSION_UNSUPPORTED | 6 | Version not supported for sync 
operations. |
 | SYNC_STATUS_FORMAT_VERSION_MISMATCH | 7 | File format version incompatible. |
+| SYNC_STATUS_SERVER_BUSY | 8 | Receiver under memory pressure; sender should 
back off and retry the whole part. |
 
 
  
diff --git a/docs/operation/configuration.md b/docs/operation/configuration.md
index 6b6a39890..599b34c07 100644
--- a/docs/operation/configuration.md
+++ b/docs/operation/configuration.md
@@ -121,6 +121,7 @@ The following flags are used to configure the measure 
storage engine:
 - `--measure-root-path string`: The root path of the measure database 
(default: "/tmp").
 - `--measure-data-path string`: The data directory path of measure. If not 
set, `<measure-root-path>/measure/data` is used.
 - `--measure-max-fan-out-size bytes`: the upper bound of a single file size 
after merge of measure (default 8.00EiB)
+- `--measure-lifecycle-receive-mem-wait-timeout duration`: Max time a 
lifecycle-migration receiver waits for memory to recover before introducing an 
external segment (default: 5m).
 
 The following flags are used to configure the stream storage engine:
 
@@ -129,12 +130,14 @@ The following flags are used to configure the stream 
storage engine:
 - `--stream-data-path string`: The data directory path of stream. If not set, 
`<stream-root-path>/stream/data` is used.
 - `--stream-max-fan-out-size bytes`: the upper bound of a single file size 
after merge of stream (default 8.00EiB)
 - `--element-index-flush-timeout duration`: The element index timeout of 
stream (default: 1s).
+- `--stream-lifecycle-receive-mem-wait-timeout duration`: Max time a 
lifecycle-migration receiver waits for memory to recover before introducing an 
external segment (default: 5m).
 
 The following flags are used to configure the trace storage engine:
 
 - `--trace-flush-timeout duration`: The memory data timeout of trace (default: 
1s).
 - `--trace-root-path string`: The root path of the database (default: "/tmp").
 - `--trace-max-fan-out-size bytes`: the upper bound of a single file size 
after merge of trace (default 8.00EiB)
+- `--trace-lifecycle-receive-mem-wait-timeout duration`: Max time a 
lifecycle-migration receiver waits for memory to recover before introducing an 
external segment (default: 5m).
 
 The following flags configure the remaining per-catalog storage roots:
 
diff --git a/docs/operation/lifecycle.md b/docs/operation/lifecycle.md
index a04153a27..7a1d66566 100644
--- a/docs/operation/lifecycle.md
+++ b/docs/operation/lifecycle.md
@@ -135,6 +135,7 @@ lifecycle \
 | `--schedule`                        | Schedule for periodic backup (e.g., 
@yearly, @monthly, @weekly, @daily, etc.)                            | `""`     
                      |
 | `--migration-orphan-policy`         | What to do with rows whose 
measure/stream schema was deleted from the registry: `archive` or `discard`   | 
`archive`                      |
 | `--migration-orphan-archive-subdir` | Relative subdirectory, under each 
catalog's root path, for archived orphan rows when policy is `archive` | 
`archive`                      |
+| `--lifecycle-send-retry-timeout`    | Maximum total time to retry streaming 
a migration part to a target node on transient failures (target node 
restarting, disconnect, receiver busy) | `15m`                          |
 
 ## Handling Orphan (Deleted-Schema) Data
 
diff --git a/go.mod b/go.mod
index e4fe08b6a..70af9b785 100644
--- a/go.mod
+++ b/go.mod
@@ -208,7 +208,7 @@ require (
 
 replace (
        github.com/benbjohnson/clock v1.3.0 => github.com/SkyAPM/clock 
v1.3.1-0.20220809233656-dc7607c94a97
-       github.com/blugelabs/bluge => github.com/SkyAPM/bluge 
v0.0.0-20250804100126-cccf29a55f01
+       github.com/blugelabs/bluge => github.com/SkyAPM/bluge 
v0.0.0-20260625022800-42385daf66b8
        github.com/blugelabs/bluge_segment_api => 
github.com/zinclabs/bluge_segment_api v1.0.0
        github.com/blugelabs/ice => github.com/SkyAPM/ice 
v0.0.0-20250619023539-b5173603b0b3
 )
diff --git a/go.sum b/go.sum
index 4007af83a..f063e6d05 100644
--- a/go.sum
+++ b/go.sum
@@ -55,8 +55,8 @@ github.com/Nvveen/Gotty 
v0.0.0-20120604004816-cd527374f1e5/go.mod h1:lmUJ/7eu/Q8
 github.com/RoaringBitmap/roaring v0.9.4/go.mod 
h1:icnadbWcNyfEHlYdr+tDlOTih1Bf/h+rzPpv4sbomAA=
 github.com/RoaringBitmap/roaring v1.9.4 
h1:yhEIoH4YezLYT04s1nHehNO64EKFTop/wBhxv2QzDdQ=
 github.com/RoaringBitmap/roaring v1.9.4/go.mod 
h1:6AXUsoIEzDTFFQCe1RbGA6uFONMhvejWj5rqITANK90=
-github.com/SkyAPM/bluge v0.0.0-20250804100126-cccf29a55f01 
h1:f56Ybipl9Yh9wFCZc3EoB5OmbW2UJfIpe6e/gsE/DuA=
-github.com/SkyAPM/bluge v0.0.0-20250804100126-cccf29a55f01/go.mod 
h1:VfQRtJQEwpooobYuSBadb8QYjTUICH+CX42IJYrfSiM=
+github.com/SkyAPM/bluge v0.0.0-20260625022800-42385daf66b8 
h1:kNqj2T61hvMFOblOmkADOf4UWpJPh55jQ4WX2U31zaw=
+github.com/SkyAPM/bluge v0.0.0-20260625022800-42385daf66b8/go.mod 
h1:VfQRtJQEwpooobYuSBadb8QYjTUICH+CX42IJYrfSiM=
 github.com/SkyAPM/ice v0.0.0-20250619023539-b5173603b0b3 
h1:bZvLPihpC1q1AJkmR6ere/aJOr3ev+JCytvjKuL+gE8=
 github.com/SkyAPM/ice v0.0.0-20250619023539-b5173603b0b3/go.mod 
h1:DoQeb0Ee86LyruZSL77Ddscfk/THJ38x453CRCnGEPI=
 github.com/SkyAPM/ktm-ebpf v0.0.0-20260228024820-81a19d950bff 
h1:pjcdkEAe+NL2bA0J/mu/Vs+eG+w+KN/BX9yGkOC1f60=
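
Note on the banyand/trace/write_data.go hunks above: the receiver now does two things under memory pressure. HandleFileChunk rejects chunks with queue.ErrServerBusy while the protector reports StateHigh, and FinishSync waits up to memWaitTimeout before CompleteSegment. The body of protector.WaitWhileHigh is not part of this section, so the following is only a minimal sketch of that behaviour under stated assumptions. Every name in it (memState, memory, errServerBusy, handleChunk, waitWhileHigh, scripted, runDemo) is a hypothetical stand-in, and the ticker-based polling is an assumed strategy, not necessarily what the commit implements.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// memState and memory are hypothetical stand-ins for protector.State and
// protector.Memory; only the State() method used in the diff is modeled.
type memState int

const (
	stateLow memState = iota
	stateHigh
)

type memory interface{ State() memState }

// errServerBusy stands in for queue.ErrServerBusy.
var errServerBusy = errors.New("server busy")

// handleChunk mirrors the guard added to HandleFileChunk: shed load before
// touching any segment or streamer state.
func handleChunk(m memory) error {
	if m.State() == stateHigh {
		return errServerBusy
	}
	return nil // a real handler would write the chunk here
}

// waitWhileHigh blocks until memory leaves the high state, the timeout
// elapses, or ctx is cancelled. Polling on a ticker is an assumption.
func waitWhileHigh(ctx context.Context, m memory, timeout, poll time.Duration) error {
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()
	ticker := time.NewTicker(poll)
	defer ticker.Stop()
	for m.State() == stateHigh {
		select {
		case <-ctx.Done():
			return fmt.Errorf("memory still high: %w", ctx.Err())
		case <-ticker.C:
		}
	}
	return nil
}

// scripted reports stateHigh for the first highCalls calls, then stateLow.
type scripted struct{ highCalls int }

func (s *scripted) State() memState {
	if s.highCalls > 0 {
		s.highCalls--
		return stateHigh
	}
	return stateLow
}

// runDemo exercises the three cases: busy rejection, wait timeout, recovery.
func runDemo() (busy, timedOut, recovered error) {
	const forever = 1 << 30
	bg := context.Background()
	busy = handleChunk(&scripted{highCalls: forever})
	timedOut = waitWhileHigh(bg, &scripted{highCalls: forever}, 20*time.Millisecond, time.Millisecond)
	recovered = waitWhileHigh(bg, &scripted{highCalls: 3}, time.Second, time.Millisecond)
	return busy, timedOut, recovered
}

func main() {
	busy, timedOut, recovered := runDemo()
	fmt.Println(busy, timedOut, recovered)
}
```

In this sketch a busy rejection returns immediately and leaves the retry to the sender, which matches the SYNC_STATUS_SERVER_BUSY description in docs/api-reference.md ("sender should back off and retry the whole part"). The bounded wait applies only at the point where an external segment is about to be introduced.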

