The following pull request was submitted through Github.
It can be accessed and reviewed at: https://github.com/lxc/lxd/pull/7601

This e-mail was sent by the LXC bot; direct replies will not reach the author
unless they happen to be subscribed to this list.

=== Description (from pull-request) ===
This effectively replaces some of LXD's built-in logic for dqlite role management with the equivalent logic now available in the go-dqlite/app package.

There is no change in behavior, except that the switch also brings in a bug fix: LXD was previously demoting an offline voter to spare even when no replacement was available (i.e. no other spare or stand-by node could be promoted to voter). In that situation it is better not to take the role away from the offline voter (otherwise we would be left with a cluster of only 2 voters), in the hope that the offline voter comes back or that a spare/stand-by becomes available to replace it.
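
For context, here is a minimal, self-contained sketch (not LXD code) of how the go-dqlite `app.RolesChanges` helper used by the patches below is fed and queried. It mirrors the `newRolesChanges`, `Handover()` and `Adjust()` usage introduced in this PR; the node IDs, addresses, role limits and the connectivity results are made-up placeholders.

```go
// Sketch only: illustrative members, addresses and connectivity results.
package main

import (
	"fmt"

	"github.com/canonical/go-dqlite/app"
	"github.com/canonical/go-dqlite/client"
)

func main() {
	// Current raft members, as they would come from the cluster configuration.
	nodes := []client.NodeInfo{
		{ID: 1, Address: "10.0.0.1:9001", Role: client.Voter},
		{ID: 2, Address: "10.0.0.2:9001", Role: client.Voter},
		{ID: 3, Address: "10.0.0.3:9001", Role: client.Voter},
		{ID: 4, Address: "10.0.0.4:9001", Role: client.Spare},
	}

	// Placeholder connectivity probe results (LXD probes each member over TLS).
	online := map[uint64]bool{1: true, 2: true, 3: false, 4: true}

	// Reachable members get an empty metadata struct, unreachable ones get nil,
	// which marks them as offline for the roles algorithm.
	state := map[client.NodeInfo]*client.NodeMetadata{}
	for _, node := range nodes {
		if online[node.ID] {
			state[node] = &client.NodeMetadata{}
		} else {
			state[node] = nil
		}
	}

	roles := &app.RolesChanges{
		Config: app.RolesConfig{Voters: 3, StandBys: 2},
		State:  state,
	}

	// Adjust suggests the next role change (if any) that the leader (ID 1 here)
	// should apply.
	role, candidates := roles.Adjust(1)
	fmt.Println("adjust:", role, candidates)

	// Handover picks a member that can take over the role of a departing one.
	role, candidates = roles.Handover(2)
	fmt.Println("handover:", role, candidates)
}
```

The returned role/candidates pair is then applied by the caller; in LXD's case that happens through the dqlite client's Assign call, as in the patches below.
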
From 761015a70981723a15d7445f49ba1950f3f480c6 Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <free.ekanay...@canonical.com>
Date: Tue, 30 Jun 2020 09:25:47 +0200
Subject: [PATCH 1/4] lxd/cluster: Only look up raft_nodes for resolving the
 address of node 1

Signed-off-by: Free Ekanayaka <free.ekanay...@canonical.com>
---
 lxd/cluster/gateway.go    | 33 ++++++++++++++-------------------
 lxd/cluster/membership.go | 10 +++-------
 2 files changed, 17 insertions(+), 26 deletions(-)

diff --git a/lxd/cluster/gateway.go b/lxd/cluster/gateway.go
index 42c2ae3eee..4846f14138 100644
--- a/lxd/cluster/gateway.go
+++ b/lxd/cluster/gateway.go
@@ -403,14 +403,11 @@ func (g *Gateway) DialFunc() client.DialFunc {
 // Dial function for establishing raft connections.
 func (g *Gateway) raftDial() client.DialFunc {
        return func(ctx context.Context, address string) (net.Conn, error) {
-               if address == "1" {
-                       addr, err := g.raftAddress(1)
-                       if err != nil {
-                               return nil, err
-                       }
-                       address = string(addr)
+               nodeAddress, err := g.nodeAddress(address)
+               if err != nil {
+                       return nil, err
                }
-               conn, err := dqliteNetworkDial(ctx, address, g)
+               conn, err := dqliteNetworkDial(ctx, nodeAddress, g)
                if err != nil {
                        return nil, err
                }
@@ -480,7 +477,7 @@ func (g *Gateway) TransferLeadership() error {
                if server.ID == g.info.ID || server.Role != db.RaftVoter {
                        continue
                }
-               address, err := g.raftAddress(server.ID)
+               address, err := g.nodeAddress(server.Address)
                if err != nil {
                        return err
                }
@@ -851,27 +848,25 @@ func (g *Gateway) currentRaftNodes() ([]db.RaftNode, error) {
                return nil, err
        }
        for i, server := range servers {
-               address, err := g.raftAddress(server.ID)
+               address, err := g.nodeAddress(server.Address)
                if err != nil {
-                       if err != db.ErrNoSuchObject {
-                               return nil, errors.Wrap(err, "Failed to fetch 
raft server address")
-                       }
-                       // Use the initial address as fallback. This is an edge
-                       // case that happens when a new leader is elected and
-                       // its raft_nodes table is not fully up-to-date yet.
-                       address = server.Address
+                       return nil, errors.Wrap(err, "Failed to fetch raft 
server address")
                }
                servers[i].Address = address
        }
        return servers, nil
 }
 
-// Look up a server address in the raft_nodes table.
-func (g *Gateway) raftAddress(databaseID uint64) (string, error) {
+// Translate a raft address to a node address. They are always the same except
+// for the bootstrap node, which has address "1".
+func (g *Gateway) nodeAddress(raftAddress string) (string, error) {
+       if raftAddress != "1" {
+               return raftAddress, nil
+       }
        var address string
        err := g.db.Transaction(func(tx *db.NodeTx) error {
                var err error
-               address, err = tx.GetRaftNodeAddress(int64(databaseID))
+               address, err = tx.GetRaftNodeAddress(1)
                return err
        })
        if err != nil {
diff --git a/lxd/cluster/membership.go b/lxd/cluster/membership.go
index 1d260d4295..cf335c3dd1 100644
--- a/lxd/cluster/membership.go
+++ b/lxd/cluster/membership.go
@@ -938,13 +938,9 @@ func List(state *state.State, gateway *Gateway) ([]api.ClusterMember, error) {
        }
        raftRoles := map[string]client.NodeRole{} // Address to role
        for _, node := range raftNodes {
-               address := node.Address
-               if address == "1" {
-                       addr, err := gateway.raftAddress(1)
-                       if err != nil {
-                               return nil, err
-                       }
-                       address = string(addr)
+               address, err := gateway.nodeAddress(node.Address)
+               if err != nil {
+                       return nil, err
                }
                raftRoles[address] = node.Role
        }

From 292ed405c3e5a2c9bda31ee945896826c232057e Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <free.ekanay...@canonical.com>
Date: Tue, 30 Jun 2020 10:48:27 +0200
Subject: [PATCH 2/4] lxd/cluster: Leverage RolesChanges.Handover() to choose
 handover target

Signed-off-by: Free Ekanayaka <free.ekanay...@canonical.com>
---
 lxd/.dir-locals.el        |  2 +-
 lxd/cluster/membership.go | 78 +++++++++++++++++++++++++++++----------
 2 files changed, 60 insertions(+), 20 deletions(-)

diff --git a/lxd/.dir-locals.el b/lxd/.dir-locals.el
index 77b750661f..b72bdc482e 100644
--- a/lxd/.dir-locals.el
+++ b/lxd/.dir-locals.el
@@ -1,7 +1,7 @@
 ;;; Directory Local Variables
 ;;; For more information see (info "(emacs) Directory Variables")
 ((go-mode
-  . ((go-test-args . "-tags libsqlite3 -timeout 90s")
+  . ((go-test-args . "-tags libsqlite3 -timeout 120s")
      (eval
       . (set
         (make-local-variable 'flycheck-go-build-tags)
diff --git a/lxd/cluster/membership.go b/lxd/cluster/membership.go
index cf335c3dd1..45ee7df16b 100644
--- a/lxd/cluster/membership.go
+++ b/lxd/cluster/membership.go
@@ -7,6 +7,7 @@ import (
        "path/filepath"
        "time"
 
+       "github.com/canonical/go-dqlite/app"
        "github.com/canonical/go-dqlite/client"
        "github.com/lxc/lxd/lxd/db"
        "github.com/lxc/lxd/lxd/db/cluster"
@@ -839,40 +840,79 @@ func Leave(state *state.State, gateway *Gateway, name string, force bool) (strin
 func Handover(state *state.State, gateway *Gateway, address string) (string, []db.RaftNode, error) {
        nodes, err := gateway.currentRaftNodes()
        if err != nil {
-               return "", nil, errors.Wrap(err, "Failed to get current raft 
nodes")
+               return "", nil, errors.Wrap(err, "Get current raft nodes")
        }
 
-       // If the member which is shutting down is not a voter, there's nothing
-       // to do.
-       found := false
+       var nodeID uint64
        for _, node := range nodes {
-               if node.Address != address {
-                       continue
-               }
-               if node.Role != db.RaftVoter {
-                       return "", nil, nil
+               if node.Address == address {
+                       nodeID = node.ID
                }
-               found = true
-               break
+
        }
-       if !found {
+       if nodeID == 0 {
                return "", nil, errors.Wrapf(err, "No dqlite node has address 
%s", address)
        }
 
+       roles, err := newRolesChanges(state, gateway, nodes)
+       if err != nil {
+               return "", nil, err
+       }
+       role, candidates := roles.Handover(nodeID)
+
+       if role != db.RaftVoter {
+               return "", nil, nil
+       }
+
        for i, node := range nodes {
-               if node.Role == db.RaftVoter || node.Address == address {
-                       continue
+               if node.Address == candidates[0].Address {
+                       nodes[i].Role = role
+                       return node.Address, nodes, nil
                }
-               if !hasConnectivity(gateway.cert, node.Address) {
-                       continue
-               }
-               nodes[i].Role = db.RaftVoter
-               return node.Address, nodes, nil
        }
 
        return "", nil, nil
 }
 
+// Build an app.RolesChanges object fed with the current cluster state.
+func newRolesChanges(state *state.State, gateway *Gateway, nodes []db.RaftNode) (*app.RolesChanges, error) {
+       var maxVoters int
+       var maxStandBy int
+       err := state.Cluster.Transaction(func(tx *db.ClusterTx) error {
+               config, err := ConfigLoad(tx)
+               if err != nil {
+                       return errors.Wrap(err, "Load cluster configuration")
+               }
+               maxVoters = int(config.MaxVoters())
+               maxStandBy = int(config.MaxStandBy())
+               return nil
+       })
+       if err != nil {
+               return nil, err
+       }
+
+       cluster := map[client.NodeInfo]*client.NodeMetadata{}
+
+       for _, node := range nodes {
+               if hasConnectivity(gateway.cert, node.Address) {
+                       cluster[node] = &client.NodeMetadata{}
+               } else {
+                       cluster[node] = nil
+               }
+
+       }
+
+       roles := &app.RolesChanges{
+               Config: app.RolesConfig{
+                       Voters:   maxVoters,
+                       StandBys: maxStandBy,
+               },
+               State: cluster,
+       }
+
+       return roles, nil
+}
+
 // Purge removes a node entirely from the cluster database.
 func Purge(cluster *db.Cluster, name string) error {
        logger.Debugf("Remove node %s from the database", name)

From baae60bbd34e0f7b2bd48f14ecd692f159fae412 Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <free.ekanay...@canonical.com>
Date: Tue, 30 Jun 2020 10:54:46 +0200
Subject: [PATCH 3/4] lxd/cluster: Skip unnecessary loading of nodes from
 database in Rebalance()

Signed-off-by: Free Ekanayaka <free.ekanay...@canonical.com>
---
 lxd/cluster/membership.go | 20 ++++----------------
 1 file changed, 4 insertions(+), 16 deletions(-)

diff --git a/lxd/cluster/membership.go b/lxd/cluster/membership.go
index 45ee7df16b..c2b859bde9 100644
--- a/lxd/cluster/membership.go
+++ b/lxd/cluster/membership.go
@@ -495,7 +495,6 @@ func Rebalance(state *state.State, gateway *Gateway) (string, []db.RaftNode, err
 
        // Fetch the nodes from the database, to get their last heartbeat
        // timestamp and check whether they are offline.
-       nodesByAddress := map[string]db.NodeInfo{}
        var maxVoters int64
        var maxStandBy int64
        err = state.Cluster.Transaction(func(tx *db.ClusterTx) error {
@@ -505,13 +504,6 @@ func Rebalance(state *state.State, gateway *Gateway) (string, []db.RaftNode, err
                }
                maxVoters = config.MaxVoters()
                maxStandBy = config.MaxStandBy()
-               nodes, err := tx.GetNodes()
-               if err != nil {
-                       return errors.Wrap(err, "failed to get cluster nodes")
-               }
-               for _, node := range nodes {
-                       nodesByAddress[node.Address] = node
-               }
                return nil
        })
        if err != nil {
@@ -524,8 +516,7 @@ func Rebalance(state *state.State, gateway *Gateway) (string, []db.RaftNode, err
        standbys := make([]string, 0)
        candidates := make([]string, 0)
        for i, info := range currentRaftNodes {
-               node := nodesByAddress[info.Address]
-               if !hasConnectivity(gateway.cert, node.Address) {
+               if !hasConnectivity(gateway.cert, info.Address) {
                        if info.Role != db.RaftSpare {
                                client, err := gateway.getClient()
                                if err != nil {
@@ -534,8 +525,7 @@ func Rebalance(state *state.State, gateway *Gateway) (string, []db.RaftNode, err
                                defer client.Close()
-                               ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
                                defer cancel()
-                               logger.Infof(
-                                       "Demote offline node %s (%s) to spare", 
node.Name, node.Address)
+                               logger.Infof("Demote offline node %s to spare", 
info.Address)
                                err = client.Assign(ctx, info.ID, db.RaftSpare)
                                if err != nil {
                                        return "", nil, errors.Wrap(err, 
"Failed to demote offline node")
@@ -572,10 +562,8 @@ func Rebalance(state *state.State, gateway *Gateway) (string, []db.RaftNode, err
        // Check if we have a spare node that we can promote to the missing role.
        address := ""
        for _, candidate := range candidates {
-               node := nodesByAddress[candidate]
-               logger.Infof(
-                       "Found spare node %s (%s) to be promoted to %s", 
node.Name, node.Address, role)
-               address = node.Address
+               logger.Infof("Found spare node %s to be promoted to %s", 
candidate, role)
+               address = candidate
                break
        }
 

From d9e7be738732a6ce5e02b585b25eca79df543a54 Mon Sep 17 00:00:00 2001
From: Free Ekanayaka <free.ekanay...@canonical.com>
Date: Tue, 30 Jun 2020 12:51:51 +0200
Subject: [PATCH 4/4] lxd/cluster: Leverage RolesChanges.Adjust() to choose
 rebalance target

Signed-off-by: Free Ekanayaka <free.ekanay...@canonical.com>
---
 lxd/api_cluster.go        | 24 +++++++++--
 lxd/cluster/connect.go    |  4 +-
 lxd/cluster/gateway.go    | 20 ++++++++-
 lxd/cluster/membership.go | 91 ++++++---------------------------------
 lxd/cluster/notify.go     |  2 +-
 test/suites/clustering.sh | 13 +++---
 6 files changed, 61 insertions(+), 93 deletions(-)

diff --git a/lxd/api_cluster.go b/lxd/api_cluster.go
index fbc61dd979..04bf1c6bb3 100644
--- a/lxd/api_cluster.go
+++ b/lxd/api_cluster.go
@@ -1199,9 +1199,7 @@ func internalClusterPostRebalance(d *Daemon, r *http.Request) response.Response
 // Check if there's a dqlite node whose role should be changed, and post a
 // change role request if so.
 func rebalanceMemberRoles(d *Daemon) error {
-       logger.Debugf("Rebalance cluster")
-
-       // Check if we have a spare node to promote.
+again:
        address, nodes, err := cluster.Rebalance(d.State(), d.gateway)
        if err != nil {
                return err
@@ -1212,13 +1210,31 @@ func rebalanceMemberRoles(d *Daemon) error {
                return nil
        }
 
+       // Process demotions of offline nodes immediately.
+       for _, node := range nodes {
+               if node.Address != address || node.Role != db.RaftSpare {
+                       continue
+               }
+
+               if cluster.HasConnectivity(d.endpoints.NetworkCert(), address) {
+                       break
+               }
+
+               err := d.gateway.DemoteOfflineNode(node.ID)
+               if err != nil {
+                       return errors.Wrapf(err, "Demote offline node %s", 
node.Address)
+               }
+
+               goto again
+       }
+
        // Tell the node to promote itself.
        err = changeMemberRole(d, address, nodes)
        if err != nil {
                return err
        }
 
-       return nil
+       goto again
 }
 
 // Post a change role request to the member with the given address. The nodes
diff --git a/lxd/cluster/connect.go b/lxd/cluster/connect.go
index 92eda9bba4..8fdb8a8390 100644
--- a/lxd/cluster/connect.go
+++ b/lxd/cluster/connect.go
@@ -168,8 +168,8 @@ func SetupTrust(cert, targetAddress, targetCert, targetPassword string) error {
        return nil
 }
 
-// Probe network connectivity to the member with the given address.
-func hasConnectivity(cert *shared.CertInfo, address string) bool {
+// HasConnectivity probes the member with the given address for connectivity.
+func HasConnectivity(cert *shared.CertInfo, address string) bool {
        config, err := tlsClientConfig(cert)
        if err != nil {
                return false
diff --git a/lxd/cluster/gateway.go b/lxd/cluster/gateway.go
index 4846f14138..a4a49f57b3 100644
--- a/lxd/cluster/gateway.go
+++ b/lxd/cluster/gateway.go
@@ -481,7 +481,7 @@ func (g *Gateway) TransferLeadership() error {
                if err != nil {
                        return err
                }
-               if !hasConnectivity(g.cert, address) {
+               if !HasConnectivity(g.cert, address) {
                        continue
                }
                id = server.ID
@@ -498,6 +498,24 @@ func (g *Gateway) TransferLeadership() error {
        return client.Transfer(ctx, id)
 }
 
+// DemoteOfflineNode force-demotes an offline node to the spare role.
+func (g *Gateway) DemoteOfflineNode(raftID uint64) error {
+       cli, err := g.getClient()
+       if err != nil {
+               return errors.Wrap(err, "Connect to local dqlite node")
+       }
+
+       ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+       defer cancel()
+
+       err = cli.Assign(ctx, raftID, db.RaftSpare)
+       if err != nil {
+               return err
+       }
+
+       return nil
+}
+
 // Shutdown this gateway, stopping the gRPC server and possibly the raft factory.
 func (g *Gateway) Shutdown() error {
        logger.Infof("Stop database gateway")
diff --git a/lxd/cluster/membership.go b/lxd/cluster/membership.go
index c2b859bde9..2b019c7ed4 100644
--- a/lxd/cluster/membership.go
+++ b/lxd/cluster/membership.go
@@ -486,100 +486,35 @@ func Rebalance(state *state.State, gateway *Gateway) (string, []db.RaftNode, err
                return "", nil, nil
        }
 
-       // First get the current raft members, since this method should be
-       // called after a node has left.
-       currentRaftNodes, err := gateway.currentRaftNodes()
+       nodes, err := gateway.currentRaftNodes()
        if err != nil {
-               return "", nil, errors.Wrap(err, "failed to get current raft 
nodes")
+               return "", nil, errors.Wrap(err, "Get current raft nodes")
        }
 
-       // Fetch the nodes from the database, to get their last heartbeat
-       // timestamp and check whether they are offline.
-       var maxVoters int64
-       var maxStandBy int64
-       err = state.Cluster.Transaction(func(tx *db.ClusterTx) error {
-               config, err := ConfigLoad(tx)
-               if err != nil {
-                       return errors.Wrap(err, "failed load cluster 
configuration")
-               }
-               maxVoters = config.MaxVoters()
-               maxStandBy = config.MaxStandBy()
-               return nil
-       })
+       roles, err := newRolesChanges(state, gateway, nodes)
        if err != nil {
                return "", nil, err
        }
 
-       // Group by role. If a node is offline, we'll try to demote it right
-       // away.
-       voters := make([]string, 0)
-       standbys := make([]string, 0)
-       candidates := make([]string, 0)
-       for i, info := range currentRaftNodes {
-               if !hasConnectivity(gateway.cert, info.Address) {
-                       if info.Role != db.RaftSpare {
-                               client, err := gateway.getClient()
-                               if err != nil {
-                                       return "", nil, errors.Wrap(err, 
"Failed to connect to local dqlite node")
-                               }
-                               defer client.Close()
-                               ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
-                               defer cancel()
-                               logger.Infof("Demote offline node %s to spare", 
info.Address)
-                               err = client.Assign(ctx, info.ID, db.RaftSpare)
-                               if err != nil {
-                                       return "", nil, errors.Wrap(err, 
"Failed to demote offline node")
-                               }
-                               currentRaftNodes[i].Role = db.RaftSpare
-                       }
-                       continue
-               }
-
-               switch info.Role {
-               case db.RaftVoter:
-                       voters = append(voters, info.Address)
-               case db.RaftStandBy:
-                       standbys = append(standbys, info.Address)
-               case db.RaftSpare:
-                       candidates = append(candidates, info.Address)
-               }
-       }
-
-       var role db.RaftRole
+       role, candidates := roles.Adjust(gateway.info.ID)
 
-       if len(voters) < int(maxVoters) && len(currentRaftNodes) >= 3 {
-               role = db.RaftVoter
-               // Include stand-by nodes among the ones that can be promoted,
-               // preferring them over spare ones.
-               candidates = append(standbys, candidates...)
-       } else if len(standbys) < int(maxStandBy) {
-               role = db.RaftStandBy
-       } else {
-               // We're already at full capacity or would have a two-member cluster.
-               return "", nil, nil
+       if role == -1 {
+               // No node to promote
+               return "", nodes, nil
        }
 
        // Check if we have a spare node that we can promote to the missing role.
-       address := ""
-       for _, candidate := range candidates {
-               logger.Infof("Found spare node %s to be promoted to %s", 
candidate, role)
-               address = candidate
-               break
-       }
+       address := candidates[0].Address
+       logger.Infof("Found node %s whose role needs to be changed to %s", 
address, role)
 
-       if address == "" {
-               // No node to promote
-               return "", currentRaftNodes, nil
-       }
-
-       for i, node := range currentRaftNodes {
+       for i, node := range nodes {
                if node.Address == address {
-                       currentRaftNodes[i].Role = role
+                       nodes[i].Role = role
                        break
                }
        }
 
-       return address, currentRaftNodes, nil
+       return address, nodes, nil
 }
 
 // Assign a new role to the local dqlite node.
@@ -882,7 +817,7 @@ func newRolesChanges(state *state.State, gateway *Gateway, nodes []db.RaftNode)
        cluster := map[client.NodeInfo]*client.NodeMetadata{}
 
        for _, node := range nodes {
-               if hasConnectivity(gateway.cert, node.Address) {
+               if HasConnectivity(gateway.cert, node.Address) {
                        cluster[node] = &client.NodeMetadata{}
                } else {
                        cluster[node] = nil
diff --git a/lxd/cluster/notify.go b/lxd/cluster/notify.go
index 603f814093..28f5b44dff 100644
--- a/lxd/cluster/notify.go
+++ b/lxd/cluster/notify.go
@@ -73,7 +73,7 @@ func NewNotifier(state *state.State, cert *shared.CertInfo, policy NotifierPolic
                        // enough, let's try to connect to the node, just in
                        // case the heartbeat is lagging behind for some reason
                        // and the node is actually up.
-                       if !hasConnectivity(cert, node.Address) {
+                       if !HasConnectivity(cert, node.Address) {
                                switch policy {
                                case NotifyAll:
                                        return nil, fmt.Errorf("peer node %s is 
down", node.Address)
diff --git a/test/suites/clustering.sh b/test/suites/clustering.sh
index a5e07476ca..49631e8d8f 100644
--- a/test/suites/clustering.sh
+++ b/test/suites/clustering.sh
@@ -1673,7 +1673,7 @@ test_clustering_handover() {
 
   LXD_DIR="${LXD_THREE_DIR}" lxc cluster list
 
-  # Respawn the first node, which is now a stand-by, and the second node, which
+  # Respawn the first node, which is now a spare, and the second node, which
   # is still a voter.
   respawn_lxd_cluster_member "${ns1}" "${LXD_ONE_DIR}"
   respawn_lxd_cluster_member "${ns2}" "${LXD_TWO_DIR}"
@@ -1687,14 +1687,13 @@ test_clustering_handover() {
   wait "$pid1"
   wait "$pid2"
 
-  # Wait some time to allow for a leadership change.
-  sleep 10
+  # Bringing back one of them restores the quorum.
+  respawn_lxd_cluster_member "${ns2}" "${LXD_TWO_DIR}"
 
-  # The first node has been promoted back to voter, and since the fourth node is
-  # still up, the cluster is online.
   LXD_DIR="${LXD_ONE_DIR}" lxc cluster list
 
   LXD_DIR="${LXD_ONE_DIR}" lxd shutdown
+  LXD_DIR="${LXD_TWO_DIR}" lxd shutdown
   LXD_DIR="${LXD_FOUR_DIR}" lxd shutdown
   sleep 0.5
   rm -f "${LXD_ONE_DIR}/unix.socket"
@@ -1757,8 +1756,8 @@ test_clustering_rebalance() {
   LXD_DIR="${LXD_ONE_DIR}" lxc config set cluster.offline_threshold 12
   kill -9 "$(cat "${LXD_TWO_DIR}/lxd.pid")"
 
-  # Wait for the second node to be considered offline. We need at most 2 full
-  # hearbeats.
+  # Wait for the second node to be considered offline and be replaced by the
+  # fourth node.
   sleep 25
 
   # The second node is offline and has been demoted.