This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch bug-hq-sync in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 3d499c5fb22e7baf36c94132908d970a38198b44 Author: Hongtao Gao <[email protected]> AuthorDate: Sun Apr 12 14:26:40 2026 +0000 fix(handoff): use shared LocateAll to prevent enqueuing parts for online nodes ResolveAssignments and syncer's GetNodes were independently computing node lists, causing them to diverge due to different lookup table state. Extract LocateAll on NodeRegistry so both paths use the same selector, eliminating spurious handoff enqueues for healthy data nodes. via [HAPI](https://hapi.run) Co-Authored-By: HAPI <[email protected]> --- CHANGES.md | 1 + banyand/liaison/grpc/node.go | 24 +++++++++++++ banyand/liaison/grpc/node_test.go | 11 ++++++ banyand/measure/metadata.go | 16 +++------ banyand/stream/metadata.go | 16 +++------ banyand/trace/handoff_controller.go | 10 +++--- banyand/trace/handoff_storage_test.go | 64 +++++++++++++++++++++++++++++++++++ banyand/trace/metadata.go | 16 +++------ banyand/trace/svc_liaison.go | 18 +--------- 9 files changed, 117 insertions(+), 59 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index dd25bc75f..be447f6eb 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -32,6 +32,7 @@ Release Notes. - Fix SIDX streaming sync sending SegmentID as MinTimestamp instead of the actual timestamp, causing sync failures on the receiving node. - Fix handoff controller TOCTOU race allowing disk size limit bypass, and populate sidx MinTimestamp/MaxTimestamp during replay to prevent corrupt segment creation on recovered nodes. - Delete orphaned parts when no snapshot references them during tsTable initialization. +- Extract shared LocateAll on NodeRegistry to ensure ResolveAssignments and the syncer's GetNodes always produce identical node lists, preventing the liaison from enqueuing parts for online/healthy data nodes. 
### Chores diff --git a/banyand/liaison/grpc/node.go b/banyand/liaison/grpc/node.go index 608799735..1c9379b3e 100644 --- a/banyand/liaison/grpc/node.go +++ b/banyand/liaison/grpc/node.go @@ -39,7 +39,10 @@ var ( // NodeRegistry is for locating data node with group/name of the metadata // together with the shardID calculated from the incoming data. type NodeRegistry interface { + // Locate returns the data node assigned to the given (group, name, shardID, replicaID). Locate(group, name string, shardID, replicaID uint32) (string, error) + // LocateAll returns all distinct data nodes owning the given shard across all replicas. + LocateAll(group string, shardID uint32, replicas int) ([]string, error) fmt.Stringer } @@ -74,6 +77,22 @@ func (n *clusterNodeService) Locate(group, name string, shardID, replicaID uint3 return nodeID, nil } +func (n *clusterNodeService) LocateAll(group string, shardID uint32, replicas int) ([]string, error) { + nodeSet := make(map[string]struct{}, replicas) + for replica := 0; replica < replicas; replica++ { + nodeID, err := n.Locate(group, "", shardID, uint32(replica)) + if err != nil { + return nil, errors.Wrapf(err, "fail to locate %s/%d/%d", group, shardID, replica) + } + nodeSet[nodeID] = struct{}{} + } + nodes := make([]string, 0, len(nodeSet)) + for nodeID := range nodeSet { + nodes = append(nodes, nodeID) + } + return nodes, nil +} + func (n *clusterNodeService) OnAddOrUpdate(metadata schema.Metadata) { switch metadata.Kind { case schema.KindNode: @@ -117,3 +136,8 @@ func NewLocalNodeRegistry() NodeRegistry { func (localNodeService) Locate(_, _ string, _, _ uint32) (string, error) { return "local", nil } + +// LocateAll of localNodeService always returns [local]. 
+func (localNodeService) LocateAll(_ string, _ uint32, _ int) ([]string, error) { + return []string{"local"}, nil +} diff --git a/banyand/liaison/grpc/node_test.go b/banyand/liaison/grpc/node_test.go index 999d78d14..89c11946c 100644 --- a/banyand/liaison/grpc/node_test.go +++ b/banyand/liaison/grpc/node_test.go @@ -58,4 +58,15 @@ func TestClusterNodeRegistry(t *testing.T) { nodeID, err := cnr.Locate("metrics", "instance_traffic", 0, 0) assert.NoError(t, err) assert.Equal(t, fakeNodeID, nodeID) + + nodes, locateAllErr := cnr.LocateAll("metrics", 0, 3) + assert.NoError(t, locateAllErr) + assert.Equal(t, []string{fakeNodeID}, nodes) +} + +func TestLocalNodeRegistry_LocateAll(t *testing.T) { + nr := NewLocalNodeRegistry() + nodes, err := nr.LocateAll("any-group", 0, 3) + assert.NoError(t, err) + assert.Equal(t, []string{"local"}, nodes) } diff --git a/banyand/measure/metadata.go b/banyand/measure/metadata.go index 85d1e701f..f14b87416 100644 --- a/banyand/measure/metadata.go +++ b/banyand/measure/metadata.go @@ -816,18 +816,10 @@ func (s *queueSupplier) OpenDB(groupSchema *commonv1.Group) (resourceSchema.DB, SubQueueCreator: newWriteQueue, GetNodes: func(shardID common.ShardID) []string { copies := ro.Replicas + 1 - nodeSet := make(map[string]struct{}, copies) - for i := uint32(0); i < copies; i++ { - nodeID, err := s.measureDataNodeRegistry.Locate(group, "", uint32(shardID), i) - if err != nil { - s.l.Error().Err(err).Str("group", group).Uint32("shard", uint32(shardID)).Uint32("copy", i).Msg("failed to locate node") - return nil - } - nodeSet[nodeID] = struct{}{} - } - nodes := make([]string, 0, len(nodeSet)) - for nodeID := range nodeSet { - nodes = append(nodes, nodeID) + nodes, err := s.measureDataNodeRegistry.LocateAll(group, uint32(shardID), int(copies)) + if err != nil { + s.l.Error().Err(err).Str("group", group).Uint32("shard", uint32(shardID)).Msg("failed to locate nodes") + return nil } return nodes }, diff --git a/banyand/stream/metadata.go 
b/banyand/stream/metadata.go index e7bbe3c82..269104023 100644 --- a/banyand/stream/metadata.go +++ b/banyand/stream/metadata.go @@ -667,18 +667,10 @@ func (s *queueSupplier) OpenDB(groupSchema *commonv1.Group) (resourceSchema.DB, SubQueueCreator: newWriteQueue, GetNodes: func(shardID common.ShardID) []string { copies := ro.Replicas + 1 - nodeSet := make(map[string]struct{}, copies) - for i := uint32(0); i < copies; i++ { - nodeID, err := s.streamDataNodeRegistry.Locate(group, "", uint32(shardID), i) - if err != nil { - s.l.Error().Err(err).Str("group", group).Uint32("shard", uint32(shardID)).Uint32("copy", i).Msg("failed to locate node") - return nil - } - nodeSet[nodeID] = struct{}{} - } - nodes := make([]string, 0, len(nodeSet)) - for nodeID := range nodeSet { - nodes = append(nodes, nodeID) + nodes, err := s.streamDataNodeRegistry.LocateAll(group, uint32(shardID), int(copies)) + if err != nil { + s.l.Error().Err(err).Str("group", group).Uint32("shard", uint32(shardID)).Msg("failed to locate nodes") + return nil } return nodes }, diff --git a/banyand/trace/handoff_controller.go b/banyand/trace/handoff_controller.go index 4e69f7559..bc6f176f2 100644 --- a/banyand/trace/handoff_controller.go +++ b/banyand/trace/handoff_controller.go @@ -467,14 +467,12 @@ func (hc *handoffController) calculateOfflineNodes(onlineNodes []string, group s continue } seen[node] = struct{}{} - if !hc.isNodeHealthy(node) { - offlineNodes = append(offlineNodes, node) + // Online nodes are delivered by the syncer; only enqueue truly offline nodes. 
+ if _, isOnline := onlineSet[node]; isOnline { continue } - if len(onlineSet) > 0 { - if _, isOnline := onlineSet[node]; !isOnline { - offlineNodes = append(offlineNodes, node) - } + if !hc.isNodeHealthy(node) { + offlineNodes = append(offlineNodes, node) continue } if hc.tire2Client == nil { diff --git a/banyand/trace/handoff_storage_test.go b/banyand/trace/handoff_storage_test.go index 4ca00ca8f..c9f35b1ed 100644 --- a/banyand/trace/handoff_storage_test.go +++ b/banyand/trace/handoff_storage_test.go @@ -627,6 +627,70 @@ func TestHandoffController_FiltersNonOwningOfflineNodes(t *testing.T) { tester.True(os.IsNotExist(statErr), "expected no queue directory to be created for non-owning node") } +func TestHandoffController_OnlineNodeSkipsStaleHealthCheck(t *testing.T) { + tester := require.New(t) + tempDir, deferFunc := test.Space(tester) + defer deferFunc() + + lfs := fs.NewLocalFileSystem() + l := logger.GetLogger("test") + dataNodes := []string{"node1:17912", "node2:17912"} + const groupName = "group1" + const shardID uint32 = 0 + + resolver := func(string, uint32) ([]string, error) { + return dataNodes, nil + } + + // Health check reports only node1 (simulating stale health after node2 restart). + queueClient := &fakeQueueClient{healthy: []string{"node1:17912"}} + hc, err := newHandoffController(lfs, tempDir, queueClient, dataNodes, 0, l, resolver) + tester.NoError(err) + defer hc.close() + + // Both nodes are in the syncer's target list, but health check is stale for node2. + offline := hc.calculateOfflineNodes( + []string{"node1:17912", "node2:17912"}, + groupName, + common.ShardID(shardID), + ) + tester.Len(offline, 0, + "expected no offline nodes when both are in the syncer target list, even with stale health check") +} + +// TestHandoffController_OfflineNodeNotInOnlineSet verifies that a node NOT in the syncer's +// target list and reported unhealthy is correctly classified as offline. 
+func TestHandoffController_OfflineNodeNotInOnlineSet(t *testing.T) { + tester := require.New(t) + tempDir, deferFunc := test.Space(tester) + defer deferFunc() + + lfs := fs.NewLocalFileSystem() + l := logger.GetLogger("test") + dataNodes := []string{"node1:17912", "node2:17912"} + const groupName = "group1" + const shardID uint32 = 0 + + resolver := func(string, uint32) ([]string, error) { + return dataNodes, nil + } + + // Health check reports only node1 — node2 is offline. + queueClient := &fakeQueueClient{healthy: []string{"node1:17912"}} + hc, err := newHandoffController(lfs, tempDir, queueClient, dataNodes, 0, l, resolver) + tester.NoError(err) + defer hc.close() + + // Only node1 is in the syncer's target list (simulating registry before AddNode). + offline := hc.calculateOfflineNodes( + []string{"node1:17912"}, + groupName, + common.ShardID(shardID), + ) + tester.Len(offline, 1) + tester.Equal("node2:17912", offline[0]) +} + // TestHandoffController_SizeRecovery verifies that total size is correctly calculated on restart. 
func TestHandoffController_SizeRecovery(t *testing.T) { tester := require.New(t) diff --git a/banyand/trace/metadata.go b/banyand/trace/metadata.go index 5f4e0fc17..3e0249e4c 100644 --- a/banyand/trace/metadata.go +++ b/banyand/trace/metadata.go @@ -692,18 +692,10 @@ func (qs *queueSupplier) OpenDB(groupSchema *commonv1.Group) (resourceSchema.DB, }, GetNodes: func(shardID common.ShardID) []string { copies := ro.Replicas + 1 - nodeSet := make(map[string]struct{}, copies) - for i := uint32(0); i < copies; i++ { - nodeID, err := qs.traceDataNodeRegistry.Locate(group, "", uint32(shardID), i) - if err != nil { - qs.l.Error().Err(err).Str("group", group).Uint32("shard", uint32(shardID)).Uint32("copy", i).Msg("failed to locate node") - return nil - } - nodeSet[nodeID] = struct{}{} - } - nodes := make([]string, 0, len(nodeSet)) - for nodeID := range nodeSet { - nodes = append(nodes, nodeID) + nodes, err := qs.traceDataNodeRegistry.LocateAll(group, uint32(shardID), int(copies)) + if err != nil { + qs.l.Error().Err(err).Str("group", group).Uint32("shard", uint32(shardID)).Msg("failed to locate nodes") + return nil } return nodes }, diff --git a/banyand/trace/svc_liaison.go b/banyand/trace/svc_liaison.go index 5532ce011..3613b6c48 100644 --- a/banyand/trace/svc_liaison.go +++ b/banyand/trace/svc_liaison.go @@ -23,7 +23,6 @@ import ( "fmt" "path" "path/filepath" - "sort" "time" "github.com/dustin/go-humanize" @@ -253,22 +252,7 @@ func (l *liaison) PreRun(ctx context.Context) error { return nil, fmt.Errorf("group %s missing resource options", group) } copies := groupSchema.ResourceOpts.Replicas + 1 - if len(l.dataNodeList) == 0 { - return nil, fmt.Errorf("no data nodes configured for handoff") - } - sortedNodes := append([]string(nil), l.dataNodeList...) 
- sort.Strings(sortedNodes) - nodes := make([]string, 0, copies) - seen := make(map[string]struct{}, copies) - for replica := uint32(0); replica < copies; replica++ { - nodeID := sortedNodes[(int(shardID)+int(replica))%len(sortedNodes)] - if _, ok := seen[nodeID]; ok { - continue - } - nodes = append(nodes, nodeID) - seen[nodeID] = struct{}{} - } - return nodes, nil + return traceDataNodeRegistry.LocateAll(group, shardID, int(copies)) } // nolint:contextcheck
