This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch bug-hq-sync
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 3d499c5fb22e7baf36c94132908d970a38198b44
Author: Hongtao Gao <[email protected]>
AuthorDate: Sun Apr 12 14:26:40 2026 +0000

    fix(handoff): use shared LocateAll to prevent enqueuing parts for online 
nodes
    
    ResolveAssignments and syncer's GetNodes were independently computing
    node lists, causing them to diverge due to different lookup table state.
    Extract LocateAll on NodeRegistry so both paths use the same selector,
    eliminating spurious handoff enqueues for healthy data nodes.
    
    via [HAPI](https://hapi.run)
    
    Co-Authored-By: HAPI <[email protected]>
---
 CHANGES.md                            |  1 +
 banyand/liaison/grpc/node.go          | 24 +++++++++++++
 banyand/liaison/grpc/node_test.go     | 11 ++++++
 banyand/measure/metadata.go           | 16 +++------
 banyand/stream/metadata.go            | 16 +++------
 banyand/trace/handoff_controller.go   | 10 +++---
 banyand/trace/handoff_storage_test.go | 64 +++++++++++++++++++++++++++++++++++
 banyand/trace/metadata.go             | 16 +++------
 banyand/trace/svc_liaison.go          | 18 +---------
 9 files changed, 117 insertions(+), 59 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index dd25bc75f..be447f6eb 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -32,6 +32,7 @@ Release Notes.
 - Fix SIDX streaming sync sending SegmentID as MinTimestamp instead of the 
actual timestamp, causing sync failures on the receiving node.
 - Fix handoff controller TOCTOU race allowing disk size limit bypass, and 
populate sidx MinTimestamp/MaxTimestamp during replay to prevent corrupt 
segment creation on recovered nodes.
 - Delete orphaned parts when no snapshot references them during tsTable 
initialization.
+- Extract a shared LocateAll on NodeRegistry to ensure ResolveAssignments and 
syncer GetNodes always produce identical node lists, preventing the liaison from 
enqueuing handoff parts for online/healthy data nodes.
 
 ### Chores
 
diff --git a/banyand/liaison/grpc/node.go b/banyand/liaison/grpc/node.go
index 608799735..1c9379b3e 100644
--- a/banyand/liaison/grpc/node.go
+++ b/banyand/liaison/grpc/node.go
@@ -39,7 +39,10 @@ var (
 // NodeRegistry is for locating data node with group/name of the metadata
 // together with the shardID calculated from the incoming data.
 type NodeRegistry interface {
+       // Locate returns the data node assigned to the given (group, name, 
shardID, replicaID).
        Locate(group, name string, shardID, replicaID uint32) (string, error)
+       // LocateAll returns the distinct data nodes owning the given shard 
across the given number of replica IDs.
+       LocateAll(group string, shardID uint32, replicas int) ([]string, error)
        fmt.Stringer
 }
 
@@ -74,6 +77,22 @@ func (n *clusterNodeService) Locate(group, name string, 
shardID, replicaID uint3
        return nodeID, nil
 }
 
+func (n *clusterNodeService) LocateAll(group string, shardID uint32, replicas 
int) ([]string, error) {
+       nodeSet := make(map[string]struct{}, replicas)
+       for replica := 0; replica < replicas; replica++ {
+               nodeID, err := n.Locate(group, "", shardID, uint32(replica))
+               if err != nil {
+                       return nil, errors.Wrapf(err, "fail to locate 
%s/%d/%d", group, shardID, replica)
+               }
+               nodeSet[nodeID] = struct{}{}
+       }
+       nodes := make([]string, 0, len(nodeSet))
+       for nodeID := range nodeSet {
+               nodes = append(nodes, nodeID)
+       }
+       return nodes, nil
+}
+
 func (n *clusterNodeService) OnAddOrUpdate(metadata schema.Metadata) {
        switch metadata.Kind {
        case schema.KindNode:
@@ -117,3 +136,8 @@ func NewLocalNodeRegistry() NodeRegistry {
 func (localNodeService) Locate(_, _ string, _, _ uint32) (string, error) {
        return "local", nil
 }
+
+// LocateAll of localNodeService always returns [local].
+func (localNodeService) LocateAll(_ string, _ uint32, _ int) ([]string, error) 
{
+       return []string{"local"}, nil
+}
diff --git a/banyand/liaison/grpc/node_test.go 
b/banyand/liaison/grpc/node_test.go
index 999d78d14..89c11946c 100644
--- a/banyand/liaison/grpc/node_test.go
+++ b/banyand/liaison/grpc/node_test.go
@@ -58,4 +58,15 @@ func TestClusterNodeRegistry(t *testing.T) {
        nodeID, err := cnr.Locate("metrics", "instance_traffic", 0, 0)
        assert.NoError(t, err)
        assert.Equal(t, fakeNodeID, nodeID)
+
+       nodes, locateAllErr := cnr.LocateAll("metrics", 0, 3)
+       assert.NoError(t, locateAllErr)
+       assert.Equal(t, []string{fakeNodeID}, nodes)
+}
+
+func TestLocalNodeRegistry_LocateAll(t *testing.T) {
+       nr := NewLocalNodeRegistry()
+       nodes, err := nr.LocateAll("any-group", 0, 3)
+       assert.NoError(t, err)
+       assert.Equal(t, []string{"local"}, nodes)
 }
diff --git a/banyand/measure/metadata.go b/banyand/measure/metadata.go
index 85d1e701f..f14b87416 100644
--- a/banyand/measure/metadata.go
+++ b/banyand/measure/metadata.go
@@ -816,18 +816,10 @@ func (s *queueSupplier) OpenDB(groupSchema 
*commonv1.Group) (resourceSchema.DB,
                SubQueueCreator: newWriteQueue,
                GetNodes: func(shardID common.ShardID) []string {
                        copies := ro.Replicas + 1
-                       nodeSet := make(map[string]struct{}, copies)
-                       for i := uint32(0); i < copies; i++ {
-                               nodeID, err := 
s.measureDataNodeRegistry.Locate(group, "", uint32(shardID), i)
-                               if err != nil {
-                                       s.l.Error().Err(err).Str("group", 
group).Uint32("shard", uint32(shardID)).Uint32("copy", i).Msg("failed to locate 
node")
-                                       return nil
-                               }
-                               nodeSet[nodeID] = struct{}{}
-                       }
-                       nodes := make([]string, 0, len(nodeSet))
-                       for nodeID := range nodeSet {
-                               nodes = append(nodes, nodeID)
+                       nodes, err := 
s.measureDataNodeRegistry.LocateAll(group, uint32(shardID), int(copies))
+                       if err != nil {
+                               s.l.Error().Err(err).Str("group", 
group).Uint32("shard", uint32(shardID)).Msg("failed to locate nodes")
+                               return nil
                        }
                        return nodes
                },
diff --git a/banyand/stream/metadata.go b/banyand/stream/metadata.go
index e7bbe3c82..269104023 100644
--- a/banyand/stream/metadata.go
+++ b/banyand/stream/metadata.go
@@ -667,18 +667,10 @@ func (s *queueSupplier) OpenDB(groupSchema 
*commonv1.Group) (resourceSchema.DB,
                SubQueueCreator: newWriteQueue,
                GetNodes: func(shardID common.ShardID) []string {
                        copies := ro.Replicas + 1
-                       nodeSet := make(map[string]struct{}, copies)
-                       for i := uint32(0); i < copies; i++ {
-                               nodeID, err := 
s.streamDataNodeRegistry.Locate(group, "", uint32(shardID), i)
-                               if err != nil {
-                                       s.l.Error().Err(err).Str("group", 
group).Uint32("shard", uint32(shardID)).Uint32("copy", i).Msg("failed to locate 
node")
-                                       return nil
-                               }
-                               nodeSet[nodeID] = struct{}{}
-                       }
-                       nodes := make([]string, 0, len(nodeSet))
-                       for nodeID := range nodeSet {
-                               nodes = append(nodes, nodeID)
+                       nodes, err := s.streamDataNodeRegistry.LocateAll(group, 
uint32(shardID), int(copies))
+                       if err != nil {
+                               s.l.Error().Err(err).Str("group", 
group).Uint32("shard", uint32(shardID)).Msg("failed to locate nodes")
+                               return nil
                        }
                        return nodes
                },
diff --git a/banyand/trace/handoff_controller.go 
b/banyand/trace/handoff_controller.go
index 4e69f7559..bc6f176f2 100644
--- a/banyand/trace/handoff_controller.go
+++ b/banyand/trace/handoff_controller.go
@@ -467,14 +467,12 @@ func (hc *handoffController) 
calculateOfflineNodes(onlineNodes []string, group s
                        continue
                }
                seen[node] = struct{}{}
-               if !hc.isNodeHealthy(node) {
-                       offlineNodes = append(offlineNodes, node)
+               // Online nodes are delivered by the syncer; only enqueue truly 
offline nodes.
+               if _, isOnline := onlineSet[node]; isOnline {
                        continue
                }
-               if len(onlineSet) > 0 {
-                       if _, isOnline := onlineSet[node]; !isOnline {
-                               offlineNodes = append(offlineNodes, node)
-                       }
+               if !hc.isNodeHealthy(node) {
+                       offlineNodes = append(offlineNodes, node)
                        continue
                }
                if hc.tire2Client == nil {
diff --git a/banyand/trace/handoff_storage_test.go 
b/banyand/trace/handoff_storage_test.go
index 4ca00ca8f..c9f35b1ed 100644
--- a/banyand/trace/handoff_storage_test.go
+++ b/banyand/trace/handoff_storage_test.go
@@ -627,6 +627,70 @@ func TestHandoffController_FiltersNonOwningOfflineNodes(t 
*testing.T) {
        tester.True(os.IsNotExist(statErr), "expected no queue directory to be 
created for non-owning node")
 }
 
+func TestHandoffController_OnlineNodeSkipsStaleHealthCheck(t *testing.T) {
+       tester := require.New(t)
+       tempDir, deferFunc := test.Space(tester)
+       defer deferFunc()
+
+       lfs := fs.NewLocalFileSystem()
+       l := logger.GetLogger("test")
+       dataNodes := []string{"node1:17912", "node2:17912"}
+       const groupName = "group1"
+       const shardID uint32 = 0
+
+       resolver := func(string, uint32) ([]string, error) {
+               return dataNodes, nil
+       }
+
+       // Health check reports only node1 (simulating stale health after node2 
restart).
+       queueClient := &fakeQueueClient{healthy: []string{"node1:17912"}}
+       hc, err := newHandoffController(lfs, tempDir, queueClient, dataNodes, 
0, l, resolver)
+       tester.NoError(err)
+       defer hc.close()
+
+       // Both nodes are in the syncer's target list, but health check is 
stale for node2.
+       offline := hc.calculateOfflineNodes(
+               []string{"node1:17912", "node2:17912"},
+               groupName,
+               common.ShardID(shardID),
+       )
+       tester.Len(offline, 0,
+               "expected no offline nodes when both are in the syncer target 
list, even with stale health check")
+}
+
+// TestHandoffController_OfflineNodeNotInOnlineSet verifies that a node NOT in 
the syncer's
+// target list and reported unhealthy is correctly classified as offline.
+func TestHandoffController_OfflineNodeNotInOnlineSet(t *testing.T) {
+       tester := require.New(t)
+       tempDir, deferFunc := test.Space(tester)
+       defer deferFunc()
+
+       lfs := fs.NewLocalFileSystem()
+       l := logger.GetLogger("test")
+       dataNodes := []string{"node1:17912", "node2:17912"}
+       const groupName = "group1"
+       const shardID uint32 = 0
+
+       resolver := func(string, uint32) ([]string, error) {
+               return dataNodes, nil
+       }
+
+       // Health check reports only node1 — node2 is offline.
+       queueClient := &fakeQueueClient{healthy: []string{"node1:17912"}}
+       hc, err := newHandoffController(lfs, tempDir, queueClient, dataNodes, 
0, l, resolver)
+       tester.NoError(err)
+       defer hc.close()
+
+       // Only node1 is in the syncer's target list (simulating registry 
before AddNode).
+       offline := hc.calculateOfflineNodes(
+               []string{"node1:17912"},
+               groupName,
+               common.ShardID(shardID),
+       )
+       tester.Len(offline, 1)
+       tester.Equal("node2:17912", offline[0])
+}
+
 // TestHandoffController_SizeRecovery verifies that total size is correctly 
calculated on restart.
 func TestHandoffController_SizeRecovery(t *testing.T) {
        tester := require.New(t)
diff --git a/banyand/trace/metadata.go b/banyand/trace/metadata.go
index 5f4e0fc17..3e0249e4c 100644
--- a/banyand/trace/metadata.go
+++ b/banyand/trace/metadata.go
@@ -692,18 +692,10 @@ func (qs *queueSupplier) OpenDB(groupSchema 
*commonv1.Group) (resourceSchema.DB,
                },
                GetNodes: func(shardID common.ShardID) []string {
                        copies := ro.Replicas + 1
-                       nodeSet := make(map[string]struct{}, copies)
-                       for i := uint32(0); i < copies; i++ {
-                               nodeID, err := 
qs.traceDataNodeRegistry.Locate(group, "", uint32(shardID), i)
-                               if err != nil {
-                                       qs.l.Error().Err(err).Str("group", 
group).Uint32("shard", uint32(shardID)).Uint32("copy", i).Msg("failed to locate 
node")
-                                       return nil
-                               }
-                               nodeSet[nodeID] = struct{}{}
-                       }
-                       nodes := make([]string, 0, len(nodeSet))
-                       for nodeID := range nodeSet {
-                               nodes = append(nodes, nodeID)
+                       nodes, err := qs.traceDataNodeRegistry.LocateAll(group, 
uint32(shardID), int(copies))
+                       if err != nil {
+                               qs.l.Error().Err(err).Str("group", 
group).Uint32("shard", uint32(shardID)).Msg("failed to locate nodes")
+                               return nil
                        }
                        return nodes
                },
diff --git a/banyand/trace/svc_liaison.go b/banyand/trace/svc_liaison.go
index 5532ce011..3613b6c48 100644
--- a/banyand/trace/svc_liaison.go
+++ b/banyand/trace/svc_liaison.go
@@ -23,7 +23,6 @@ import (
        "fmt"
        "path"
        "path/filepath"
-       "sort"
        "time"
 
        "github.com/dustin/go-humanize"
@@ -253,22 +252,7 @@ func (l *liaison) PreRun(ctx context.Context) error {
                                return nil, fmt.Errorf("group %s missing 
resource options", group)
                        }
                        copies := groupSchema.ResourceOpts.Replicas + 1
-                       if len(l.dataNodeList) == 0 {
-                               return nil, fmt.Errorf("no data nodes 
configured for handoff")
-                       }
-                       sortedNodes := append([]string(nil), l.dataNodeList...)
-                       sort.Strings(sortedNodes)
-                       nodes := make([]string, 0, copies)
-                       seen := make(map[string]struct{}, copies)
-                       for replica := uint32(0); replica < copies; replica++ {
-                               nodeID := 
sortedNodes[(int(shardID)+int(replica))%len(sortedNodes)]
-                               if _, ok := seen[nodeID]; ok {
-                                       continue
-                               }
-                               nodes = append(nodes, nodeID)
-                               seen[nodeID] = struct{}{}
-                       }
-                       return nodes, nil
+                       return traceDataNodeRegistry.LocateAll(group, shardID, 
int(copies))
                }
 
                // nolint:contextcheck

Reply via email to