Copilot commented on code in PR #832:
URL: 
https://github.com/apache/skywalking-banyandb/pull/832#discussion_r2485692206


##########
banyand/trace/svc_liaison.go:
##########
@@ -122,6 +144,68 @@ func (l *liaison) PreRun(ctx context.Context) error {
        }
 
        traceDataNodeRegistry := 
grpc.NewClusterNodeRegistry(data.TopicTracePartSync, l.option.tire2Client, 
l.dataNodeSelector)
+       // Initialize handoff controller if data nodes are configured
+       l.l.Info().Strs("dataNodeList", l.dataNodeList).Int("maxSizePercent", 
l.handoffMaxSizePercent).
+               Msg("handoff configuration")
+       if len(l.dataNodeList) > 0 && l.option.tire2Client != nil {
+               // Calculate max handoff size based on percentage of disk space
+               // Formula: totalDisk * maxDiskUsagePercent * 
handoffMaxSizePercent / 10000
+               // Example: 100GB disk, 95% max usage, 10% handoff = 100 * 95 * 
10 / 10000 = 9.5GB
+               maxSize := 0
+               if l.handoffMaxSizePercent > 0 {
+                       l.lfs.MkdirIfNotExist(l.dataPath, storage.DirPerm)
+                       totalSpace := l.lfs.MustGetTotalSpace(l.dataPath)
+                       maxSizeBytes := totalSpace * 
uint64(l.maxDiskUsagePercent) * uint64(l.handoffMaxSizePercent) / 10000

Review Comment:
   Integer overflow risk with uint64 multiplication. When `totalSpace` is large 
(e.g., multiple terabytes), multiplying three uint64 values could overflow 
before division. Consider rearranging the calculation to divide earlier or add 
overflow checks.
   ```suggestion
                        maxSizeBytes := (totalSpace / 100) * 
uint64(l.maxDiskUsagePercent)
                        maxSizeBytes = (maxSizeBytes / 100) * 
uint64(l.handoffMaxSizePercent)
   ```



##########
test/integration/handoff/handoff_suite_test.go:
##########
@@ -0,0 +1,584 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package handoff_test
+
+import (
+       "context"
+       "encoding/json"
+       "errors"
+       "fmt"
+       "hash/fnv"
+       "io"
+       "os"
+       "path/filepath"
+       "sort"
+       "strings"
+       "testing"
+       "time"
+
+       . "github.com/onsi/ginkgo/v2"
+       . "github.com/onsi/gomega"
+       "github.com/onsi/gomega/gleak"
+       "google.golang.org/grpc"
+       "google.golang.org/grpc/credentials/insecure"
+       "google.golang.org/protobuf/types/known/timestamppb"
+
+       commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+       tracev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/trace/v1"
+       "github.com/apache/skywalking-banyandb/banyand/metadata"
+       "github.com/apache/skywalking-banyandb/banyand/metadata/embeddedetcd"
+       "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+       "github.com/apache/skywalking-banyandb/pkg/grpchelper"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/test"
+       "github.com/apache/skywalking-banyandb/pkg/test/flags"
+       "github.com/apache/skywalking-banyandb/pkg/test/helpers"
+       "github.com/apache/skywalking-banyandb/pkg/test/setup"
+       testtrace "github.com/apache/skywalking-banyandb/pkg/test/trace"
+)
+
+const (
+       nodeHost        = "127.0.0.1"
+       grpcHost        = "127.0.0.1"
+       traceShardCount = 2
+)
+
+type dataNodeHandle struct {
+       closeFn    func()
+       dataDir    string
+       addr       string
+       grpcPort   int
+       gossipPort int
+}
+
+func newDataNodeHandle(dataDir string, grpcPort, gossipPort int) 
*dataNodeHandle {
+       return &dataNodeHandle{
+               dataDir:    dataDir,
+               grpcPort:   grpcPort,
+               gossipPort: gossipPort,
+               addr:       fmt.Sprintf("%s:%d", nodeHost, grpcPort),
+       }
+}
+
+func (h *dataNodeHandle) start(etcdEndpoint string) {
+       Expect(h.closeFn).To(BeNil())
+       args := []string{
+               "data",
+               "--grpc-host=" + grpcHost,
+               fmt.Sprintf("--grpc-port=%d", h.grpcPort),
+               fmt.Sprintf("--property-repair-gossip-grpc-port=%d", 
h.gossipPort),
+               "--stream-root-path=" + h.dataDir,
+               "--measure-root-path=" + h.dataDir,
+               "--property-root-path=" + h.dataDir,
+               "--trace-root-path=" + h.dataDir,
+               "--etcd-endpoints", etcdEndpoint,
+               "--node-host-provider", "flag",
+               "--node-host", nodeHost,
+               "--node-labels", "type=handoff",
+               "--logging-modules", "trace,sidx",
+               "--logging-levels", "debug,debug",
+               "--measure-flush-timeout=0s",
+               "--stream-flush-timeout=0s",
+               "--trace-flush-timeout=0s",
+       }
+       h.closeFn = setup.CMD(args...)
+
+       Eventually(helpers.HealthCheck(fmt.Sprintf("%s:%d", grpcHost, 
h.grpcPort), 10*time.Second, 10*time.Second,
+               grpc.WithTransportCredentials(insecure.NewCredentials())), 
flags.EventuallyTimeout).Should(Succeed())
+
+       Eventually(func() (map[string]struct{}, error) {
+               m, err := helpers.ListKeys(etcdEndpoint, h.etcdKey())
+               return keysToSet(m), err
+       }, flags.EventuallyTimeout).Should(HaveLen(1))
+}
+
+func (h *dataNodeHandle) stop(etcdEndpoint string) {
+       if h.closeFn != nil {
+               h.closeFn()
+               h.closeFn = nil
+       }
+       Eventually(func() (map[string]struct{}, error) {
+               m, err := helpers.ListKeys(etcdEndpoint, h.etcdKey())
+               return keysToSet(m), err
+       }, flags.EventuallyTimeout).Should(HaveLen(0))
+}
+
+func (h *dataNodeHandle) etcdKey() string {
+       return fmt.Sprintf("/%s/nodes/%s:%d", metadata.DefaultNamespace, 
nodeHost, h.grpcPort)
+}
+
+type liaisonHandle struct {
+       closeFn    func()
+       rootPath   string
+       addr       string
+       httpAddr   string
+       grpcPort   int
+       httpPort   int
+       serverPort int
+}
+
+func newLiaisonHandle(rootPath string, grpcPort, httpPort, serverPort int) 
*liaisonHandle {
+       return &liaisonHandle{
+               rootPath:   rootPath,
+               grpcPort:   grpcPort,
+               httpPort:   httpPort,
+               serverPort: serverPort,
+               addr:       fmt.Sprintf("%s:%d", grpcHost, grpcPort),
+               httpAddr:   fmt.Sprintf("%s:%d", grpcHost, httpPort),
+       }
+}
+
+func (l *liaisonHandle) start(etcdEndpoint string, dataNodes []string) {
+       Expect(l.closeFn).To(BeNil())
+       joined := strings.Join(dataNodes, ",")
+       Expect(os.Setenv("BYDB_DATA_NODE_LIST", joined)).To(Succeed())
+       Expect(os.Setenv("BYDB_DATA_NODE_SELECTOR", 
"type=handoff")).To(Succeed())
+       args := []string{
+               "liaison",
+               "--grpc-host=" + grpcHost,
+               fmt.Sprintf("--grpc-port=%d", l.grpcPort),
+               "--http-host=" + grpcHost,
+               fmt.Sprintf("--http-port=%d", l.httpPort),
+               "--liaison-server-grpc-host=" + grpcHost,
+               fmt.Sprintf("--liaison-server-grpc-port=%d", l.serverPort),
+               "--http-grpc-addr=" + l.addr,
+               "--etcd-endpoints", etcdEndpoint,
+               "--node-host-provider", "flag",
+               "--node-host", nodeHost,
+               "--stream-root-path=" + l.rootPath,
+               "--measure-root-path=" + l.rootPath,
+               "--trace-root-path=" + l.rootPath,
+               "--stream-flush-timeout=500ms",
+               "--measure-flush-timeout=500ms",
+               "--trace-flush-timeout=500ms",
+               "--stream-sync-interval=1s",
+               "--measure-sync-interval=1s",
+               "--trace-sync-interval=1s",
+               "--handoff-max-size-percent=100",
+               "--logging-modules", "trace,sidx",
+               "--logging-levels", "debug,debug",
+       }
+
+       l.closeFn = setup.CMD(args...)
+
+       Eventually(helpers.HTTPHealthCheck(l.httpAddr, ""), 
flags.EventuallyTimeout).Should(Succeed())
+       Eventually(func() (map[string]struct{}, error) {
+               m, err := helpers.ListKeys(etcdEndpoint, l.etcdKey())
+               return keysToSet(m), err
+       }, flags.EventuallyTimeout).Should(HaveLen(1))
+}
+
+func (l *liaisonHandle) stop(etcdEndpoint string) {
+       if l.closeFn != nil {
+               l.closeFn()
+               l.closeFn = nil
+       }
+       _ = os.Unsetenv("BYDB_DATA_NODE_LIST")
+       _ = os.Unsetenv("BYDB_DATA_NODE_SELECTOR")
+       Eventually(func() (map[string]struct{}, error) {
+               m, err := helpers.ListKeys(etcdEndpoint, l.etcdKey())
+               return keysToSet(m), err
+       }, flags.EventuallyTimeout).Should(HaveLen(0))
+}
+
+func (l *liaisonHandle) etcdKey() string {
+       return fmt.Sprintf("/%s/nodes/%s:%d", metadata.DefaultNamespace, 
nodeHost, l.serverPort)
+}
+
+type suiteInfo struct {
+       LiaisonAddr  string   `json:"liaison_addr"`
+       EtcdEndpoint string   `json:"etcd_endpoint"`
+       HandoffRoot  string   `json:"handoff_root"`
+       DataNodes    []string `json:"data_nodes"`
+}
+
+func keysToSet[K comparable, V any](m map[K]V) map[K]struct{} {
+       if m == nil {
+               return nil
+       }
+       out := make(map[K]struct{}, len(m))
+       for k := range m {
+               out[k] = struct{}{}
+       }
+       return out
+}
+
+var (
+       connection   *grpc.ClientConn
+       goods        []gleak.Goroutine
+       cleanupFunc  func()
+       handoffRoot  string
+       etcdEndpoint string
+
+       dnHandles [2]*dataNodeHandle
+       liaison   *liaisonHandle
+
+       etcdServer   embeddedetcd.Server
+       etcdSpaceDef func()
+       liaisonDef   func()
+       dataDefs     []func()
+)
+
+func TestHandoff(t *testing.T) {
+       RegisterFailHandler(Fail)
+       RunSpecs(t, "Trace Handoff Suite")
+}
+
+var _ = SynchronizedBeforeSuite(func() []byte {
+       Expect(logger.Init(logger.Logging{Env: "dev", Level: 
flags.LogLevel})).To(Succeed())
+       goods = gleak.Goroutines()
+
+       etcdPorts, err := test.AllocateFreePorts(2)
+       Expect(err).NotTo(HaveOccurred())
+
+       var etcdDir string
+       etcdDir, etcdSpaceDef, err = test.NewSpace()
+       Expect(err).NotTo(HaveOccurred())
+
+       clientEP := fmt.Sprintf("http://%s:%d";, nodeHost, etcdPorts[0])
+       peerEP := fmt.Sprintf("http://%s:%d";, nodeHost, etcdPorts[1])
+
+       etcdServer, err = embeddedetcd.NewServer(
+               embeddedetcd.ConfigureListener([]string{clientEP}, 
[]string{peerEP}),
+               embeddedetcd.RootDir(etcdDir),
+               embeddedetcd.AutoCompactionMode("periodic"),
+               embeddedetcd.AutoCompactionRetention("1h"),
+               embeddedetcd.QuotaBackendBytes(2*1024*1024*1024),
+       )
+       Expect(err).NotTo(HaveOccurred())
+       <-etcdServer.ReadyNotify()
+       etcdEndpoint = clientEP
+
+       registry, err := schema.NewEtcdSchemaRegistry(
+               schema.Namespace(metadata.DefaultNamespace),
+               schema.ConfigureServerEndpoints([]string{clientEP}),
+       )
+       Expect(err).NotTo(HaveOccurred())
+       defer registry.Close()
+       Expect(testtrace.PreloadSchema(context.Background(), 
registry)).To(Succeed())
+
+       dataDefs = make([]func(), 0, 2)
+       for i := range dnHandles {
+               dataDir, def, errDir := test.NewSpace()
+               Expect(errDir).NotTo(HaveOccurred())
+               dataDefs = append(dataDefs, def)
+               ports, errPorts := test.AllocateFreePorts(2)
+               Expect(errPorts).NotTo(HaveOccurred())
+               dnHandles[i] = newDataNodeHandle(dataDir, ports[0], ports[1])
+               dnHandles[i].start(etcdEndpoint)
+       }
+
+       liaisonPath, def, err := test.NewSpace()
+       Expect(err).NotTo(HaveOccurred())
+       liaisonDef = def
+       liaisonPorts, err := test.AllocateFreePorts(3)
+       Expect(err).NotTo(HaveOccurred())
+       liaison = newLiaisonHandle(liaisonPath, liaisonPorts[0], 
liaisonPorts[1], liaisonPorts[2])
+       nodeAddrs := []string{dnHandles[0].addr, dnHandles[1].addr}
+       liaison.start(etcdEndpoint, nodeAddrs)
+
+       handoffRoot = filepath.Join(liaisonPath, "trace", "data", "handoff", 
"nodes")
+
+       cleanupFunc = func() {
+               if liaison != nil {
+                       liaison.stop(etcdEndpoint)
+               }
+               for i := range dnHandles {
+                       if dnHandles[i] != nil {
+                               dnHandles[i].stop(etcdEndpoint)
+                       }
+               }
+               if etcdServer != nil {
+                       _ = etcdServer.Close()
+               }
+               if liaisonDef != nil {
+                       liaisonDef()
+               }
+               for _, def := range dataDefs {
+                       if def != nil {
+                               def()
+                       }
+               }
+               if etcdSpaceDef != nil {
+                       etcdSpaceDef()
+               }
+       }
+
+       info := suiteInfo{
+               LiaisonAddr:  liaison.addr,
+               EtcdEndpoint: etcdEndpoint,
+               HandoffRoot:  handoffRoot,
+               DataNodes:    nodeAddrs,
+       }
+       payload, err := json.Marshal(info)
+       Expect(err).NotTo(HaveOccurred())
+       return payload
+}, func(data []byte) {
+       var info suiteInfo
+       Expect(json.Unmarshal(data, &info)).To(Succeed())
+       etcdEndpoint = info.EtcdEndpoint
+       handoffRoot = info.HandoffRoot
+       if liaison == nil {
+               liaison = &liaisonHandle{addr: info.LiaisonAddr}
+       } else {
+               liaison.addr = info.LiaisonAddr
+       }
+
+       var err error
+       connection, err = grpchelper.Conn(info.LiaisonAddr, 10*time.Second,
+               grpc.WithTransportCredentials(insecure.NewCredentials()))
+       Expect(err).NotTo(HaveOccurred())
+})
+
+var _ = SynchronizedAfterSuite(func() {
+       if connection != nil {
+               Expect(connection.Close()).To(Succeed())
+       }
+}, func() {})
+
+var _ = ReportAfterSuite("Trace Handoff Suite", func(report Report) {
+       if report.SuiteSucceeded {
+               if cleanupFunc != nil {
+                       cleanupFunc()
+               }
+               Eventually(gleak.Goroutines, 
flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods))
+       }
+})
+
+var _ = Describe("trace handoff", func() {
+       It("replays data to recovered nodes and empties the queue", func() {
+               Expect(connection).NotTo(BeNil())
+
+               nodeAddrs := []string{dnHandles[0].addr, dnHandles[1].addr}
+               var (
+                       targetIndex = -1
+                       traceID     string
+                       writeTime   time.Time
+                       targetShard uint32
+               )
+
+               for idx := range dnHandles {
+                       candidateShard := shardIDForNode(dnHandles[idx].addr, 
nodeAddrs)
+                       candidateTraceID := 
generateTraceIDForShard(candidateShard, traceShardCount)
+
+                       By(fmt.Sprintf("ensuring the handoff queue for %s 
starts empty", dnHandles[idx].addr))
+                       
Expect(countPendingParts(dnHandles[idx].addr)).To(Equal(0))
+
+                       By(fmt.Sprintf("stopping %s for shard %d", 
dnHandles[idx].addr, candidateShard))
+                       dnHandles[idx].stop(etcdEndpoint)
+                       time.Sleep(7 * time.Second)

Review Comment:
   Hardcoded 7-second sleep with no explanation for why this specific duration 
is needed. This makes the test brittle and slow. Consider using a configurable 
timeout or polling mechanism with a comment explaining what condition is being 
waited for (e.g., 'wait for node to be detected as offline').
   ```suggestion
                        // Wait for the node to be fully stopped/offline before 
proceeding.
                        // This avoids brittle, slow tests due to hardcoded 
sleeps.
                        Eventually(func() bool {
                                // Replace this with a real check if available, 
e.g., node is offline or not responding.
                                // For now, we check that the node's handoff 
queue is not accessible (returns error or -1).
                                return countPendingParts(dnHandles[idx].addr) 
== -1
                        }, flags.EventuallyTimeout, 
500*time.Millisecond).Should(BeTrue())
   ```



##########
banyand/trace/handoff_controller.go:
##########
@@ -0,0 +1,1261 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package trace
+
+import (
+       "context"
+       "encoding/json"
+       "errors"
+       "fmt"
+       "path/filepath"
+       "strings"
+       "sync"
+       "time"
+
+       "github.com/apache/skywalking-banyandb/api/common"
+       "github.com/apache/skywalking-banyandb/api/data"
+       "github.com/apache/skywalking-banyandb/banyand/internal/sidx"
+       "github.com/apache/skywalking-banyandb/banyand/internal/storage"
+       "github.com/apache/skywalking-banyandb/banyand/queue"
+       "github.com/apache/skywalking-banyandb/pkg/bytes"
+       "github.com/apache/skywalking-banyandb/pkg/fs"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+// handoffController manages handoff queues for multiple data nodes.
+type handoffController struct {
+       fileSystem              fs.FileSystem
+       tire2Client             queueClient
+       resolveShardAssignments func(group string, shardID uint32) ([]string, 
error)
+       inFlightSends           map[string]map[uint64]struct{}
+       l                       *logger.Logger
+       replayTriggerChan       chan string
+       nodeQueues              map[string]*handoffNodeQueue
+       replayStopChan          chan struct{}
+       healthyNodes            map[string]struct{}
+       statusChangeChan        chan nodeStatusChange
+       stopMonitor             chan struct{}
+       root                    string
+       allDataNodes            []string
+       monitorWg               sync.WaitGroup
+       replayWg                sync.WaitGroup
+       checkInterval           time.Duration
+       replayBatchSize         int
+       replayPollInterval      time.Duration
+       maxTotalSizeBytes       uint64
+       currentTotalSize        uint64
+       mu                      sync.RWMutex
+       inFlightMu              sync.RWMutex
+       sizeMu                  sync.RWMutex
+}
+
+// nodeStatusChange represents a node status transition.
+type nodeStatusChange struct {
+       nodeName string
+       isOnline bool
+}
+
+// queueClient interface combines health checking and sync client creation.
+type queueClient interface {
+       HealthyNodes() []string
+       NewChunkedSyncClient(node string, chunkSize uint32) 
(queue.ChunkedSyncClient, error)
+}
+
+// newHandoffController creates a new handoff controller.
+func newHandoffController(fileSystem fs.FileSystem, root string, tire2Client 
queueClient,
+       dataNodeList []string, maxSize int, l *logger.Logger,
+       resolveShardAssignments func(group string, shardID uint32) ([]string, 
error),
+) (*handoffController, error) {
+       if fileSystem == nil {
+               return nil, fmt.Errorf("fileSystem is nil")
+       }
+       if l == nil {
+               return nil, fmt.Errorf("logger is nil")
+       }
+       if root == "" {
+               return nil, fmt.Errorf("root path is empty")
+       }
+
+       handoffRoot := filepath.Join(root, "handoff", "nodes")
+
+       hc := &handoffController{
+               l:                       l,
+               fileSystem:              fileSystem,
+               nodeQueues:              make(map[string]*handoffNodeQueue),
+               root:                    handoffRoot,
+               tire2Client:             tire2Client,
+               allDataNodes:            dataNodeList,
+               healthyNodes:            make(map[string]struct{}),
+               statusChangeChan:        make(chan nodeStatusChange, 100),
+               stopMonitor:             make(chan struct{}),
+               checkInterval:           5 * time.Second,
+               replayStopChan:          make(chan struct{}),
+               replayTriggerChan:       make(chan string, 100),
+               inFlightSends:           make(map[string]map[uint64]struct{}),
+               replayBatchSize:         10,
+               replayPollInterval:      1 * time.Second,
+               maxTotalSizeBytes:       uint64(maxSize),
+               currentTotalSize:        0,
+               resolveShardAssignments: resolveShardAssignments,
+       }
+
+       // Create handoff root directory if it doesn't exist
+       fileSystem.MkdirIfNotExist(handoffRoot, storage.DirPerm)
+
+       // Load existing node queues from disk
+       if err := hc.loadExistingQueues(); err != nil {
+               l.Warn().Err(err).Msg("failed to load existing handoff queues")
+       }
+
+       // Start the node status monitor
+       hc.startMonitor()
+
+       // Start the replay worker
+       hc.startReplayWorker()
+
+       return hc, nil
+}
+
+// loadExistingQueues scans the handoff directory and loads existing node 
queues.
+func (hc *handoffController) loadExistingQueues() error {
+       if hc == nil || hc.fileSystem == nil {
+               return fmt.Errorf("handoff controller is not initialized")
+       }
+
+       entries := hc.fileSystem.ReadDir(hc.root)
+       var totalRecoveredSize uint64
+       var errs []error
+
+       for _, entry := range entries {
+               if !entry.IsDir() {
+                       continue
+               }
+
+               nodeRoot := filepath.Join(hc.root, entry.Name())
+
+               // Read the original node address from .node_info file
+               nodeAddr, err := readNodeInfo(hc.fileSystem, nodeRoot)
+               if err != nil {
+                       hc.l.Warn().Err(err).Str("dir", 
entry.Name()).Msg("failed to read node info, skipping")
+                       errs = append(errs, fmt.Errorf("read node info for %s: 
%w", entry.Name(), err))
+                       continue
+               }
+
+               nodeQueue, err := newHandoffNodeQueue(nodeAddr, nodeRoot, 
hc.fileSystem, hc.l)
+               if err != nil {
+                       hc.l.Warn().Err(err).Str("node", nodeAddr).Msg("failed 
to load node queue")
+                       errs = append(errs, fmt.Errorf("load node queue for %s: 
%w", nodeAddr, err))
+                       continue
+               }
+
+               hc.nodeQueues[nodeAddr] = nodeQueue
+
+               // Calculate queue size from metadata (not from actual file 
sizes)
+               pending, err := nodeQueue.listPending()
+               if err != nil {
+                       hc.l.Warn().Err(err).Str("node", nodeAddr).Msg("failed 
to list pending parts")
+                       errs = append(errs, fmt.Errorf("list pending for %s: 
%w", nodeAddr, err))
+                       continue
+               }
+               var queueSize uint64
+               for _, ptp := range pending {
+                       meta, err := nodeQueue.getMetadata(ptp.PartID, 
ptp.PartType)
+                       if err != nil {
+                               hc.l.Warn().Err(err).
+                                       Str("node", nodeAddr).
+                                       Uint64("partID", ptp.PartID).
+                                       Str("partType", ptp.PartType).
+                                       Msg("failed to read part metadata")
+                               errs = append(errs, fmt.Errorf("metadata for 
node %s part %x (%s): %w",
+                                       nodeAddr, ptp.PartID, ptp.PartType, 
err))
+                               continue
+                       }
+                       if meta.PartSizeBytes > 0 {
+                               queueSize += meta.PartSizeBytes
+                       }
+               }
+               totalRecoveredSize += queueSize
+
+               // Log pending parts count
+               if len(pending) > 0 {
+                       hc.l.Info().
+                               Str("node", nodeAddr).
+                               Int("pending", len(pending)).
+                               Uint64("sizeMB", queueSize/1024/1024).
+                               Msg("loaded handoff queue with pending parts")
+               }
+       }
+
+       // Update current total size
+       hc.currentTotalSize = totalRecoveredSize
+       if totalRecoveredSize > 0 {
+               hc.l.Info().
+                       Uint64("totalSizeMB", totalRecoveredSize/1024/1024).
+                       Int("nodeCount", len(hc.nodeQueues)).
+                       Msg("recovered handoff queue state")
+       }
+       if len(errs) > 0 {
+               return errors.Join(errs...)
+       }
+
+       return nil
+}
+
+// enqueueForNode adds a part to the handoff queue for a specific node.
+func (hc *handoffController) enqueueForNode(nodeAddr string, partID uint64, 
partType string, sourcePath string,
+       group string, shardID uint32,
+) error {
+       // Read part size from metadata
+       partSize := hc.readPartSizeFromMetadata(sourcePath, partType)
+
+       // Check if enqueue would exceed limit
+       if !hc.canEnqueue(partSize) {
+               currentSize := hc.getTotalSize()
+               return fmt.Errorf("handoff queue full: current=%d MB, limit=%d 
MB, part=%d MB",
+                       currentSize/1024/1024, hc.maxTotalSizeBytes/1024/1024, 
partSize/1024/1024)
+       }
+
+       hc.mu.Lock()
+       defer hc.mu.Unlock()
+
+       meta := &handoffMetadata{
+               EnqueueTimestamp: time.Now().UnixNano(),
+               Group:            group,
+               ShardID:          shardID,
+               PartType:         partType,
+               PartSizeBytes:    partSize,
+       }
+
+       nodeQueue, err := hc.getOrCreateNodeQueue(nodeAddr)
+       if err != nil {
+               return fmt.Errorf("failed to get node queue for %s: %w", 
nodeAddr, err)
+       }
+
+       if err := nodeQueue.enqueue(partID, partType, sourcePath, meta); err != 
nil {
+               return err
+       }
+
+       // Update total size after successful enqueue
+       hc.updateTotalSize(int64(partSize))
+
+       return nil
+}
+
+// enqueueForNodes adds a part to the handoff queues for multiple offline 
nodes.
+func (hc *handoffController) enqueueForNodes(offlineNodes []string, partID 
uint64, partType string, sourcePath string,
+       group string, shardID uint32,
+) error {
+       meta := &handoffMetadata{
+               EnqueueTimestamp: time.Now().UnixNano(),
+               Group:            group,
+               ShardID:          shardID,
+               PartType:         partType,
+       }
+
+       hc.mu.Lock()
+       defer hc.mu.Unlock()
+
+       var firstErr error
+       successCount := 0
+
+       // For each offline node, create hard-linked copy
+       for _, nodeAddr := range offlineNodes {
+               nodeQueue, err := hc.getOrCreateNodeQueue(nodeAddr)
+               if err != nil {
+                       hc.l.Error().Err(err).Str("node", nodeAddr).Msg("failed 
to get node queue")
+                       if firstErr == nil {
+                               firstErr = err
+                       }
+                       continue
+               }
+
+               if err := nodeQueue.enqueue(partID, partType, sourcePath, 
meta); err != nil {
+                       hc.l.Error().Err(err).Str("node", 
nodeAddr).Uint64("partId", partID).Str("partType", partType).
+                               Msg("failed to enqueue part")
+                       if firstErr == nil {
+                               firstErr = err
+                       }
+                       continue
+               }
+
+               successCount++
+       }
+
+       if successCount > 0 {
+               hc.l.Info().
+                       Int("successCount", successCount).
+                       Int("totalNodes", len(offlineNodes)).
+                       Uint64("partId", partID).
+                       Str("partType", partType).
+                       Msg("part enqueued to handoff queues")
+       }
+
+       // Return error only if all enqueues failed
+       if successCount == 0 && firstErr != nil {
+               return firstErr
+       }
+
+       return nil
+}
+
+// getOrCreateNodeQueue gets an existing node queue or creates a new one.
+// Caller must hold hc.mu lock.
+func (hc *handoffController) getOrCreateNodeQueue(nodeAddr string) 
(*handoffNodeQueue, error) {
+       // Check if queue already exists
+       if queue, exists := hc.nodeQueues[nodeAddr]; exists {
+               return queue, nil
+       }
+
+       // Create new queue
+       sanitizedAddr := sanitizeNodeAddr(nodeAddr)
+       nodeRoot := filepath.Join(hc.root, sanitizedAddr)
+
+       nodeQueue, err := newHandoffNodeQueue(nodeAddr, nodeRoot, 
hc.fileSystem, hc.l)
+       if err != nil {
+               return nil, fmt.Errorf("failed to create node queue: %w", err)
+       }
+
+       hc.nodeQueues[nodeAddr] = nodeQueue
+       return nodeQueue, nil
+}
+
+// listPendingForNode returns all pending parts with their types for a 
specific node.
+func (hc *handoffController) listPendingForNode(nodeAddr string) 
([]partTypePair, error) {
+       hc.mu.RLock()
+       defer hc.mu.RUnlock()
+
+       nodeQueue, exists := hc.nodeQueues[nodeAddr]
+       if !exists {
+               return nil, nil // No queue means no pending parts
+       }
+
+       return nodeQueue.listPending()
+}
+
+// getPartPath returns the path to a specific part type directory in a node's 
handoff queue.
+func (hc *handoffController) getPartPath(nodeAddr string, partID uint64, 
partType string) string {
+       hc.mu.RLock()
+       defer hc.mu.RUnlock()
+
+       nodeQueue, exists := hc.nodeQueues[nodeAddr]
+       if !exists {
+               return ""
+       }
+
+       return nodeQueue.getPartTypePath(partID, partType)
+}
+
+// getPartMetadata returns the handoff metadata for a specific part type.
+func (hc *handoffController) getPartMetadata(nodeAddr string, partID uint64, 
partType string) (*handoffMetadata, error) {
+       hc.mu.RLock()
+       defer hc.mu.RUnlock()
+
+       nodeQueue, exists := hc.nodeQueues[nodeAddr]
+       if !exists {
+               return nil, fmt.Errorf("node queue not found for %s", nodeAddr)
+       }
+
+       return nodeQueue.getMetadata(partID, partType)
+}
+
+// completeSend removes a specific part type from a node's handoff queue after 
successful delivery.
+func (hc *handoffController) completeSend(nodeAddr string, partID uint64, 
partType string) error {
+       // Get part size before removing
+       var partSize uint64
+       meta, err := hc.getPartMetadata(nodeAddr, partID, partType)
+       if err == nil && meta.PartSizeBytes > 0 {
+               partSize = meta.PartSizeBytes
+       }
+
+       hc.mu.RLock()
+       nodeQueue, exists := hc.nodeQueues[nodeAddr]
+       hc.mu.RUnlock()
+
+       if !exists {
+               return fmt.Errorf("node queue not found for %s", nodeAddr)
+       }
+
+       if err := nodeQueue.complete(partID, partType); err != nil {
+               return err
+       }
+
+       // Update total size after successful removal
+       if partSize > 0 {
+               hc.updateTotalSize(-int64(partSize))
+       }
+
+       return nil
+}
+
+// completeSendAll removes all part types for a given partID from a node's 
handoff queue.
+func (hc *handoffController) completeSendAll(nodeAddr string, partID uint64) 
error {
+       hc.mu.RLock()
+       defer hc.mu.RUnlock()
+
+       nodeQueue, exists := hc.nodeQueues[nodeAddr]
+       if !exists {
+               return fmt.Errorf("node queue not found for %s", nodeAddr)
+       }
+
+       return nodeQueue.completeAll(partID)
+}
+
+// getNodeQueueSize returns the total size of pending parts for a specific 
node.
+func (hc *handoffController) getNodeQueueSize(nodeAddr string) (uint64, error) 
{
+       hc.mu.RLock()
+       defer hc.mu.RUnlock()
+
+       nodeQueue, exists := hc.nodeQueues[nodeAddr]
+       if !exists {
+               return 0, nil
+       }
+
+       return nodeQueue.size()
+}
+
+// getAllNodeQueues returns a snapshot of all node addresses with handoff 
queues.
+func (hc *handoffController) getAllNodeQueues() []string {
+       hc.mu.RLock()
+       defer hc.mu.RUnlock()
+
+       nodes := make([]string, 0, len(hc.nodeQueues))
+       for nodeAddr := range hc.nodeQueues {
+               nodes = append(nodes, nodeAddr)
+       }
+
+       return nodes
+}
+
+// partInfo contains information about a part to be enqueued.
+type partInfo struct {
+       path    string
+       group   string
+       partID  uint64
+       shardID common.ShardID
+}
+
+// calculateOfflineNodes returns the list of offline nodes responsible for the 
shard.
+func (hc *handoffController) calculateOfflineNodes(onlineNodes []string, group 
string, shardID common.ShardID) []string {
+       if hc == nil {
+               return nil
+       }
+
+       onlineSet := make(map[string]struct{}, len(onlineNodes))
+       for _, node := range onlineNodes {
+               onlineSet[node] = struct{}{}
+       }
+
+       candidates := hc.nodesForShard(group, uint32(shardID))
+       seen := make(map[string]struct{}, len(candidates))
+       var offlineNodes []string
+       for _, node := range candidates {
+               if _, dup := seen[node]; dup {
+                       continue
+               }
+               seen[node] = struct{}{}
+               if !hc.isNodeHealthy(node) {
+                       offlineNodes = append(offlineNodes, node)
+                       continue
+               }
+               if len(onlineSet) > 0 {
+                       if _, isOnline := onlineSet[node]; !isOnline {
+                               offlineNodes = append(offlineNodes, node)
+                       }
+                       continue
+               }
+               if hc.tire2Client == nil {
+                       offlineNodes = append(offlineNodes, node)
+               }
+       }
+
+       return offlineNodes
+}
+
+// enqueueForOfflineNodes enqueues the provided core and sidx parts for each 
offline node.
+func (hc *handoffController) enqueueForOfflineNodes(
+       offlineNodes []string,
+       coreParts []partInfo,
+       sidxParts map[string][]partInfo,
+) error {
+       if hc == nil || len(offlineNodes) == 0 {
+               return nil
+       }
+
+       group, shardID, hasShardInfo := extractShardDetails(coreParts, 
sidxParts)
+       if hasShardInfo {
+               filtered := hc.filterNodesForShard(offlineNodes, group, shardID)
+               if len(filtered) == 0 {
+                       hc.l.Debug().
+                               Str("group", group).
+                               Uint32("shardID", shardID).
+                               Msg("no offline shard owners to enqueue")
+                       return nil
+               }
+               offlineNodes = filtered
+       }
+
+       // Track enqueue statistics
+       totalCoreEnqueued := 0
+       totalSidxEnqueued := 0
+       var lastErr error
+
+       // For each offline node, enqueue all parts
+       for _, nodeAddr := range offlineNodes {
+               // Enqueue core parts
+               for _, coreInfo := range coreParts {
+                       err := hc.enqueueForNode(nodeAddr, coreInfo.partID, 
PartTypeCore, coreInfo.path, coreInfo.group, uint32(coreInfo.shardID))
+                       if err != nil {
+                               hc.l.Warn().Err(err).
+                                       Str("node", nodeAddr).
+                                       Uint64("partID", coreInfo.partID).
+                                       Msg("failed to enqueue core part")
+                               lastErr = err
+                       } else {
+                               totalCoreEnqueued++
+                       }
+               }
+
+               // Enqueue sidx parts
+               for sidxName, parts := range sidxParts {
+                       for _, sidxInfo := range parts {
+                               err := hc.enqueueForNode(nodeAddr, 
sidxInfo.partID, sidxName, sidxInfo.path, sidxInfo.group, 
uint32(sidxInfo.shardID))
+                               if err != nil {
+                                       hc.l.Warn().Err(err).
+                                               Str("node", nodeAddr).
+                                               Str("sidx", sidxName).
+                                               Uint64("partID", 
sidxInfo.partID).
+                                               Msg("failed to enqueue sidx 
part")
+                                       lastErr = err
+                               } else {
+                                       totalSidxEnqueued++
+                               }
+                       }
+               }
+       }
+
+       // Log summary
+       hc.l.Info().
+               Int("offlineNodes", len(offlineNodes)).
+               Str("group", group).
+               Uint32("shardID", shardID).
+               Int("corePartsEnqueued", totalCoreEnqueued).
+               Int("sidxPartsEnqueued", totalSidxEnqueued).
+               Msg("enqueued parts for offline nodes")
+
+       return lastErr
+}
+
+func extractShardDetails(coreParts []partInfo, sidxParts 
map[string][]partInfo) (string, uint32, bool) {
+       if len(coreParts) > 0 {
+               return coreParts[0].group, uint32(coreParts[0].shardID), true
+       }
+       for _, parts := range sidxParts {
+               if len(parts) == 0 {
+                       continue
+               }
+               return parts[0].group, uint32(parts[0].shardID), true
+       }
+       return "", 0, false
+}
+
+func (hc *handoffController) nodesForShard(group string, shardID uint32) 
[]string {
+       if hc == nil {
+               return nil
+       }
+       if hc.resolveShardAssignments == nil {
+               return hc.allDataNodes
+       }
+       nodes, err := hc.resolveShardAssignments(group, shardID)
+       if err != nil {
+               if hc.l != nil {
+                       hc.l.Warn().
+                               Err(err).
+                               Str("group", group).
+                               Uint32("shardID", shardID).
+                               Msg("failed to resolve shard assignments, using 
configured node list")
+               }
+               return hc.allDataNodes
+       }
+       if len(nodes) == 0 {
+               return hc.allDataNodes
+       }
+       return nodes
+}
+
+func (hc *handoffController) filterNodesForShard(nodes []string, group string, 
shardID uint32) []string {
+       candidates := hc.nodesForShard(group, shardID)
+       if len(candidates) == 0 {
+               return nil
+       }
+       candidateSet := make(map[string]struct{}, len(candidates))
+       for _, node := range candidates {
+               candidateSet[node] = struct{}{}
+       }
+       filtered := make([]string, 0, len(nodes))
+       seen := make(map[string]struct{}, len(nodes))
+       for _, node := range nodes {
+               if _, already := seen[node]; already {
+                       continue
+               }
+               seen[node] = struct{}{}
+               if _, ok := candidateSet[node]; ok {
+                       filtered = append(filtered, node)
+               }
+       }
+       return filtered
+}
+
+// close closes the handoff controller.
+func (hc *handoffController) close() error {
+       // Stop the monitor
+       if hc.stopMonitor != nil {
+               close(hc.stopMonitor)
+               hc.monitorWg.Wait()
+       }
+
+       // Stop the replay worker
+       if hc.replayStopChan != nil {
+               close(hc.replayStopChan)
+               hc.replayWg.Wait()
+       }
+
+       hc.mu.Lock()
+       defer hc.mu.Unlock()
+
+       // Clear node queues
+       hc.nodeQueues = nil
+
+       // Clear in-flight tracking
+       hc.inFlightSends = nil
+
+       return nil
+}
+
+// getTotalSize returns the current total size across all node queues.
+func (hc *handoffController) getTotalSize() uint64 {
+       hc.sizeMu.RLock()
+       defer hc.sizeMu.RUnlock()
+       return hc.currentTotalSize
+}
+
+// canEnqueue checks if adding a part of the given size would exceed the total 
size limit.
+func (hc *handoffController) canEnqueue(partSize uint64) bool {
+       if hc.maxTotalSizeBytes == 0 {
+               return true // No limit configured
+       }
+
+       hc.sizeMu.RLock()
+       defer hc.sizeMu.RUnlock()
+       return hc.currentTotalSize+partSize <= hc.maxTotalSizeBytes
+}
+
+// readPartSizeFromMetadata reads the CompressedSizeBytes from the part's 
metadata file.
+func (hc *handoffController) readPartSizeFromMetadata(sourcePath, partType 
string) uint64 {
+       var metadataPath string
+
+       // Core parts use metadata.json, sidx parts use manifest.json
+       if partType == PartTypeCore {
+               metadataPath = filepath.Join(sourcePath, metadataFilename) // 
"metadata.json"
+       } else {
+               metadataPath = filepath.Join(sourcePath, "manifest.json")
+       }
+
+       // Read metadata file
+       data, err := hc.fileSystem.Read(metadataPath)
+       if err != nil {
+               hc.l.Warn().Err(err).Str("path", metadataPath).Msg("failed to 
read metadata file")
+               return 0
+       }
+
+       // Parse metadata to get CompressedSizeBytes
+       var metadata struct {
+               CompressedSizeBytes uint64 `json:"compressedSizeBytes"`
+       }
+       if err := json.Unmarshal(data, &metadata); err != nil {
+               hc.l.Warn().Err(err).Str("path", metadataPath).Msg("failed to 
parse metadata")
+               return 0
+       }
+
+       return metadata.CompressedSizeBytes
+}
+
+// updateTotalSize atomically updates the current total size.
+func (hc *handoffController) updateTotalSize(delta int64) {
+       hc.sizeMu.Lock()
+       defer hc.sizeMu.Unlock()
+
+       if delta > 0 {
+               // Enqueue: add to total
+               hc.currentTotalSize += uint64(delta)
+       } else if delta < 0 {
+               // Complete: subtract from total with underflow check
+               toSubtract := uint64(-delta)
+               if toSubtract > hc.currentTotalSize {
+                       hc.l.Warn().
+                               Uint64("current", hc.currentTotalSize).
+                               Uint64("toSubtract", toSubtract).
+                               Msg("attempted to subtract more than current 
size, resetting to 0")
+                       hc.currentTotalSize = 0
+               } else {
+                       hc.currentTotalSize -= toSubtract
+               }
+       }

Review Comment:
   The underflow warning and reset to 0 masks a potential bug in the size 
tracking logic. If `toSubtract > currentTotalSize`, this indicates parts were 
completed without being properly tracked during enqueue, or double-completion 
occurred. Consider investigating the root cause and potentially returning an 
error instead of silently resetting to prevent data inconsistencies.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to