The following pull request was submitted through GitHub. It can be accessed and reviewed at: https://github.com/lxc/lxd/pull/7601
This e-mail was sent by the LXC bot; direct replies will not reach the author unless they happen to be subscribed to this list.

=== Description (from pull-request) ===

This effectively replaces some of LXD's built-in logic around dqlite role management with equivalent logic now available downstream in the go-dqlite/app package.

There's no change in behavior, except that the switch also brings in a bug fix: LXD was previously demoting offline voters to spare even when no replacement (i.e. another spare or stand-by node that could be promoted to voter) was available. In that situation it's better not to take away the role from the offline voter node (otherwise we would be left with a cluster of only 2 voters), in the hope that the offline voter will come back or that a spare/stand-by will become available to replace it.
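For reference, the go-dqlite/app helper that the patches below lean on is driven roughly as in the sketch that follows. This is a minimal, self-contained illustration based only on how the patches use the API (app.RolesConfig, RolesChanges.Handover() and RolesChanges.Adjust()); the addresses, IDs, role targets and the probeConnectivity() helper are made-up stand-ins for illustration (the real code uses gateway.currentRaftNodes() and HasConnectivity()), not LXD code.

package main

import (
	"fmt"

	"github.com/canonical/go-dqlite/app"
	"github.com/canonical/go-dqlite/client"
)

// probeConnectivity is a stand-in for LXD's HasConnectivity() helper.
func probeConnectivity(address string) bool {
	return address != "10.0.0.3:8443" // pretend node 3 is unreachable
}

func main() {
	// Current raft members, as LXD would get them from currentRaftNodes().
	nodes := []client.NodeInfo{
		{ID: 1, Address: "10.0.0.1:8443", Role: client.Voter},
		{ID: 2, Address: "10.0.0.2:8443", Role: client.Voter},
		{ID: 3, Address: "10.0.0.3:8443", Role: client.Voter},
		{ID: 4, Address: "10.0.0.4:8443", Role: client.Spare},
	}

	// Online members get a metadata object, unreachable ones get nil: this
	// is how RolesChanges tells online and offline nodes apart.
	state := map[client.NodeInfo]*client.NodeMetadata{}
	for _, node := range nodes {
		if probeConnectivity(node.Address) {
			state[node] = &client.NodeMetadata{}
		} else {
			state[node] = nil
		}
	}

	roles := &app.RolesChanges{
		Config: app.RolesConfig{Voters: 3, StandBys: 2},
		State:  state,
	}

	// Handover: which member should take over the role of node 1 before it
	// shuts down?
	role, candidates := roles.Handover(1)
	if len(candidates) > 0 {
		fmt.Printf("hand over %v role to %s\n", role, candidates[0].Address)
	}

	// Adjust: given the ID of the node running the adjustment (the dqlite
	// leader in LXD's case), does any single role change bring the cluster
	// closer to the configured targets? -1 means nothing needs to change.
	role, candidates = roles.Adjust(2)
	if role != -1 && len(candidates) > 0 {
		fmt.Printf("assign %v role to %s\n", role, candidates[0].Address)
	}
}

Handover() answers "who should take over this node's role before it shuts down", while Adjust() answers "which single role change, if any, brings the cluster closest to the configured number of voters and stand-bys"; that is how the new Handover() and Rebalance() code paths in patches 2/4 and 4/4 use them.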
From 761015a70981723a15d7445f49ba1950f3f480c6 Mon Sep 17 00:00:00 2001 From: Free Ekanayaka <free.ekanay...@canonical.com> Date: Tue, 30 Jun 2020 09:25:47 +0200 Subject: [PATCH 1/4] lxd/cluster: Only look up raft_nodes for resolving the address of node 1 Signed-off-by: Free Ekanayaka <free.ekanay...@canonical.com> --- lxd/cluster/gateway.go | 33 ++++++++++++++------------------- lxd/cluster/membership.go | 10 +++------- 2 files changed, 17 insertions(+), 26 deletions(-) diff --git a/lxd/cluster/gateway.go b/lxd/cluster/gateway.go index 42c2ae3eee..4846f14138 100644 --- a/lxd/cluster/gateway.go +++ b/lxd/cluster/gateway.go @@ -403,14 +403,11 @@ func (g *Gateway) DialFunc() client.DialFunc { // Dial function for establishing raft connections. func (g *Gateway) raftDial() client.DialFunc { return func(ctx context.Context, address string) (net.Conn, error) { - if address == "1" { - addr, err := g.raftAddress(1) - if err != nil { - return nil, err - } - address = string(addr) + nodeAddress, err := g.nodeAddress(address) + if err != nil { + return nil, err } - conn, err := dqliteNetworkDial(ctx, address, g) + conn, err := dqliteNetworkDial(ctx, nodeAddress, g) if err != nil { return nil, err } @@ -480,7 +477,7 @@ func (g *Gateway) TransferLeadership() error { if server.ID == g.info.ID || server.Role != db.RaftVoter { continue } - address, err := g.raftAddress(server.ID) + address, err := g.nodeAddress(server.Address) if err != nil { return err } @@ -851,27 +848,25 @@ func (g *Gateway) currentRaftNodes() ([]db.RaftNode, error) { return nil, err } for i, server := range servers { - address, err := g.raftAddress(server.ID) + address, err := g.nodeAddress(server.Address) if err != nil { - if err != db.ErrNoSuchObject { - return nil, errors.Wrap(err, "Failed to fetch raft server address") - } - // Use the initial address as fallback. This is an edge - // case that happens when a new leader is elected and - // its raft_nodes table is not fully up-to-date yet. - address = server.Address + return nil, errors.Wrap(err, "Failed to fetch raft server address") } servers[i].Address = address } return servers, nil } -// Look up a server address in the raft_nodes table. -func (g *Gateway) raftAddress(databaseID uint64) (string, error) { +// Translate a raft address to a node address. They are always the same except +// for the bootstrap node, which has address "1". 
+func (g *Gateway) nodeAddress(raftAddress string) (string, error) { + if raftAddress != "1" { + return raftAddress, nil + } var address string err := g.db.Transaction(func(tx *db.NodeTx) error { var err error - address, err = tx.GetRaftNodeAddress(int64(databaseID)) + address, err = tx.GetRaftNodeAddress(1) return err }) if err != nil { diff --git a/lxd/cluster/membership.go b/lxd/cluster/membership.go index 1d260d4295..cf335c3dd1 100644 --- a/lxd/cluster/membership.go +++ b/lxd/cluster/membership.go @@ -938,13 +938,9 @@ func List(state *state.State, gateway *Gateway) ([]api.ClusterMember, error) { } raftRoles := map[string]client.NodeRole{} // Address to role for _, node := range raftNodes { - address := node.Address - if address == "1" { - addr, err := gateway.raftAddress(1) - if err != nil { - return nil, err - } - address = string(addr) + address, err := gateway.nodeAddress(node.Address) + if err != nil { + return nil, err } raftRoles[address] = node.Role } From 292ed405c3e5a2c9bda31ee945896826c232057e Mon Sep 17 00:00:00 2001 From: Free Ekanayaka <free.ekanay...@canonical.com> Date: Tue, 30 Jun 2020 10:48:27 +0200 Subject: [PATCH 2/4] lxd/cluster: Leverage RolesChanges.Handover() to choose handover target Signed-off-by: Free Ekanayaka <free.ekanay...@canonical.com> --- lxd/.dir-locals.el | 2 +- lxd/cluster/membership.go | 78 +++++++++++++++++++++++++++++---------- 2 files changed, 60 insertions(+), 20 deletions(-) diff --git a/lxd/.dir-locals.el b/lxd/.dir-locals.el index 77b750661f..b72bdc482e 100644 --- a/lxd/.dir-locals.el +++ b/lxd/.dir-locals.el @@ -1,7 +1,7 @@ ;;; Directory Local Variables ;;; For more information see (info "(emacs) Directory Variables") ((go-mode - . ((go-test-args . "-tags libsqlite3 -timeout 90s") + . ((go-test-args . "-tags libsqlite3 -timeout 120s") (eval . (set (make-local-variable 'flycheck-go-build-tags) diff --git a/lxd/cluster/membership.go b/lxd/cluster/membership.go index cf335c3dd1..45ee7df16b 100644 --- a/lxd/cluster/membership.go +++ b/lxd/cluster/membership.go @@ -7,6 +7,7 @@ import ( "path/filepath" "time" + "github.com/canonical/go-dqlite/app" "github.com/canonical/go-dqlite/client" "github.com/lxc/lxd/lxd/db" "github.com/lxc/lxd/lxd/db/cluster" @@ -839,40 +840,79 @@ func Leave(state *state.State, gateway *Gateway, name string, force bool) (strin func Handover(state *state.State, gateway *Gateway, address string) (string, []db.RaftNode, error) { nodes, err := gateway.currentRaftNodes() if err != nil { - return "", nil, errors.Wrap(err, "Failed to get current raft nodes") + return "", nil, errors.Wrap(err, "Get current raft nodes") } - // If the member which is shutting down is not a voter, there's nothing - // to do. 
- found := false + var nodeID uint64 for _, node := range nodes { - if node.Address != address { - continue - } - if node.Role != db.RaftVoter { - return "", nil, nil + if node.Address == address { + nodeID = node.ID } - found = true - break + } - if !found { + if nodeID == 0 { return "", nil, errors.Wrapf(err, "No dqlite node has address %s", address) } + roles, err := newRolesChanges(state, gateway, nodes) + if err != nil { + return "", nil, err + } + role, candidates := roles.Handover(nodeID) + + if role != db.RaftVoter { + return "", nil, nil + } + for i, node := range nodes { - if node.Role == db.RaftVoter || node.Address == address { - continue + if node.Address == candidates[0].Address { + nodes[i].Role = role + return node.Address, nodes, nil } - if !hasConnectivity(gateway.cert, node.Address) { - continue - } - nodes[i].Role = db.RaftVoter - return node.Address, nodes, nil } return "", nil, nil } +// Build an app.RolesChanges object fed with the current cluster state. +func newRolesChanges(state *state.State, gateway *Gateway, nodes []db.RaftNode) (*app.RolesChanges, error) { + var maxVoters int + var maxStandBy int + err := state.Cluster.Transaction(func(tx *db.ClusterTx) error { + config, err := ConfigLoad(tx) + if err != nil { + return errors.Wrap(err, "Load cluster configuration") + } + maxVoters = int(config.MaxVoters()) + maxStandBy = int(config.MaxStandBy()) + return nil + }) + if err != nil { + return nil, err + } + + cluster := map[client.NodeInfo]*client.NodeMetadata{} + + for _, node := range nodes { + if hasConnectivity(gateway.cert, node.Address) { + cluster[node] = &client.NodeMetadata{} + } else { + cluster[node] = nil + } + + } + + roles := &app.RolesChanges{ + Config: app.RolesConfig{ + Voters: maxVoters, + StandBys: maxStandBy, + }, + State: cluster, + } + + return roles, nil +} + // Purge removes a node entirely from the cluster database. func Purge(cluster *db.Cluster, name string) error { logger.Debugf("Remove node %s from the database", name) From baae60bbd34e0f7b2bd48f14ecd692f159fae412 Mon Sep 17 00:00:00 2001 From: Free Ekanayaka <free.ekanay...@canonical.com> Date: Tue, 30 Jun 2020 10:54:46 +0200 Subject: [PATCH 3/4] lxd/cluster: Skip unnecessary loading of nodes from database in Rebalance() Signed-off-by: Free Ekanayaka <free.ekanay...@canonical.com> --- lxd/cluster/membership.go | 20 ++++---------------- 1 file changed, 4 insertions(+), 16 deletions(-) diff --git a/lxd/cluster/membership.go b/lxd/cluster/membership.go index 45ee7df16b..c2b859bde9 100644 --- a/lxd/cluster/membership.go +++ b/lxd/cluster/membership.go @@ -495,7 +495,6 @@ func Rebalance(state *state.State, gateway *Gateway) (string, []db.RaftNode, err // Fetch the nodes from the database, to get their last heartbeat // timestamp and check whether they are offline.
- nodesByAddress := map[string]db.NodeInfo{} var maxVoters int64 var maxStandBy int64 err = state.Cluster.Transaction(func(tx *db.ClusterTx) error { @@ -505,13 +504,6 @@ func Rebalance(state *state.State, gateway *Gateway) (string, []db.RaftNode, err } maxVoters = config.MaxVoters() maxStandBy = config.MaxStandBy() - nodes, err := tx.GetNodes() - if err != nil { - return errors.Wrap(err, "failed to get cluster nodes") - } - for _, node := range nodes { - nodesByAddress[node.Address] = node - } return nil }) if err != nil { @@ -524,8 +516,7 @@ func Rebalance(state *state.State, gateway *Gateway) (string, []db.RaftNode, err standbys := make([]string, 0) candidates := make([]string, 0) for i, info := range currentRaftNodes { - node := nodesByAddress[info.Address] - if !hasConnectivity(gateway.cert, node.Address) { + if !hasConnectivity(gateway.cert, info.Address) { if info.Role != db.RaftSpare { client, err := gateway.getClient() if err != nil { @@ -534,8 +525,7 @@ func Rebalance(state *state.State, gateway *Gateway) (string, []db.RaftNode, err defer client.Close() ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - logger.Infof( - "Demote offline node %s (%s) to spare", node.Name, node.Address) + logger.Infof("Demote offline node %s to spare", info.Address) err = client.Assign(ctx, info.ID, db.RaftSpare) if err != nil { return "", nil, errors.Wrap(err, "Failed to demote offline node") @@ -572,10 +562,8 @@ func Rebalance(state *state.State, gateway *Gateway) (string, []db.RaftNode, err // Check if we have a spare node that we can promote to the missing role. address := "" for _, candidate := range candidates { - node := nodesByAddress[candidate] - logger.Infof( - "Found spare node %s (%s) to be promoted to %s", node.Name, node.Address, role) - address = node.Address + logger.Infof("Found spare node %s to be promoted to %s", candidate, role) + address = candidate break } From d9e7be738732a6ce5e02b585b25eca79df543a54 Mon Sep 17 00:00:00 2001 From: Free Ekanayaka <free.ekanay...@canonical.com> Date: Tue, 30 Jun 2020 12:51:51 +0200 Subject: [PATCH 4/4] lxd/cluster: Leverage RolesChanges.Adjust() to choose rebalance target Signed-off-by: Free Ekanayaka <free.ekanay...@canonical.com> --- lxd/api_cluster.go | 24 +++++++++-- lxd/cluster/connect.go | 4 +- lxd/cluster/gateway.go | 20 ++++++++- lxd/cluster/membership.go | 91 ++++++--------------------------------- lxd/cluster/notify.go | 2 +- test/suites/clustering.sh | 13 +++--- 6 files changed, 61 insertions(+), 93 deletions(-) diff --git a/lxd/api_cluster.go b/lxd/api_cluster.go index fbc61dd979..04bf1c6bb3 100644 --- a/lxd/api_cluster.go +++ b/lxd/api_cluster.go @@ -1199,9 +1199,7 @@ func internalClusterPostRebalance(d *Daemon, r *http.Request) response.Response // Check if there's a dqlite node whose role should be changed, and post a // change role request if so. func rebalanceMemberRoles(d *Daemon) error { - logger.Debugf("Rebalance cluster") - - // Check if we have a spare node to promote. +again: address, nodes, err := cluster.Rebalance(d.State(), d.gateway) if err != nil { return err @@ -1212,13 +1210,31 @@ func rebalanceMemberRoles(d *Daemon) error { return nil } + // Process demotions of offline nodes immediately.
+ for _, node := range nodes { + if node.Address != address || node.Role != db.RaftSpare { + continue + } + + if cluster.HasConnectivity(d.endpoints.NetworkCert(), address) { + break + } + + err := d.gateway.DemoteOfflineNode(node.ID) + if err != nil { + return errors.Wrapf(err, "Demote offline node %s", node.Address) + } + + goto again + } + // Tell the node to promote itself. err = changeMemberRole(d, address, nodes) if err != nil { return err } - return nil + goto again } // Post a change role request to the member with the given address. The nodes diff --git a/lxd/cluster/connect.go b/lxd/cluster/connect.go index 92eda9bba4..8fdb8a8390 100644 --- a/lxd/cluster/connect.go +++ b/lxd/cluster/connect.go @@ -168,8 +168,8 @@ func SetupTrust(cert, targetAddress, targetCert, targetPassword string) error { return nil } -// Probe network connectivity to the member with the given address. -func hasConnectivity(cert *shared.CertInfo, address string) bool { +// HasConnectivity probes the member with the given address for connectivity. +func HasConnectivity(cert *shared.CertInfo, address string) bool { config, err := tlsClientConfig(cert) if err != nil { return false diff --git a/lxd/cluster/gateway.go b/lxd/cluster/gateway.go index 4846f14138..a4a49f57b3 100644 --- a/lxd/cluster/gateway.go +++ b/lxd/cluster/gateway.go @@ -481,7 +481,7 @@ func (g *Gateway) TransferLeadership() error { if err != nil { return err } - if !hasConnectivity(g.cert, address) { + if !HasConnectivity(g.cert, address) { continue } id = server.ID @@ -498,6 +498,24 @@ func (g *Gateway) TransferLeadership() error { return client.Transfer(ctx, id) } +// DemoteOfflineNode forcibly demotes an offline node. +func (g *Gateway) DemoteOfflineNode(raftID uint64) error { + cli, err := g.getClient() + if err != nil { + return errors.Wrap(err, "Connect to local dqlite node") + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + err = cli.Assign(ctx, raftID, db.RaftSpare) + if err != nil { + return err + } + + return nil +} + // Shutdown this gateway, stopping the gRPC server and possibly the raft factory. func (g *Gateway) Shutdown() error { logger.Infof("Stop database gateway") diff --git a/lxd/cluster/membership.go b/lxd/cluster/membership.go index c2b859bde9..2b019c7ed4 100644 --- a/lxd/cluster/membership.go +++ b/lxd/cluster/membership.go @@ -486,100 +486,35 @@ func Rebalance(state *state.State, gateway *Gateway) (string, []db.RaftNode, err return "", nil, nil } - // First get the current raft members, since this method should be - // called after a node has left. - currentRaftNodes, err := gateway.currentRaftNodes() + nodes, err := gateway.currentRaftNodes() if err != nil { - return "", nil, errors.Wrap(err, "failed to get current raft nodes") + return "", nil, errors.Wrap(err, "Get current raft nodes") } - // Fetch the nodes from the database, to get their last heartbeat - // timestamp and check whether they are offline. - var maxVoters int64 - var maxStandBy int64 - err = state.Cluster.Transaction(func(tx *db.ClusterTx) error { - config, err := ConfigLoad(tx) - if err != nil { - return errors.Wrap(err, "failed load cluster configuration") - } - maxVoters = config.MaxVoters() - maxStandBy = config.MaxStandBy() - return nil - }) + roles, err := newRolesChanges(state, gateway, nodes) if err != nil { return "", nil, err } - // Group by role. If a node is offline, we'll try to demote it right - // away.
- voters := make([]string, 0) - standbys := make([]string, 0) - candidates := make([]string, 0) - for i, info := range currentRaftNodes { - if !hasConnectivity(gateway.cert, info.Address) { - if info.Role != db.RaftSpare { - client, err := gateway.getClient() - if err != nil { - return "", nil, errors.Wrap(err, "Failed to connect to local dqlite node") - } - defer client.Close() - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - logger.Infof("Demote offline node %s to spare", info.Address) - err = client.Assign(ctx, info.ID, db.RaftSpare) - if err != nil { - return "", nil, errors.Wrap(err, "Failed to demote offline node") - } - currentRaftNodes[i].Role = db.RaftSpare - } - continue - } - - switch info.Role { - case db.RaftVoter: - voters = append(voters, info.Address) - case db.RaftStandBy: - standbys = append(standbys, info.Address) - case db.RaftSpare: - candidates = append(candidates, info.Address) - } - } - - var role db.RaftRole + role, candidates := roles.Adjust(gateway.info.ID) - if len(voters) < int(maxVoters) && len(currentRaftNodes) >= 3 { - role = db.RaftVoter - // Include stand-by nodes among the ones that can be promoted, - // preferring them over spare ones. - candidates = append(standbys, candidates...) - } else if len(standbys) < int(maxStandBy) { - role = db.RaftStandBy - } else { - // We're already at full capacity or would have a two-member cluster. - return "", nil, nil + if role == -1 { + // No node to promote + return "", nodes, nil } // Check if we have a spare node that we can promote to the missing role. - address := "" - for _, candidate := range candidates { - logger.Infof("Found spare node %s to be promoted to %s", candidate, role) - address = candidate - break - } + address := candidates[0].Address + logger.Infof("Found node %s whose role needs to be changed to %s", address, role) - if address == "" { - // No node to promote - return "", currentRaftNodes, nil - } - - for i, node := range currentRaftNodes { + for i, node := range nodes { if node.Address == address { - currentRaftNodes[i].Role = role + nodes[i].Role = role break } } - return address, currentRaftNodes, nil + return address, nodes, nil } // Assign a new role to the local dqlite node. @@ -882,7 +817,7 @@ func newRolesChanges(state *state.State, gateway *Gateway, nodes []db.RaftNode) cluster := map[client.NodeInfo]*client.NodeMetadata{} for _, node := range nodes { - if hasConnectivity(gateway.cert, node.Address) { + if HasConnectivity(gateway.cert, node.Address) { cluster[node] = &client.NodeMetadata{} } else { cluster[node] = nil diff --git a/lxd/cluster/notify.go b/lxd/cluster/notify.go index 603f814093..28f5b44dff 100644 --- a/lxd/cluster/notify.go +++ b/lxd/cluster/notify.go @@ -73,7 +73,7 @@ func NewNotifier(state *state.State, cert *shared.CertInfo, policy NotifierPolic // enough, let's try to connect to the node, just in // case the heartbeat is lagging behind for some reason // and the node is actually up. 
- if !hasConnectivity(cert, node.Address) { + if !HasConnectivity(cert, node.Address) { switch policy { case NotifyAll: return nil, fmt.Errorf("peer node %s is down", node.Address) diff --git a/test/suites/clustering.sh b/test/suites/clustering.sh index a5e07476ca..49631e8d8f 100644 --- a/test/suites/clustering.sh +++ b/test/suites/clustering.sh @@ -1673,7 +1673,7 @@ test_clustering_handover() { LXD_DIR="${LXD_THREE_DIR}" lxc cluster list - # Respawn the first node, which is now a stand-by, and the second node, which + # Respawn the first node, which is now a spare, and the second node, which # is still a voter. respawn_lxd_cluster_member "${ns1}" "${LXD_ONE_DIR}" respawn_lxd_cluster_member "${ns2}" "${LXD_TWO_DIR}" @@ -1687,14 +1687,13 @@ test_clustering_handover() { wait "$pid1" wait "$pid2" - # Wait some time to allow for a leadership change. - sleep 10 + # Bringing back one of them restores the quorum. + respawn_lxd_cluster_member "${ns2}" "${LXD_TWO_DIR}" - # The first node has been promoted back to voter, and since the fourth node is - # still up, the cluster is online. LXD_DIR="${LXD_ONE_DIR}" lxc cluster list LXD_DIR="${LXD_ONE_DIR}" lxd shutdown + LXD_DIR="${LXD_TWO_DIR}" lxd shutdown LXD_DIR="${LXD_FOUR_DIR}" lxd shutdown sleep 0.5 rm -f "${LXD_ONE_DIR}/unix.socket" @@ -1757,8 +1756,8 @@ test_clustering_rebalance() { LXD_DIR="${LXD_ONE_DIR}" lxc config set cluster.offline_threshold 12 kill -9 "$(cat "${LXD_TWO_DIR}/lxd.pid")" - # Wait for the second node to be considered offline. We need at most 2 full - hearbeats. + # Wait for the second node to be considered offline and be replaced by the + fourth node. sleep 25 # The second node is offline and has been demoted.
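A usage note on patch 4/4: the new Gateway.DemoteOfflineNode() helper boils down to a single dqlite Assign() call handled by the leader. The standalone sketch below shows that call in isolation; leaderAddr and offlineID are placeholder values, and connecting with client.New() to a known leader address is a simplifying assumption made here for brevity (LXD goes through its own gateway client instead).

package main

import (
	"context"
	"log"
	"time"

	"github.com/canonical/go-dqlite/client"
)

// demoteToSpare asks the dqlite leader to move the member with the given raft
// ID back to the spare role, which is what happens to unreachable voters or
// stand-bys during rebalancing.
func demoteToSpare(leaderAddr string, offlineID uint64) error {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// Connect to the node we believe is the current leader; role changes
	// must be applied by the leader to take effect.
	cli, err := client.New(ctx, leaderAddr)
	if err != nil {
		return err
	}
	defer cli.Close()

	// Same kind of call the LXD gateway issues: assign the spare role.
	return cli.Assign(ctx, offlineID, client.Spare)
}

func main() {
	// Placeholder address and ID, for illustration only.
	if err := demoteToSpare("10.0.0.1:8443", 3); err != nil {
		log.Fatal(err)
	}
}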