The following pull request was submitted through GitHub. It can be accessed and reviewed at: https://github.com/lxc/lxd/pull/7412
This e-mail was sent by the LXC bot; direct replies will not reach the author unless they happen to be subscribed to this list.

=== Description (from pull-request) ===

Although it's possible to hold leadership while being demoted to spare, that creates a few issues down the line.

Signed-off-by: Free Ekanayaka <free.ekanay...@canonical.com>
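In practice, handoverMemberRole now first checks whether the local member itself holds raft leadership and, if so, transfers leadership to another online voter before asking the (new) leader to adjust its role; previously leadership was only handed over as a best-effort step in Gateway.Shutdown. A rough sketch of the new control flow (the helper name ensureNotLeader is mine; the patch inlines this logic in handoverMemberRole using a goto label):

    // Sketch only: make sure some other voter leads before negotiating
    // our own role change. Hypothetical helper, not part of the patch.
    func ensureNotLeader(d *Daemon, address string) (string, error) {
            for {
                    leader, err := d.gateway.LeaderAddress()
                    if err != nil {
                            return "", err
                    }
                    if leader != address {
                            // Another member leads; send it the handover request.
                            return leader, nil
                    }
                    // We are the leader: move leadership to another online
                    // voter first, then look the leader up again.
                    err = d.gateway.TransferLeadership()
                    if err != nil {
                            return "", errors.Wrapf(err, "Failed to transfer leadership")
                    }
            }
    }

Retrying the leader lookup after the transfer matters because the member must send its handover request to whichever voter just took over.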
From e03bea5bb3a74101f3c5c0427b0d1e0cf5f81d5e Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <free.ekanay...@canonical.com>
Date: Thu, 21 May 2020 19:38:42 +0100
Subject: [PATCH] lxd/cluster: Transfer leadership before adjusting roles, not after

Although it's possible to hold leadership while being demoted to spare, that
creates a few issues down the line.

Signed-off-by: Free Ekanayaka <free.ekanay...@canonical.com>
---
 lxd/api_cluster.go        | 10 +++++++
 lxd/cluster/gateway.go    | 58 ++++++++++++++++++++++++++-------------
 lxd/cluster/membership.go | 52 ++---------------------------------
 lxd/daemon.go             |  3 ++
 4 files changed, 55 insertions(+), 68 deletions(-)

diff --git a/lxd/api_cluster.go b/lxd/api_cluster.go
index 94eac84708..31ddbb6abc 100644
--- a/lxd/api_cluster.go
+++ b/lxd/api_cluster.go
@@ -1286,6 +1286,7 @@ func handoverMemberRole(d *Daemon) error {
     }
 
     // Find the cluster leader.
+findLeader:
     leader, err := d.gateway.LeaderAddress()
     if err != nil {
         return err
@@ -1297,6 +1298,15 @@ func handoverMemberRole(d *Daemon) error {
         return nil
     }
 
+    if leader == address {
+        logger.Info("Transfer leadership")
+        err := d.gateway.TransferLeadership()
+        if err != nil {
+            return errors.Wrapf(err, "Failed to transfer leadership")
+        }
+        goto findLeader
+    }
+
     cert := d.endpoints.NetworkCert()
     client, err := cluster.Connect(leader, cert, true)
     if err != nil {
diff --git a/lxd/cluster/gateway.go b/lxd/cluster/gateway.go
index 2195ca6d9f..64a84265c5 100644
--- a/lxd/cluster/gateway.go
+++ b/lxd/cluster/gateway.go
@@ -428,6 +428,45 @@ func (g *Gateway) Kill() {
     g.cancel()
 }
 
+// TransferLeadership attempts to transfer leadership to another node.
+func (g *Gateway) TransferLeadership() error {
+    client, err := g.getClient()
+    if err != nil {
+        return err
+    }
+    defer client.Close()
+
+    // Try to find a voter that is also online.
+    servers, err := client.Cluster(context.Background())
+    if err != nil {
+        return err
+    }
+    var id uint64
+    for _, server := range servers {
+        if server.ID == g.info.ID || server.Role != db.RaftVoter {
+            continue
+        }
+        address, err := g.raftAddress(server.ID)
+        if err != nil {
+            return err
+        }
+        if !hasConnectivity(g.cert, address) {
+            continue
+        }
+        id = server.ID
+        break
+    }
+
+    if id == 0 {
+        return fmt.Errorf("No online voter found")
+    }
+
+    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+    defer cancel()
+
+    return client.Transfer(ctx, id)
+}
+
 // Shutdown this gateway, stopping the gRPC server and possibly the raft factory.
 func (g *Gateway) Shutdown() error {
     logger.Debugf("Stop database gateway")
@@ -437,25 +476,6 @@ func (g *Gateway) Shutdown() error {
         g.Sync()
     }
 
-    // If this is not a standalone node and we are the cluster
-    // leader, let's try to transfer leadership.
-    if g.memoryDial == nil {
-        isLeader, err := g.isLeader()
-        if err == nil && isLeader {
-            client, err := g.getClient()
-            if err == nil {
-                ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
-                defer cancel()
-                logger.Info("Transfer leadership")
-                err := client.Transfer(ctx, 0)
-                if err != nil {
-                    logger.Warnf("Failed to transfer leadership: %v", err)
-                }
-                client.Close()
-            }
-        }
-    }
-
     g.server.Close()
 
     close(g.stopCh)
diff --git a/lxd/cluster/membership.go b/lxd/cluster/membership.go
index 9e28022619..62a19f4a91 100644
--- a/lxd/cluster/membership.go
+++ b/lxd/cluster/membership.go
@@ -533,20 +533,8 @@ func Rebalance(state *state.State, gateway *Gateway) (string, []db.RaftNode, err
     candidates := make([]string, 0)
     for i, info := range currentRaftNodes {
         node := nodesByAddress[info.Address]
-        if node.IsOffline(offlineThreshold) {
+        if !hasConnectivity(gateway.cert, node.Address) {
             if info.Role != db.RaftSpare {
-                // Even the heartbeat timestamp is not recent
-                // enough, let's try to connect to the node,
-                // just in case the heartbeat is lagging behind
-                // for some reason and the node is actually up.
-                restclient, err := Connect(node.Address, gateway.cert, true)
-                if err == nil {
-                    _, _, err = restclient.GetServer()
-                }
-                if err == nil {
-                    // This isn't actually offline.
-                    goto append
-                }
                 client, err := gateway.getClient()
                 if err != nil {
                     return "", nil, errors.Wrap(err, "Failed to connect to local dqlite node")
@@ -572,7 +560,7 @@ func Rebalance(state *state.State, gateway *Gateway) (string, []db.RaftNode, err
             }
             continue
         }
-    append:
+
         switch info.Role {
         case db.RaftVoter:
             voters = append(voters, info.Address)
@@ -902,11 +890,7 @@ func Handover(state *state.State, gateway *Gateway, address string) (string, []d
         if node.Role == db.RaftVoter || node.Address == address {
             continue
         }
-        online, err := isMemberOnline(state, gateway.cert, node.Address)
-        if err != nil {
-            return "", nil, errors.Wrapf(err, "Failed to check if %s is online", node.Address)
-        }
-        if !online {
+        if !hasConnectivity(gateway.cert, node.Address) {
             continue
         }
         nodes[i].Role = db.RaftVoter
@@ -916,36 +900,6 @@ func Handover(state *state.State, gateway *Gateway, address string) (string, []d
     return "", nil, nil
 }
 
-// Check if the member with the given address is online.
-func isMemberOnline(state *state.State, cert *shared.CertInfo, address string) (bool, error) {
-    online := true
-    err := state.Cluster.Transaction(func(tx *db.ClusterTx) error {
-        offlineThreshold, err := tx.GetNodeOfflineThreshold()
-        if err != nil {
-            return err
-        }
-        node, err := tx.GetNodeByAddress(address)
-        if err != nil {
-            return err
-        }
-        if node.IsOffline(offlineThreshold) {
-            online = false
-        }
-        return nil
-    })
-    if err != nil {
-        return false, err
-    }
-
-    // Even if the heartbeat timestamp is not recent enough, let's try to
-    // connect to the node, just in case the heartbeat is lagging behind
-    // for some reason and the node is actually up.
-    if !online && hasConnectivity(cert, address) {
-        online = true
-    }
-
-    return online, nil
-}
-
 // Purge removes a node entirely from the cluster database.
 func Purge(cluster *db.Cluster, name string) error {
     logger.Debugf("Remove node %s from the database", name)
diff --git a/lxd/daemon.go b/lxd/daemon.go
index 6a5cd40b93..26bca25d6a 100644
--- a/lxd/daemon.go
+++ b/lxd/daemon.go
@@ -1534,6 +1534,9 @@ func (d *Daemon) NodeRefreshTask(heartbeatData *cluster.APIHeartbeat) {
 
     if isDegraded || voters < int(maxVoters) || standbys < int(maxStandBy) {
         go func() {
+            // Wait a little bit, just to avoid spurious
+            // attempts due to nodes being shut down.
+            time.Sleep(5 * time.Second)
             d.clusterMembershipMutex.Lock()
             defer d.clusterMembershipMutex.Unlock()
             err := rebalanceMemberRoles(d)
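Note that the liveness checks in Rebalance, Handover and TransferLeadership all funnel through the pre-existing hasConnectivity helper, replacing the heartbeat-timestamp heuristic (and the removed isMemberOnline wrapper) with a direct connection probe. The helper's body is not part of this diff; a plausible sketch, assuming it boils down to a short TLS dial against the member's address (the tlsClientConfig helper named here is an assumption, not taken from the patch):

    // Hypothetical sketch only; the real helper lives elsewhere in
    // lxd/cluster and may differ in detail.
    // Assumed imports: crypto/tls, net, time, and the LXD shared package.
    func hasConnectivity(cert *shared.CertInfo, address string) bool {
            // Assumed helper that builds a *tls.Config from the cluster cert.
            config, err := tlsClientConfig(cert)
            if err != nil {
                    return false
            }

            // A short dial timeout keeps role rebalancing responsive.
            dialer := &net.Dialer{Timeout: time.Second}
            conn, err := tls.DialWithDialer(dialer, "tcp", address, config)
            if err != nil {
                    return false
            }

            conn.Close()
            return true
    }

Probing directly, instead of trusting heartbeat timestamps, is what lets Rebalance demote a member as soon as it actually becomes unreachable rather than waiting out the offline threshold.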