Copilot commented on code in PR #832:
URL: https://github.com/apache/skywalking-banyandb/pull/832#discussion_r2485692206
##########
banyand/trace/svc_liaison.go:
##########
@@ -122,6 +144,68 @@ func (l *liaison) PreRun(ctx context.Context) error {
}
traceDataNodeRegistry := grpc.NewClusterNodeRegistry(data.TopicTracePartSync, l.option.tire2Client, l.dataNodeSelector)
+ // Initialize handoff controller if data nodes are configured
+ l.l.Info().Strs("dataNodeList", l.dataNodeList).Int("maxSizePercent", l.handoffMaxSizePercent).
+ Msg("handoff configuration")
+ if len(l.dataNodeList) > 0 && l.option.tire2Client != nil {
+ // Calculate max handoff size based on percentage of disk space
+ // Formula: totalDisk * maxDiskUsagePercent * handoffMaxSizePercent / 10000
+ // Example: 100GB disk, 95% max usage, 10% handoff = 100 * 95 * 10 / 10000 = 9.5GB
+ maxSize := 0
+ if l.handoffMaxSizePercent > 0 {
+ l.lfs.MkdirIfNotExist(l.dataPath, storage.DirPerm)
+ totalSpace := l.lfs.MustGetTotalSpace(l.dataPath)
+ maxSizeBytes := totalSpace * uint64(l.maxDiskUsagePercent) * uint64(l.handoffMaxSizePercent) / 10000
Review Comment:
Integer overflow risk with uint64 multiplication. When `totalSpace` is very large (petabyte scale, since the two percentage factors scale the value by at most 10,000), multiplying the three uint64 values could overflow before the division. Consider rearranging the calculation to divide earlier, or adding an explicit overflow check.
```suggestion
			maxSizeBytes := (totalSpace / 100) * uint64(l.maxDiskUsagePercent)
			maxSizeBytes = (maxSizeBytes / 100) * uint64(l.handoffMaxSizePercent)
```
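If dividing first loses too much precision for small volumes, an explicit overflow check is another option. A minimal, purely illustrative sketch (the `percentOfPercent` helper is hypothetical, not code from this PR):
```go
package main

import (
	"fmt"
	"math/bits"
)

// percentOfPercent returns total * pctA * pctB / 10000, reporting an error if
// the 128-bit product does not fit in 64 bits. With pctA, pctB <= 100, their
// product is at most 10000, so multiplying them first cannot overflow.
func percentOfPercent(total, pctA, pctB uint64) (uint64, error) {
	hi, lo := bits.Mul64(total, pctA*pctB)
	if hi != 0 {
		return 0, fmt.Errorf("size calculation overflows uint64: total=%d", total)
	}
	return lo / 10000, nil
}

func main() {
	// 100 GiB disk, 95% max usage, 10% handoff share -> roughly 9.5 GiB.
	maxSize, err := percentOfPercent(100<<30, 95, 10)
	fmt.Println(maxSize, err)
}
```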
##########
test/integration/handoff/handoff_suite_test.go:
##########
@@ -0,0 +1,584 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package handoff_test
+
+import (
+ "context"
+ "encoding/json"
+ "errors"
+ "fmt"
+ "hash/fnv"
+ "io"
+ "os"
+ "path/filepath"
+ "sort"
+ "strings"
+ "testing"
+ "time"
+
+ . "github.com/onsi/ginkgo/v2"
+ . "github.com/onsi/gomega"
+ "github.com/onsi/gomega/gleak"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/credentials/insecure"
+ "google.golang.org/protobuf/types/known/timestamppb"
+
+ commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+ modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+ tracev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/trace/v1"
+ "github.com/apache/skywalking-banyandb/banyand/metadata"
+ "github.com/apache/skywalking-banyandb/banyand/metadata/embeddedetcd"
+ "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+ "github.com/apache/skywalking-banyandb/pkg/grpchelper"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/test"
+ "github.com/apache/skywalking-banyandb/pkg/test/flags"
+ "github.com/apache/skywalking-banyandb/pkg/test/helpers"
+ "github.com/apache/skywalking-banyandb/pkg/test/setup"
+ testtrace "github.com/apache/skywalking-banyandb/pkg/test/trace"
+)
+
+const (
+ nodeHost = "127.0.0.1"
+ grpcHost = "127.0.0.1"
+ traceShardCount = 2
+)
+
+type dataNodeHandle struct {
+ closeFn func()
+ dataDir string
+ addr string
+ grpcPort int
+ gossipPort int
+}
+
+func newDataNodeHandle(dataDir string, grpcPort, gossipPort int) *dataNodeHandle {
+ return &dataNodeHandle{
+ dataDir: dataDir,
+ grpcPort: grpcPort,
+ gossipPort: gossipPort,
+ addr: fmt.Sprintf("%s:%d", nodeHost, grpcPort),
+ }
+}
+
+func (h *dataNodeHandle) start(etcdEndpoint string) {
+ Expect(h.closeFn).To(BeNil())
+ args := []string{
+ "data",
+ "--grpc-host=" + grpcHost,
+ fmt.Sprintf("--grpc-port=%d", h.grpcPort),
+ fmt.Sprintf("--property-repair-gossip-grpc-port=%d",
h.gossipPort),
+ "--stream-root-path=" + h.dataDir,
+ "--measure-root-path=" + h.dataDir,
+ "--property-root-path=" + h.dataDir,
+ "--trace-root-path=" + h.dataDir,
+ "--etcd-endpoints", etcdEndpoint,
+ "--node-host-provider", "flag",
+ "--node-host", nodeHost,
+ "--node-labels", "type=handoff",
+ "--logging-modules", "trace,sidx",
+ "--logging-levels", "debug,debug",
+ "--measure-flush-timeout=0s",
+ "--stream-flush-timeout=0s",
+ "--trace-flush-timeout=0s",
+ }
+ h.closeFn = setup.CMD(args...)
+
+ Eventually(helpers.HealthCheck(fmt.Sprintf("%s:%d", grpcHost, h.grpcPort), 10*time.Second, 10*time.Second,
+ grpc.WithTransportCredentials(insecure.NewCredentials())), flags.EventuallyTimeout).Should(Succeed())
+
+ Eventually(func() (map[string]struct{}, error) {
+ m, err := helpers.ListKeys(etcdEndpoint, h.etcdKey())
+ return keysToSet(m), err
+ }, flags.EventuallyTimeout).Should(HaveLen(1))
+}
+
+func (h *dataNodeHandle) stop(etcdEndpoint string) {
+ if h.closeFn != nil {
+ h.closeFn()
+ h.closeFn = nil
+ }
+ Eventually(func() (map[string]struct{}, error) {
+ m, err := helpers.ListKeys(etcdEndpoint, h.etcdKey())
+ return keysToSet(m), err
+ }, flags.EventuallyTimeout).Should(HaveLen(0))
+}
+
+func (h *dataNodeHandle) etcdKey() string {
+ return fmt.Sprintf("/%s/nodes/%s:%d", metadata.DefaultNamespace,
nodeHost, h.grpcPort)
+}
+
+type liaisonHandle struct {
+ closeFn func()
+ rootPath string
+ addr string
+ httpAddr string
+ grpcPort int
+ httpPort int
+ serverPort int
+}
+
+func newLiaisonHandle(rootPath string, grpcPort, httpPort, serverPort int) *liaisonHandle {
+ return &liaisonHandle{
+ rootPath: rootPath,
+ grpcPort: grpcPort,
+ httpPort: httpPort,
+ serverPort: serverPort,
+ addr: fmt.Sprintf("%s:%d", grpcHost, grpcPort),
+ httpAddr: fmt.Sprintf("%s:%d", grpcHost, httpPort),
+ }
+}
+
+func (l *liaisonHandle) start(etcdEndpoint string, dataNodes []string) {
+ Expect(l.closeFn).To(BeNil())
+ joined := strings.Join(dataNodes, ",")
+ Expect(os.Setenv("BYDB_DATA_NODE_LIST", joined)).To(Succeed())
+ Expect(os.Setenv("BYDB_DATA_NODE_SELECTOR",
"type=handoff")).To(Succeed())
+ args := []string{
+ "liaison",
+ "--grpc-host=" + grpcHost,
+ fmt.Sprintf("--grpc-port=%d", l.grpcPort),
+ "--http-host=" + grpcHost,
+ fmt.Sprintf("--http-port=%d", l.httpPort),
+ "--liaison-server-grpc-host=" + grpcHost,
+ fmt.Sprintf("--liaison-server-grpc-port=%d", l.serverPort),
+ "--http-grpc-addr=" + l.addr,
+ "--etcd-endpoints", etcdEndpoint,
+ "--node-host-provider", "flag",
+ "--node-host", nodeHost,
+ "--stream-root-path=" + l.rootPath,
+ "--measure-root-path=" + l.rootPath,
+ "--trace-root-path=" + l.rootPath,
+ "--stream-flush-timeout=500ms",
+ "--measure-flush-timeout=500ms",
+ "--trace-flush-timeout=500ms",
+ "--stream-sync-interval=1s",
+ "--measure-sync-interval=1s",
+ "--trace-sync-interval=1s",
+ "--handoff-max-size-percent=100",
+ "--logging-modules", "trace,sidx",
+ "--logging-levels", "debug,debug",
+ }
+
+ l.closeFn = setup.CMD(args...)
+
+ Eventually(helpers.HTTPHealthCheck(l.httpAddr, ""), flags.EventuallyTimeout).Should(Succeed())
+ Eventually(func() (map[string]struct{}, error) {
+ m, err := helpers.ListKeys(etcdEndpoint, l.etcdKey())
+ return keysToSet(m), err
+ }, flags.EventuallyTimeout).Should(HaveLen(1))
+}
+
+func (l *liaisonHandle) stop(etcdEndpoint string) {
+ if l.closeFn != nil {
+ l.closeFn()
+ l.closeFn = nil
+ }
+ _ = os.Unsetenv("BYDB_DATA_NODE_LIST")
+ _ = os.Unsetenv("BYDB_DATA_NODE_SELECTOR")
+ Eventually(func() (map[string]struct{}, error) {
+ m, err := helpers.ListKeys(etcdEndpoint, l.etcdKey())
+ return keysToSet(m), err
+ }, flags.EventuallyTimeout).Should(HaveLen(0))
+}
+
+func (l *liaisonHandle) etcdKey() string {
+ return fmt.Sprintf("/%s/nodes/%s:%d", metadata.DefaultNamespace,
nodeHost, l.serverPort)
+}
+
+type suiteInfo struct {
+ LiaisonAddr string `json:"liaison_addr"`
+ EtcdEndpoint string `json:"etcd_endpoint"`
+ HandoffRoot string `json:"handoff_root"`
+ DataNodes []string `json:"data_nodes"`
+}
+
+func keysToSet[K comparable, V any](m map[K]V) map[K]struct{} {
+ if m == nil {
+ return nil
+ }
+ out := make(map[K]struct{}, len(m))
+ for k := range m {
+ out[k] = struct{}{}
+ }
+ return out
+}
+
+var (
+ connection *grpc.ClientConn
+ goods []gleak.Goroutine
+ cleanupFunc func()
+ handoffRoot string
+ etcdEndpoint string
+
+ dnHandles [2]*dataNodeHandle
+ liaison *liaisonHandle
+
+ etcdServer embeddedetcd.Server
+ etcdSpaceDef func()
+ liaisonDef func()
+ dataDefs []func()
+)
+
+func TestHandoff(t *testing.T) {
+ RegisterFailHandler(Fail)
+ RunSpecs(t, "Trace Handoff Suite")
+}
+
+var _ = SynchronizedBeforeSuite(func() []byte {
+ Expect(logger.Init(logger.Logging{Env: "dev", Level: flags.LogLevel})).To(Succeed())
+ goods = gleak.Goroutines()
+
+ etcdPorts, err := test.AllocateFreePorts(2)
+ Expect(err).NotTo(HaveOccurred())
+
+ var etcdDir string
+ etcdDir, etcdSpaceDef, err = test.NewSpace()
+ Expect(err).NotTo(HaveOccurred())
+
+ clientEP := fmt.Sprintf("http://%s:%d", nodeHost, etcdPorts[0])
+ peerEP := fmt.Sprintf("http://%s:%d", nodeHost, etcdPorts[1])
+
+ etcdServer, err = embeddedetcd.NewServer(
+ embeddedetcd.ConfigureListener([]string{clientEP}, []string{peerEP}),
+ embeddedetcd.RootDir(etcdDir),
+ embeddedetcd.AutoCompactionMode("periodic"),
+ embeddedetcd.AutoCompactionRetention("1h"),
+ embeddedetcd.QuotaBackendBytes(2*1024*1024*1024),
+ )
+ Expect(err).NotTo(HaveOccurred())
+ <-etcdServer.ReadyNotify()
+ etcdEndpoint = clientEP
+
+ registry, err := schema.NewEtcdSchemaRegistry(
+ schema.Namespace(metadata.DefaultNamespace),
+ schema.ConfigureServerEndpoints([]string{clientEP}),
+ )
+ Expect(err).NotTo(HaveOccurred())
+ defer registry.Close()
+ Expect(testtrace.PreloadSchema(context.Background(), registry)).To(Succeed())
+
+ dataDefs = make([]func(), 0, 2)
+ for i := range dnHandles {
+ dataDir, def, errDir := test.NewSpace()
+ Expect(errDir).NotTo(HaveOccurred())
+ dataDefs = append(dataDefs, def)
+ ports, errPorts := test.AllocateFreePorts(2)
+ Expect(errPorts).NotTo(HaveOccurred())
+ dnHandles[i] = newDataNodeHandle(dataDir, ports[0], ports[1])
+ dnHandles[i].start(etcdEndpoint)
+ }
+
+ liaisonPath, def, err := test.NewSpace()
+ Expect(err).NotTo(HaveOccurred())
+ liaisonDef = def
+ liaisonPorts, err := test.AllocateFreePorts(3)
+ Expect(err).NotTo(HaveOccurred())
+ liaison = newLiaisonHandle(liaisonPath, liaisonPorts[0], liaisonPorts[1], liaisonPorts[2])
+ nodeAddrs := []string{dnHandles[0].addr, dnHandles[1].addr}
+ liaison.start(etcdEndpoint, nodeAddrs)
+
+ handoffRoot = filepath.Join(liaisonPath, "trace", "data", "handoff", "nodes")
+
+ cleanupFunc = func() {
+ if liaison != nil {
+ liaison.stop(etcdEndpoint)
+ }
+ for i := range dnHandles {
+ if dnHandles[i] != nil {
+ dnHandles[i].stop(etcdEndpoint)
+ }
+ }
+ if etcdServer != nil {
+ _ = etcdServer.Close()
+ }
+ if liaisonDef != nil {
+ liaisonDef()
+ }
+ for _, def := range dataDefs {
+ if def != nil {
+ def()
+ }
+ }
+ if etcdSpaceDef != nil {
+ etcdSpaceDef()
+ }
+ }
+
+ info := suiteInfo{
+ LiaisonAddr: liaison.addr,
+ EtcdEndpoint: etcdEndpoint,
+ HandoffRoot: handoffRoot,
+ DataNodes: nodeAddrs,
+ }
+ payload, err := json.Marshal(info)
+ Expect(err).NotTo(HaveOccurred())
+ return payload
+}, func(data []byte) {
+ var info suiteInfo
+ Expect(json.Unmarshal(data, &info)).To(Succeed())
+ etcdEndpoint = info.EtcdEndpoint
+ handoffRoot = info.HandoffRoot
+ if liaison == nil {
+ liaison = &liaisonHandle{addr: info.LiaisonAddr}
+ } else {
+ liaison.addr = info.LiaisonAddr
+ }
+
+ var err error
+ connection, err = grpchelper.Conn(info.LiaisonAddr, 10*time.Second,
+ grpc.WithTransportCredentials(insecure.NewCredentials()))
+ Expect(err).NotTo(HaveOccurred())
+})
+
+var _ = SynchronizedAfterSuite(func() {
+ if connection != nil {
+ Expect(connection.Close()).To(Succeed())
+ }
+}, func() {})
+
+var _ = ReportAfterSuite("Trace Handoff Suite", func(report Report) {
+ if report.SuiteSucceeded {
+ if cleanupFunc != nil {
+ cleanupFunc()
+ }
+ Eventually(gleak.Goroutines, flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods))
+ }
+})
+
+var _ = Describe("trace handoff", func() {
+ It("replays data to recovered nodes and empties the queue", func() {
+ Expect(connection).NotTo(BeNil())
+
+ nodeAddrs := []string{dnHandles[0].addr, dnHandles[1].addr}
+ var (
+ targetIndex = -1
+ traceID string
+ writeTime time.Time
+ targetShard uint32
+ )
+
+ for idx := range dnHandles {
+ candidateShard := shardIDForNode(dnHandles[idx].addr, nodeAddrs)
+ candidateTraceID := generateTraceIDForShard(candidateShard, traceShardCount)
+
+ By(fmt.Sprintf("ensuring the handoff queue for %s starts empty", dnHandles[idx].addr))
+ Expect(countPendingParts(dnHandles[idx].addr)).To(Equal(0))
+
+ By(fmt.Sprintf("stopping %s for shard %d", dnHandles[idx].addr, candidateShard))
+ dnHandles[idx].stop(etcdEndpoint)
+ time.Sleep(7 * time.Second)
Review Comment:
Hardcoded 7-second sleep with no explanation for why this specific duration
is needed. This makes the test brittle and slow. Consider using a configurable
timeout or polling mechanism with a comment explaining what condition is being
waited for (e.g., 'wait for node to be detected as offline').
```suggestion
			// Wait for the node to be fully stopped/offline before proceeding.
			// This avoids brittle, slow tests due to hardcoded sleeps.
			Eventually(func() bool {
				// Replace this with a real check if available, e.g., node is offline or not responding.
				// For now, we check that the node's handoff queue is not accessible (returns error or -1).
				return countPendingParts(dnHandles[idx].addr) == -1
			}, flags.EventuallyTimeout, 500*time.Millisecond).Should(BeTrue())
```
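An alternative sketch that avoids inventing a sentinel value: reuse the gRPC health probe the suite already calls in `dataNodeHandle.start` and wait for it to start failing once the node is stopped. This assumes `helpers.HealthCheck` returns a pollable `func() error`, as its use with `Eventually` above suggests; treat it as illustrative, not a drop-in patch.
```go
// Hedged alternative: poll the stopped node's gRPC health endpoint until it
// no longer answers, instead of sleeping a fixed 7 seconds.
Eventually(helpers.HealthCheck(dnHandles[idx].addr, time.Second, time.Second,
	grpc.WithTransportCredentials(insecure.NewCredentials())),
	flags.EventuallyTimeout, 500*time.Millisecond).ShouldNot(Succeed())
```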
##########
banyand/trace/handoff_controller.go:
##########
@@ -0,0 +1,1261 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package trace
+
+import (
+ "context"
+ "encoding/json"
+ "errors"
+ "fmt"
+ "path/filepath"
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/apache/skywalking-banyandb/api/common"
+ "github.com/apache/skywalking-banyandb/api/data"
+ "github.com/apache/skywalking-banyandb/banyand/internal/sidx"
+ "github.com/apache/skywalking-banyandb/banyand/internal/storage"
+ "github.com/apache/skywalking-banyandb/banyand/queue"
+ "github.com/apache/skywalking-banyandb/pkg/bytes"
+ "github.com/apache/skywalking-banyandb/pkg/fs"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+// handoffController manages handoff queues for multiple data nodes.
+type handoffController struct {
+ fileSystem fs.FileSystem
+ tire2Client queueClient
+ resolveShardAssignments func(group string, shardID uint32) ([]string, error)
+ inFlightSends map[string]map[uint64]struct{}
+ l *logger.Logger
+ replayTriggerChan chan string
+ nodeQueues map[string]*handoffNodeQueue
+ replayStopChan chan struct{}
+ healthyNodes map[string]struct{}
+ statusChangeChan chan nodeStatusChange
+ stopMonitor chan struct{}
+ root string
+ allDataNodes []string
+ monitorWg sync.WaitGroup
+ replayWg sync.WaitGroup
+ checkInterval time.Duration
+ replayBatchSize int
+ replayPollInterval time.Duration
+ maxTotalSizeBytes uint64
+ currentTotalSize uint64
+ mu sync.RWMutex
+ inFlightMu sync.RWMutex
+ sizeMu sync.RWMutex
+}
+
+// nodeStatusChange represents a node status transition.
+type nodeStatusChange struct {
+ nodeName string
+ isOnline bool
+}
+
+// queueClient interface combines health checking and sync client creation.
+type queueClient interface {
+ HealthyNodes() []string
+ NewChunkedSyncClient(node string, chunkSize uint32) (queue.ChunkedSyncClient, error)
+}
+
+// newHandoffController creates a new handoff controller.
+func newHandoffController(fileSystem fs.FileSystem, root string, tire2Client queueClient,
+ dataNodeList []string, maxSize int, l *logger.Logger,
+ resolveShardAssignments func(group string, shardID uint32) ([]string, error),
+) (*handoffController, error) {
+ if fileSystem == nil {
+ return nil, fmt.Errorf("fileSystem is nil")
+ }
+ if l == nil {
+ return nil, fmt.Errorf("logger is nil")
+ }
+ if root == "" {
+ return nil, fmt.Errorf("root path is empty")
+ }
+
+ handoffRoot := filepath.Join(root, "handoff", "nodes")
+
+ hc := &handoffController{
+ l: l,
+ fileSystem: fileSystem,
+ nodeQueues: make(map[string]*handoffNodeQueue),
+ root: handoffRoot,
+ tire2Client: tire2Client,
+ allDataNodes: dataNodeList,
+ healthyNodes: make(map[string]struct{}),
+ statusChangeChan: make(chan nodeStatusChange, 100),
+ stopMonitor: make(chan struct{}),
+ checkInterval: 5 * time.Second,
+ replayStopChan: make(chan struct{}),
+ replayTriggerChan: make(chan string, 100),
+ inFlightSends: make(map[string]map[uint64]struct{}),
+ replayBatchSize: 10,
+ replayPollInterval: 1 * time.Second,
+ maxTotalSizeBytes: uint64(maxSize),
+ currentTotalSize: 0,
+ resolveShardAssignments: resolveShardAssignments,
+ }
+
+ // Create handoff root directory if it doesn't exist
+ fileSystem.MkdirIfNotExist(handoffRoot, storage.DirPerm)
+
+ // Load existing node queues from disk
+ if err := hc.loadExistingQueues(); err != nil {
+ l.Warn().Err(err).Msg("failed to load existing handoff queues")
+ }
+
+ // Start the node status monitor
+ hc.startMonitor()
+
+ // Start the replay worker
+ hc.startReplayWorker()
+
+ return hc, nil
+}
+
+// loadExistingQueues scans the handoff directory and loads existing node queues.
+func (hc *handoffController) loadExistingQueues() error {
+ if hc == nil || hc.fileSystem == nil {
+ return fmt.Errorf("handoff controller is not initialized")
+ }
+
+ entries := hc.fileSystem.ReadDir(hc.root)
+ var totalRecoveredSize uint64
+ var errs []error
+
+ for _, entry := range entries {
+ if !entry.IsDir() {
+ continue
+ }
+
+ nodeRoot := filepath.Join(hc.root, entry.Name())
+
+ // Read the original node address from .node_info file
+ nodeAddr, err := readNodeInfo(hc.fileSystem, nodeRoot)
+ if err != nil {
+ hc.l.Warn().Err(err).Str("dir",
entry.Name()).Msg("failed to read node info, skipping")
+ errs = append(errs, fmt.Errorf("read node info for %s:
%w", entry.Name(), err))
+ continue
+ }
+
+ nodeQueue, err := newHandoffNodeQueue(nodeAddr, nodeRoot, hc.fileSystem, hc.l)
+ if err != nil {
+ hc.l.Warn().Err(err).Str("node", nodeAddr).Msg("failed to load node queue")
+ errs = append(errs, fmt.Errorf("load node queue for %s: %w", nodeAddr, err))
+ continue
+ }
+
+ hc.nodeQueues[nodeAddr] = nodeQueue
+
+ // Calculate queue size from metadata (not from actual file sizes)
+ pending, err := nodeQueue.listPending()
+ if err != nil {
+ hc.l.Warn().Err(err).Str("node", nodeAddr).Msg("failed to list pending parts")
+ errs = append(errs, fmt.Errorf("list pending for %s: %w", nodeAddr, err))
+ continue
+ }
+ var queueSize uint64
+ for _, ptp := range pending {
+ meta, err := nodeQueue.getMetadata(ptp.PartID, ptp.PartType)
+ if err != nil {
+ hc.l.Warn().Err(err).
+ Str("node", nodeAddr).
+ Uint64("partID", ptp.PartID).
+ Str("partType", ptp.PartType).
+ Msg("failed to read part metadata")
+ errs = append(errs, fmt.Errorf("metadata for
node %s part %x (%s): %w",
+ nodeAddr, ptp.PartID, ptp.PartType,
err))
+ continue
+ }
+ if meta.PartSizeBytes > 0 {
+ queueSize += meta.PartSizeBytes
+ }
+ }
+ totalRecoveredSize += queueSize
+
+ // Log pending parts count
+ if len(pending) > 0 {
+ hc.l.Info().
+ Str("node", nodeAddr).
+ Int("pending", len(pending)).
+ Uint64("sizeMB", queueSize/1024/1024).
+ Msg("loaded handoff queue with pending parts")
+ }
+ }
+
+ // Update current total size
+ hc.currentTotalSize = totalRecoveredSize
+ if totalRecoveredSize > 0 {
+ hc.l.Info().
+ Uint64("totalSizeMB", totalRecoveredSize/1024/1024).
+ Int("nodeCount", len(hc.nodeQueues)).
+ Msg("recovered handoff queue state")
+ }
+ if len(errs) > 0 {
+ return errors.Join(errs...)
+ }
+
+ return nil
+}
+
+// enqueueForNode adds a part to the handoff queue for a specific node.
+func (hc *handoffController) enqueueForNode(nodeAddr string, partID uint64, partType string, sourcePath string,
+ group string, shardID uint32,
+) error {
+ // Read part size from metadata
+ partSize := hc.readPartSizeFromMetadata(sourcePath, partType)
+
+ // Check if enqueue would exceed limit
+ if !hc.canEnqueue(partSize) {
+ currentSize := hc.getTotalSize()
+ return fmt.Errorf("handoff queue full: current=%d MB, limit=%d
MB, part=%d MB",
+ currentSize/1024/1024, hc.maxTotalSizeBytes/1024/1024,
partSize/1024/1024)
+ }
+
+ hc.mu.Lock()
+ defer hc.mu.Unlock()
+
+ meta := &handoffMetadata{
+ EnqueueTimestamp: time.Now().UnixNano(),
+ Group: group,
+ ShardID: shardID,
+ PartType: partType,
+ PartSizeBytes: partSize,
+ }
+
+ nodeQueue, err := hc.getOrCreateNodeQueue(nodeAddr)
+ if err != nil {
+ return fmt.Errorf("failed to get node queue for %s: %w",
nodeAddr, err)
+ }
+
+ if err := nodeQueue.enqueue(partID, partType, sourcePath, meta); err != nil {
+ return err
+ }
+
+ // Update total size after successful enqueue
+ hc.updateTotalSize(int64(partSize))
+
+ return nil
+}
+
+// enqueueForNodes adds a part to the handoff queues for multiple offline nodes.
+func (hc *handoffController) enqueueForNodes(offlineNodes []string, partID uint64, partType string, sourcePath string,
+ group string, shardID uint32,
+) error {
+ meta := &handoffMetadata{
+ EnqueueTimestamp: time.Now().UnixNano(),
+ Group: group,
+ ShardID: shardID,
+ PartType: partType,
+ }
+
+ hc.mu.Lock()
+ defer hc.mu.Unlock()
+
+ var firstErr error
+ successCount := 0
+
+ // For each offline node, create hard-linked copy
+ for _, nodeAddr := range offlineNodes {
+ nodeQueue, err := hc.getOrCreateNodeQueue(nodeAddr)
+ if err != nil {
+ hc.l.Error().Err(err).Str("node", nodeAddr).Msg("failed
to get node queue")
+ if firstErr == nil {
+ firstErr = err
+ }
+ continue
+ }
+
+ if err := nodeQueue.enqueue(partID, partType, sourcePath, meta); err != nil {
+ hc.l.Error().Err(err).Str("node", nodeAddr).Uint64("partId", partID).Str("partType", partType).
+ Msg("failed to enqueue part")
+ if firstErr == nil {
+ firstErr = err
+ }
+ continue
+ }
+
+ successCount++
+ }
+
+ if successCount > 0 {
+ hc.l.Info().
+ Int("successCount", successCount).
+ Int("totalNodes", len(offlineNodes)).
+ Uint64("partId", partID).
+ Str("partType", partType).
+ Msg("part enqueued to handoff queues")
+ }
+
+ // Return error only if all enqueues failed
+ if successCount == 0 && firstErr != nil {
+ return firstErr
+ }
+
+ return nil
+}
+
+// getOrCreateNodeQueue gets an existing node queue or creates a new one.
+// Caller must hold hc.mu lock.
+func (hc *handoffController) getOrCreateNodeQueue(nodeAddr string) (*handoffNodeQueue, error) {
+ // Check if queue already exists
+ if queue, exists := hc.nodeQueues[nodeAddr]; exists {
+ return queue, nil
+ }
+
+ // Create new queue
+ sanitizedAddr := sanitizeNodeAddr(nodeAddr)
+ nodeRoot := filepath.Join(hc.root, sanitizedAddr)
+
+ nodeQueue, err := newHandoffNodeQueue(nodeAddr, nodeRoot, hc.fileSystem, hc.l)
+ if err != nil {
+ return nil, fmt.Errorf("failed to create node queue: %w", err)
+ }
+
+ hc.nodeQueues[nodeAddr] = nodeQueue
+ return nodeQueue, nil
+}
+
+// listPendingForNode returns all pending parts with their types for a specific node.
+func (hc *handoffController) listPendingForNode(nodeAddr string) ([]partTypePair, error) {
+ hc.mu.RLock()
+ defer hc.mu.RUnlock()
+
+ nodeQueue, exists := hc.nodeQueues[nodeAddr]
+ if !exists {
+ return nil, nil // No queue means no pending parts
+ }
+
+ return nodeQueue.listPending()
+}
+
+// getPartPath returns the path to a specific part type directory in a node's handoff queue.
+func (hc *handoffController) getPartPath(nodeAddr string, partID uint64, partType string) string {
+ hc.mu.RLock()
+ defer hc.mu.RUnlock()
+
+ nodeQueue, exists := hc.nodeQueues[nodeAddr]
+ if !exists {
+ return ""
+ }
+
+ return nodeQueue.getPartTypePath(partID, partType)
+}
+
+// getPartMetadata returns the handoff metadata for a specific part type.
+func (hc *handoffController) getPartMetadata(nodeAddr string, partID uint64, partType string) (*handoffMetadata, error) {
+ hc.mu.RLock()
+ defer hc.mu.RUnlock()
+
+ nodeQueue, exists := hc.nodeQueues[nodeAddr]
+ if !exists {
+ return nil, fmt.Errorf("node queue not found for %s", nodeAddr)
+ }
+
+ return nodeQueue.getMetadata(partID, partType)
+}
+
+// completeSend removes a specific part type from a node's handoff queue after successful delivery.
+func (hc *handoffController) completeSend(nodeAddr string, partID uint64, partType string) error {
+ // Get part size before removing
+ var partSize uint64
+ meta, err := hc.getPartMetadata(nodeAddr, partID, partType)
+ if err == nil && meta.PartSizeBytes > 0 {
+ partSize = meta.PartSizeBytes
+ }
+
+ hc.mu.RLock()
+ nodeQueue, exists := hc.nodeQueues[nodeAddr]
+ hc.mu.RUnlock()
+
+ if !exists {
+ return fmt.Errorf("node queue not found for %s", nodeAddr)
+ }
+
+ if err := nodeQueue.complete(partID, partType); err != nil {
+ return err
+ }
+
+ // Update total size after successful removal
+ if partSize > 0 {
+ hc.updateTotalSize(-int64(partSize))
+ }
+
+ return nil
+}
+
+// completeSendAll removes all part types for a given partID from a node's handoff queue.
+func (hc *handoffController) completeSendAll(nodeAddr string, partID uint64) error {
+ hc.mu.RLock()
+ defer hc.mu.RUnlock()
+
+ nodeQueue, exists := hc.nodeQueues[nodeAddr]
+ if !exists {
+ return fmt.Errorf("node queue not found for %s", nodeAddr)
+ }
+
+ return nodeQueue.completeAll(partID)
+}
+
+// getNodeQueueSize returns the total size of pending parts for a specific node.
+func (hc *handoffController) getNodeQueueSize(nodeAddr string) (uint64, error) {
+ hc.mu.RLock()
+ defer hc.mu.RUnlock()
+
+ nodeQueue, exists := hc.nodeQueues[nodeAddr]
+ if !exists {
+ return 0, nil
+ }
+
+ return nodeQueue.size()
+}
+
+// getAllNodeQueues returns a snapshot of all node addresses with handoff queues.
+func (hc *handoffController) getAllNodeQueues() []string {
+ hc.mu.RLock()
+ defer hc.mu.RUnlock()
+
+ nodes := make([]string, 0, len(hc.nodeQueues))
+ for nodeAddr := range hc.nodeQueues {
+ nodes = append(nodes, nodeAddr)
+ }
+
+ return nodes
+}
+
+// partInfo contains information about a part to be enqueued.
+type partInfo struct {
+ path string
+ group string
+ partID uint64
+ shardID common.ShardID
+}
+
+// calculateOfflineNodes returns the list of offline nodes responsible for the shard.
+func (hc *handoffController) calculateOfflineNodes(onlineNodes []string, group string, shardID common.ShardID) []string {
+ if hc == nil {
+ return nil
+ }
+
+ onlineSet := make(map[string]struct{}, len(onlineNodes))
+ for _, node := range onlineNodes {
+ onlineSet[node] = struct{}{}
+ }
+
+ candidates := hc.nodesForShard(group, uint32(shardID))
+ seen := make(map[string]struct{}, len(candidates))
+ var offlineNodes []string
+ for _, node := range candidates {
+ if _, dup := seen[node]; dup {
+ continue
+ }
+ seen[node] = struct{}{}
+ if !hc.isNodeHealthy(node) {
+ offlineNodes = append(offlineNodes, node)
+ continue
+ }
+ if len(onlineSet) > 0 {
+ if _, isOnline := onlineSet[node]; !isOnline {
+ offlineNodes = append(offlineNodes, node)
+ }
+ continue
+ }
+ if hc.tire2Client == nil {
+ offlineNodes = append(offlineNodes, node)
+ }
+ }
+
+ return offlineNodes
+}
+
+// enqueueForOfflineNodes enqueues the provided core and sidx parts for each offline node.
+func (hc *handoffController) enqueueForOfflineNodes(
+ offlineNodes []string,
+ coreParts []partInfo,
+ sidxParts map[string][]partInfo,
+) error {
+ if hc == nil || len(offlineNodes) == 0 {
+ return nil
+ }
+
+ group, shardID, hasShardInfo := extractShardDetails(coreParts, sidxParts)
+ if hasShardInfo {
+ filtered := hc.filterNodesForShard(offlineNodes, group, shardID)
+ if len(filtered) == 0 {
+ hc.l.Debug().
+ Str("group", group).
+ Uint32("shardID", shardID).
+ Msg("no offline shard owners to enqueue")
+ return nil
+ }
+ offlineNodes = filtered
+ }
+
+ // Track enqueue statistics
+ totalCoreEnqueued := 0
+ totalSidxEnqueued := 0
+ var lastErr error
+
+ // For each offline node, enqueue all parts
+ for _, nodeAddr := range offlineNodes {
+ // Enqueue core parts
+ for _, coreInfo := range coreParts {
+ err := hc.enqueueForNode(nodeAddr, coreInfo.partID, PartTypeCore, coreInfo.path, coreInfo.group, uint32(coreInfo.shardID))
+ if err != nil {
+ hc.l.Warn().Err(err).
+ Str("node", nodeAddr).
+ Uint64("partID", coreInfo.partID).
+ Msg("failed to enqueue core part")
+ lastErr = err
+ } else {
+ totalCoreEnqueued++
+ }
+ }
+
+ // Enqueue sidx parts
+ for sidxName, parts := range sidxParts {
+ for _, sidxInfo := range parts {
+ err := hc.enqueueForNode(nodeAddr, sidxInfo.partID, sidxName, sidxInfo.path, sidxInfo.group, uint32(sidxInfo.shardID))
+ if err != nil {
+ hc.l.Warn().Err(err).
+ Str("node", nodeAddr).
+ Str("sidx", sidxName).
+ Uint64("partID",
sidxInfo.partID).
+ Msg("failed to enqueue sidx
part")
+ lastErr = err
+ } else {
+ totalSidxEnqueued++
+ }
+ }
+ }
+ }
+
+ // Log summary
+ hc.l.Info().
+ Int("offlineNodes", len(offlineNodes)).
+ Str("group", group).
+ Uint32("shardID", shardID).
+ Int("corePartsEnqueued", totalCoreEnqueued).
+ Int("sidxPartsEnqueued", totalSidxEnqueued).
+ Msg("enqueued parts for offline nodes")
+
+ return lastErr
+}
+
+func extractShardDetails(coreParts []partInfo, sidxParts map[string][]partInfo) (string, uint32, bool) {
+ if len(coreParts) > 0 {
+ return coreParts[0].group, uint32(coreParts[0].shardID), true
+ }
+ for _, parts := range sidxParts {
+ if len(parts) == 0 {
+ continue
+ }
+ return parts[0].group, uint32(parts[0].shardID), true
+ }
+ return "", 0, false
+}
+
+func (hc *handoffController) nodesForShard(group string, shardID uint32) []string {
+ if hc == nil {
+ return nil
+ }
+ if hc.resolveShardAssignments == nil {
+ return hc.allDataNodes
+ }
+ nodes, err := hc.resolveShardAssignments(group, shardID)
+ if err != nil {
+ if hc.l != nil {
+ hc.l.Warn().
+ Err(err).
+ Str("group", group).
+ Uint32("shardID", shardID).
+ Msg("failed to resolve shard assignments, using
configured node list")
+ }
+ return hc.allDataNodes
+ }
+ if len(nodes) == 0 {
+ return hc.allDataNodes
+ }
+ return nodes
+}
+
+func (hc *handoffController) filterNodesForShard(nodes []string, group string, shardID uint32) []string {
+ candidates := hc.nodesForShard(group, shardID)
+ if len(candidates) == 0 {
+ return nil
+ }
+ candidateSet := make(map[string]struct{}, len(candidates))
+ for _, node := range candidates {
+ candidateSet[node] = struct{}{}
+ }
+ filtered := make([]string, 0, len(nodes))
+ seen := make(map[string]struct{}, len(nodes))
+ for _, node := range nodes {
+ if _, already := seen[node]; already {
+ continue
+ }
+ seen[node] = struct{}{}
+ if _, ok := candidateSet[node]; ok {
+ filtered = append(filtered, node)
+ }
+ }
+ return filtered
+}
+
+// close closes the handoff controller.
+func (hc *handoffController) close() error {
+ // Stop the monitor
+ if hc.stopMonitor != nil {
+ close(hc.stopMonitor)
+ hc.monitorWg.Wait()
+ }
+
+ // Stop the replay worker
+ if hc.replayStopChan != nil {
+ close(hc.replayStopChan)
+ hc.replayWg.Wait()
+ }
+
+ hc.mu.Lock()
+ defer hc.mu.Unlock()
+
+ // Clear node queues
+ hc.nodeQueues = nil
+
+ // Clear in-flight tracking
+ hc.inFlightSends = nil
+
+ return nil
+}
+
+// getTotalSize returns the current total size across all node queues.
+func (hc *handoffController) getTotalSize() uint64 {
+ hc.sizeMu.RLock()
+ defer hc.sizeMu.RUnlock()
+ return hc.currentTotalSize
+}
+
+// canEnqueue checks if adding a part of the given size would exceed the total size limit.
+func (hc *handoffController) canEnqueue(partSize uint64) bool {
+ if hc.maxTotalSizeBytes == 0 {
+ return true // No limit configured
+ }
+
+ hc.sizeMu.RLock()
+ defer hc.sizeMu.RUnlock()
+ return hc.currentTotalSize+partSize <= hc.maxTotalSizeBytes
+}
+
+// readPartSizeFromMetadata reads the CompressedSizeBytes from the part's metadata file.
+func (hc *handoffController) readPartSizeFromMetadata(sourcePath, partType string) uint64 {
+ var metadataPath string
+
+ // Core parts use metadata.json, sidx parts use manifest.json
+ if partType == PartTypeCore {
+ metadataPath = filepath.Join(sourcePath, metadataFilename) // "metadata.json"
+ } else {
+ metadataPath = filepath.Join(sourcePath, "manifest.json")
+ }
+
+ // Read metadata file
+ data, err := hc.fileSystem.Read(metadataPath)
+ if err != nil {
+ hc.l.Warn().Err(err).Str("path", metadataPath).Msg("failed to
read metadata file")
+ return 0
+ }
+
+ // Parse metadata to get CompressedSizeBytes
+ var metadata struct {
+ CompressedSizeBytes uint64 `json:"compressedSizeBytes"`
+ }
+ if err := json.Unmarshal(data, &metadata); err != nil {
+ hc.l.Warn().Err(err).Str("path", metadataPath).Msg("failed to
parse metadata")
+ return 0
+ }
+
+ return metadata.CompressedSizeBytes
+}
+
+// updateTotalSize atomically updates the current total size.
+func (hc *handoffController) updateTotalSize(delta int64) {
+ hc.sizeMu.Lock()
+ defer hc.sizeMu.Unlock()
+
+ if delta > 0 {
+ // Enqueue: add to total
+ hc.currentTotalSize += uint64(delta)
+ } else if delta < 0 {
+ // Complete: subtract from total with underflow check
+ toSubtract := uint64(-delta)
+ if toSubtract > hc.currentTotalSize {
+ hc.l.Warn().
+ Uint64("current", hc.currentTotalSize).
+ Uint64("toSubtract", toSubtract).
+ Msg("attempted to subtract more than current
size, resetting to 0")
+ hc.currentTotalSize = 0
+ } else {
+ hc.currentTotalSize -= toSubtract
+ }
+ }
Review Comment:
Warning and resetting to 0 on underflow masks a potential bug in the size-tracking logic. If `toSubtract > currentTotalSize`, parts were completed without being tracked during enqueue, or a double completion occurred. Consider investigating the root cause and returning an error instead of silently resetting, to prevent data inconsistencies.
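A sketch of that alternative, reusing the PR's field names (illustrative only, not a drop-in patch), surfaces the inconsistency to the caller:
```go
// updateTotalSize adjusts the tracked queue size and reports underflow instead
// of silently clamping to zero.
func (hc *handoffController) updateTotalSize(delta int64) error {
	hc.sizeMu.Lock()
	defer hc.sizeMu.Unlock()

	if delta >= 0 {
		hc.currentTotalSize += uint64(delta)
		return nil
	}
	toSubtract := uint64(-delta)
	if toSubtract > hc.currentTotalSize {
		// Likely a double completion or an enqueue that was never counted.
		return fmt.Errorf("handoff size underflow: current=%d, toSubtract=%d",
			hc.currentTotalSize, toSubtract)
	}
	hc.currentTotalSize -= toSubtract
	return nil
}
```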
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]