The following pull request was submitted through GitHub. It can be accessed and reviewed at: https://github.com/lxc/lxd/pull/4854
This e-mail was sent by the LXC bot; direct replies will not reach the author unless they happen to be subscribed to this list. === Description (from pull-request) === This branch wires the new Go bindings for dqlite into the lxd code base. The commits are mostly organized by sub-package; I believe it should not be too bad to backport.
From ca98f334ecaf4104609a1f5980ca92207857d05e Mon Sep 17 00:00:00 2001 From: Free Ekanayaka <free.ekanay...@canonical.com> Date: Thu, 12 Jul 2018 08:33:26 +0000 Subject: [PATCH 1/7] Use mattn's sqlite3 bindings in the lxd/db sub package Signed-off-by: Free Ekanayaka <free.ekanay...@canonical.com> --- lxd/db/query/retry.go | 2 +- lxd/db/schema/update.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/lxd/db/query/retry.go b/lxd/db/query/retry.go index 65ef4636d..0c78a8c7f 100644 --- a/lxd/db/query/retry.go +++ b/lxd/db/query/retry.go @@ -4,8 +4,8 @@ import ( "strings" "time" - "github.com/CanonicalLtd/go-sqlite3" "github.com/lxc/lxd/shared/logger" + "github.com/mattn/go-sqlite3" "github.com/pkg/errors" ) diff --git a/lxd/db/schema/update.go b/lxd/db/schema/update.go index 121cd1c73..3a9da889d 100644 --- a/lxd/db/schema/update.go +++ b/lxd/db/schema/update.go @@ -7,7 +7,7 @@ import ( "path" "runtime" - _ "github.com/CanonicalLtd/go-sqlite3" // For opening the in-memory database + _ "github.com/mattn/go-sqlite3" // For opening the in-memory database ) // DotGo writes '<name>.go' source file in the package of the calling function, containing From e0b63cbeb48f15cb1b4c9619e74a6ba8e2698b14 Mon Sep 17 00:00:00 2001 From: Free Ekanayaka <free.ekanay...@canonical.com> Date: Thu, 12 Jul 2018 09:06:41 +0000 Subject: [PATCH 2/7] Drop go-1.6 code Signed-off-by: Free Ekanayaka <free.ekanay...@canonical.com> --- lxd/db/query/slices.go | 19 +++++++++++++++++++ lxd/db/query/slices_1_6.go | 15 --------------- lxd/db/query/slices_1_6_test.go | 18 ------------------ lxd/db/query/slices_1_8.go | 26 -------------------------- lxd/db/query/slices_1_8_test.go | 21 --------------------- lxd/db/query/slices_test.go | 18 ++++++++++++++++++ 6 files changed, 37 insertions(+), 80 deletions(-) delete mode 100644 lxd/db/query/slices_1_6.go delete mode 100644 lxd/db/query/slices_1_6_test.go delete mode 100644 lxd/db/query/slices_1_8.go delete mode 100644 
lxd/db/query/slices_1_8_test.go diff --git a/lxd/db/query/slices.go b/lxd/db/query/slices.go index 6cd9a7934..c86deca42 100644 --- a/lxd/db/query/slices.go +++ b/lxd/db/query/slices.go @@ -104,3 +104,22 @@ func scanSingleColumn(tx *sql.Tx, query string, args []interface{}, typeName str // Function to scan a single row. type scanFunc func(*sql.Rows) error + +// Check that the given result set yields rows with a single column of a +// specific type. +func checkRowsHaveOneColumnOfSpecificType(rows *sql.Rows, typeName string) error { + types, err := rows.ColumnTypes() + if err != nil { + return err + } + if len(types) != 1 { + return fmt.Errorf("query yields %d columns, not 1", len(types)) + } + + actualTypeName := strings.ToUpper(types[0].DatabaseTypeName()) + if actualTypeName != typeName { + return fmt.Errorf("query yields %s column, not %s", actualTypeName, typeName) + } + + return nil +} diff --git a/lxd/db/query/slices_1_6.go b/lxd/db/query/slices_1_6.go deleted file mode 100644 index 652147b70..000000000 --- a/lxd/db/query/slices_1_6.go +++ /dev/null @@ -1,15 +0,0 @@ -// +build !go1.8 - -package query - -import "database/sql" - -// Check that the given result set yields rows with a single column of a -// specific type. -func checkRowsHaveOneColumnOfSpecificType(rows *sql.Rows, typeName string) error { - // The Rows.ColumnTypes() method is available only since Go 1.8, so we - // just return nil for <1.8. This is safe to do since if the returned - // rows are not of the expected type, call sites will still fail at - // Rows.Scan() time, although the error message will be less clear. 
- return nil -} diff --git a/lxd/db/query/slices_1_6_test.go b/lxd/db/query/slices_1_6_test.go deleted file mode 100644 index d508edecf..000000000 --- a/lxd/db/query/slices_1_6_test.go +++ /dev/null @@ -1,18 +0,0 @@ -// +build !go1.8 - -package query_test - -var testStringsErrorCases = []struct { - query string - error string -}{ - {"garbage", "near \"garbage\": syntax error"}, - {"SELECT id, name FROM test", "sql: expected 2 destination arguments in Scan, not 1"}, -} - -var testIntegersErrorCases = []struct { - query string - error string -}{ - {"garbage", "near \"garbage\": syntax error"}, -} diff --git a/lxd/db/query/slices_1_8.go b/lxd/db/query/slices_1_8.go deleted file mode 100644 index 140de7c2a..000000000 --- a/lxd/db/query/slices_1_8.go +++ /dev/null @@ -1,26 +0,0 @@ -// +build go1.8 - -package query - -import ( - "database/sql" - "fmt" - "strings" -) - -// Check that the given result set yields rows with a single column of a -// specific type. -func checkRowsHaveOneColumnOfSpecificType(rows *sql.Rows, typeName string) error { - types, err := rows.ColumnTypes() - if err != nil { - return err - } - if len(types) != 1 { - return fmt.Errorf("query yields %d columns, not 1", len(types)) - } - actualTypeName := strings.ToUpper(types[0].DatabaseTypeName()) - if actualTypeName != typeName { - return fmt.Errorf("query yields %s column, not %s", actualTypeName, typeName) - } - return nil -} diff --git a/lxd/db/query/slices_1_8_test.go b/lxd/db/query/slices_1_8_test.go deleted file mode 100644 index dae33c2ab..000000000 --- a/lxd/db/query/slices_1_8_test.go +++ /dev/null @@ -1,21 +0,0 @@ -// +build go1.8 - -package query_test - -var testStringsErrorCases = []struct { - query string - error string -}{ - {"garbage", "near \"garbage\": syntax error"}, - {"SELECT id, name FROM test", "query yields 2 columns, not 1"}, - {"SELECT id FROM test", "query yields INTEGER column, not TEXT"}, -} - -var testIntegersErrorCases = []struct { - query string - error string -}{ - 
{"garbage", "near \"garbage\": syntax error"}, - {"SELECT id, name FROM test", "query yields 2 columns, not 1"}, - {"SELECT name FROM test", "query yields TEXT column, not INTEGER"}, -} diff --git a/lxd/db/query/slices_test.go b/lxd/db/query/slices_test.go index 2b4a1cce2..3e028528b 100644 --- a/lxd/db/query/slices_test.go +++ b/lxd/db/query/slices_test.go @@ -23,6 +23,15 @@ func TestStrings_Error(t *testing.T) { } } +var testStringsErrorCases = []struct { + query string + error string +}{ + {"garbage", "near \"garbage\": syntax error"}, + {"SELECT id, name FROM test", "query yields 2 columns, not 1"}, + {"SELECT id FROM test", "query yields INTEGER column, not TEXT"}, +} + // All values yield by the query are returned. func TestStrings(t *testing.T) { tx := newTxForSlices(t) @@ -43,6 +52,15 @@ func TestIntegers_Error(t *testing.T) { } } +var testIntegersErrorCases = []struct { + query string + error string +}{ + {"garbage", "near \"garbage\": syntax error"}, + {"SELECT id, name FROM test", "query yields 2 columns, not 1"}, + {"SELECT name FROM test", "query yields TEXT column, not INTEGER"}, +} + // All values yield by the query are returned. 
func TestIntegers(t *testing.T) { tx := newTxForSlices(t) From 5e7425ff80ed064b6783c6b8c8aa4cb52be5db95 Mon Sep 17 00:00:00 2001 From: Free Ekanayaka <free.ekanay...@canonical.com> Date: Thu, 12 Jul 2018 12:10:59 +0000 Subject: [PATCH 3/7] Replace grpc-sql with dqlite custom protocol Signed-off-by: Free Ekanayaka <free.ekanay...@canonical.com> --- lxd/db/cluster/open.go | 28 ++++++++------ lxd/db/db.go | 9 +++-- lxd/db/migration.go | 2 +- lxd/db/node/sqlite.go | 2 +- lxd/db/query/slices.go | 24 ------------ lxd/db/query/slices_test.go | 6 +-- lxd/db/schema/schema.go | 3 ++ lxd/db/testing.go | 90 +++++++++++++++++++++++++++++++-------------- 8 files changed, 92 insertions(+), 72 deletions(-) diff --git a/lxd/db/cluster/open.go b/lxd/db/cluster/open.go index 3cb7c5c3e..20e456e20 100644 --- a/lxd/db/cluster/open.go +++ b/lxd/db/cluster/open.go @@ -6,7 +6,7 @@ import ( "path/filepath" "sync/atomic" - "github.com/CanonicalLtd/go-grpc-sql" + "github.com/CanonicalLtd/go-dqlite" "github.com/lxc/lxd/lxd/db/query" "github.com/lxc/lxd/lxd/db/schema" "github.com/lxc/lxd/lxd/util" @@ -23,18 +23,22 @@ import ( // // The dialer argument is a function that returns a gRPC dialer that can be // used to connect to a database node using the gRPC SQL package. -func Open(name string, dialer grpcsql.Dialer) (*sql.DB, error) { - driver := grpcsql.NewDriver(dialer) - driverName := grpcSQLDriverName() +func Open(name string, store dqlite.ServerStore, options ...dqlite.DriverOption) (*sql.DB, error) { + driver, err := dqlite.NewDriver(store, options...) + if err != nil { + return nil, errors.Wrap(err, "failed to create dqlite driver") + } + + driverName := dqliteDriverName() sql.Register(driverName, driver) - // Create the cluster db. This won't immediately establish any gRPC + // Create the cluster db. This won't immediately establish any network // connection, that will happen only when a db transaction is started // (see the database/sql connection pooling code for more details). 
if name == "" { name = "db.bin" } - db, err := sql.Open(driverName, name+"?_foreign_keys=1") + db, err := sql.Open(driverName, name) if err != nil { return nil, fmt.Errorf("cannot open cluster database: %v", err) } @@ -191,18 +195,18 @@ INSERT INTO profiles (name, description) VALUES ('default', 'Default LXD profile return true, err } -// Generate a new name for the grpcsql driver registration. We need it to be +// Generate a new name for the dqlite driver registration. We need it to be // unique for testing, see below. -func grpcSQLDriverName() string { - defer atomic.AddUint64(&grpcSQLDriverSerial, 1) - return fmt.Sprintf("grpc-%d", grpcSQLDriverSerial) +func dqliteDriverName() string { + defer atomic.AddUint64(&dqliteDriverSerial, 1) + return fmt.Sprintf("dqlite-%d", dqliteDriverSerial) } -// Monotonic serial number for registering new instances of grpcsql.Driver +// Monotonic serial number for registering new instances of dqlite.Driver // using the database/sql stdlib package. This is needed since there's no way // to unregister drivers, and in unit tests more than one driver gets // registered. -var grpcSQLDriverSerial uint64 +var dqliteDriverSerial uint64 func checkClusterIsUpgradable(tx *sql.Tx, target [2]int) error { // Get the current versions in the nodes table. diff --git a/lxd/db/db.go b/lxd/db/db.go index c85ec9c55..c439ec19a 100644 --- a/lxd/db/db.go +++ b/lxd/db/db.go @@ -6,7 +6,7 @@ import ( "sync" "time" - "github.com/CanonicalLtd/go-grpc-sql" + "github.com/CanonicalLtd/go-dqlite" "github.com/pkg/errors" "golang.org/x/net/context" @@ -59,6 +59,9 @@ func OpenNode(dir string, fresh func(*Node) error, legacyPatches map[int]*Legacy return nil, nil, err } + db.SetMaxOpenConns(1) + db.SetMaxIdleConns(1) + legacyHook := legacyPatchHook(db, legacyPatches) hook := func(version int, tx *sql.Tx) error { if version == node.UpdateFromPreClustering { @@ -155,8 +158,8 @@ type Cluster struct { // database matches our version, and possibly trigger a schema update. 
If the // schema update can't be performed right now, because some nodes are still // behind, an Upgrading error is returned. -func OpenCluster(name string, dialer grpcsql.Dialer, address, dir string) (*Cluster, error) { - db, err := cluster.Open(name, dialer) +func OpenCluster(name string, store dqlite.ServerStore, address, dir string, options ...dqlite.DriverOption) (*Cluster, error) { + db, err := cluster.Open(name, store, options...) if err != nil { return nil, errors.Wrap(err, "failed to open database") } diff --git a/lxd/db/migration.go b/lxd/db/migration.go index 1f06db1bd..ee769f53c 100644 --- a/lxd/db/migration.go +++ b/lxd/db/migration.go @@ -230,7 +230,7 @@ func importNodeAssociation(entity string, columns []string, row []interface{}, t if id == 0 { return fmt.Errorf("entity %s has invalid ID", entity) } - _, err := tx.Exec(stmt, row...) + _, err := tx.Exec(stmt, id) if err != nil { return errors.Wrapf(err, "failed to associate %s to node", entity) } diff --git a/lxd/db/node/sqlite.go b/lxd/db/node/sqlite.go index 235b8a776..eecef80ab 100644 --- a/lxd/db/node/sqlite.go +++ b/lxd/db/node/sqlite.go @@ -4,7 +4,7 @@ import ( "database/sql" "fmt" - "github.com/CanonicalLtd/go-sqlite3" + "github.com/mattn/go-sqlite3" ) func init() { diff --git a/lxd/db/query/slices.go b/lxd/db/query/slices.go index c86deca42..b62335203 100644 --- a/lxd/db/query/slices.go +++ b/lxd/db/query/slices.go @@ -83,11 +83,6 @@ func scanSingleColumn(tx *sql.Tx, query string, args []interface{}, typeName str } defer rows.Close() - err = checkRowsHaveOneColumnOfSpecificType(rows, typeName) - if err != nil { - return err - } - for rows.Next() { err := scan(rows) if err != nil { @@ -104,22 +99,3 @@ func scanSingleColumn(tx *sql.Tx, query string, args []interface{}, typeName str // Function to scan a single row. type scanFunc func(*sql.Rows) error - -// Check that the given result set yields rows with a single column of a -// specific type. 
-func checkRowsHaveOneColumnOfSpecificType(rows *sql.Rows, typeName string) error { - types, err := rows.ColumnTypes() - if err != nil { - return err - } - if len(types) != 1 { - return fmt.Errorf("query yields %d columns, not 1", len(types)) - } - - actualTypeName := strings.ToUpper(types[0].DatabaseTypeName()) - if actualTypeName != typeName { - return fmt.Errorf("query yields %s column, not %s", actualTypeName, typeName) - } - - return nil -} diff --git a/lxd/db/query/slices_test.go b/lxd/db/query/slices_test.go index 3e028528b..6a37d8ba3 100644 --- a/lxd/db/query/slices_test.go +++ b/lxd/db/query/slices_test.go @@ -28,8 +28,7 @@ var testStringsErrorCases = []struct { error string }{ {"garbage", "near \"garbage\": syntax error"}, - {"SELECT id, name FROM test", "query yields 2 columns, not 1"}, - {"SELECT id FROM test", "query yields INTEGER column, not TEXT"}, + {"SELECT id, name FROM test", "sql: expected 2 destination arguments in Scan, not 1"}, } // All values yield by the query are returned. @@ -57,8 +56,7 @@ var testIntegersErrorCases = []struct { error string }{ {"garbage", "near \"garbage\": syntax error"}, - {"SELECT id, name FROM test", "query yields 2 columns, not 1"}, - {"SELECT name FROM test", "query yields TEXT column, not INTEGER"}, + {"SELECT id, name FROM test", "sql: expected 2 destination arguments in Scan, not 1"}, } // All values yield by the query are returned. diff --git a/lxd/db/schema/schema.go b/lxd/db/schema/schema.go index 040ab6d7a..e576c7887 100644 --- a/lxd/db/schema/schema.go +++ b/lxd/db/schema/schema.go @@ -148,6 +148,7 @@ func (s *Schema) Ensure(db *sql.DB) (int, error) { if err != nil { return err } + current, err = queryCurrentVersion(tx) if err != nil { return err @@ -165,6 +166,7 @@ func (s *Schema) Ensure(db *sql.DB) (int, error) { return err } } + // When creating the schema from scratch, use the fresh dump if // available. Otherwise just apply all relevant updates. 
if current == 0 && s.fresh != "" { @@ -315,6 +317,7 @@ func queryCurrentVersion(tx *sql.Tx) (int, error) { } current = versions[len(versions)-1] // Highest recorded version } + return current, nil } diff --git a/lxd/db/testing.go b/lxd/db/testing.go index e2d77cb4d..18a59e303 100644 --- a/lxd/db/testing.go +++ b/lxd/db/testing.go @@ -1,17 +1,17 @@ package db import ( + "context" + "fmt" "io/ioutil" "net" "os" "testing" - "time" - "github.com/CanonicalLtd/go-grpc-sql" - "github.com/CanonicalLtd/go-sqlite3" - "github.com/lxc/lxd/lxd/util" + "github.com/CanonicalLtd/go-dqlite" + "github.com/CanonicalLtd/raft-test" + "github.com/hashicorp/raft" "github.com/stretchr/testify/require" - "google.golang.org/grpc" ) // NewTestNode creates a new Node for testing purposes, along with a function @@ -53,15 +53,23 @@ func NewTestNodeTx(t *testing.T) (*NodeTx, func()) { // NewTestCluster creates a new Cluster for testing purposes, along with a function // that can be used to clean it up when done. func NewTestCluster(t *testing.T) (*Cluster, func()) { - // Create an in-memory gRPC SQL server and dialer. - server, dialer := newGrpcServer() + // Create an in-memory dqlite SQL server and associated store. + store, serverCleanup := newDqliteServer(t) - cluster, err := OpenCluster(":memory:", dialer, "1", "/unused/db/dir") + log := newLogFunc(t) + + dial := func(ctx context.Context, address string) (net.Conn, error) { + return net.Dial("unix", address) + } + + cluster, err := OpenCluster( + "test.db", store, "1", "/unused/db/dir", + dqlite.WithLogFunc(log), dqlite.WithDialFunc(dial)) require.NoError(t, err) cleanup := func() { require.NoError(t, cluster.Close()) - server.Stop() + serverCleanup() } return cluster, cleanup @@ -87,26 +95,54 @@ func NewTestClusterTx(t *testing.T) (*ClusterTx, func()) { return clusterTx, cleanup } -// Create a new in-memory gRPC server attached to a grpc-sql gateway backed by a -// SQLite driver. +// Create a new in-memory dqlite server. 
// -// Return the newly created gRPC server and a dialer that can be used to -// connect to it. -func newGrpcServer() (*grpc.Server, grpcsql.Dialer) { - listener, dial := util.InMemoryNetwork() - server := grpcsql.NewServer(&sqlite3.SQLiteDriver{}) - - // Setup an in-memory gRPC dialer. - options := []grpc.DialOption{ - grpc.WithInsecure(), - grpc.WithDialer(func(string, time.Duration) (net.Conn, error) { - return dial(), nil - }), +// Return the newly created server store can be used to connect to it. +func newDqliteServer(t *testing.T) (*dqlite.DatabaseServerStore, func()) { + t.Helper() + + listener, err := net.Listen("unix", "") + require.NoError(t, err) + + address := listener.Addr().String() + + store, err := dqlite.DefaultServerStore(":memory:") + require.NoError(t, err) + require.NoError(t, store.Set(context.Background(), []dqlite.ServerInfo{{Address: address}})) + + id := fmt.Sprintf("%d", dqliteSerial) + dqliteSerial++ + registry := dqlite.NewRegistry(id) + + fsm := dqlite.NewFSM(registry) + + r, raftCleanup := rafttest.Server(t, fsm, rafttest.Transport(func(i int) raft.Transport { + require.Equal(t, i, 0) + address := raft.ServerAddress(listener.Addr().String()) + _, transport := raft.NewInmemTransport(address) + return transport + })) + + log := newLogFunc(t) + + server, err := dqlite.NewServer( + r, registry, listener, dqlite.WithServerLogFunc(log)) + require.NoError(t, err) + + cleanup := func() { + require.NoError(t, server.Close()) + raftCleanup() } - dialer := func() (*grpc.ClientConn, error) { - return grpc.Dial("", options...) + + return store, cleanup +} + +var dqliteSerial = 0 + +func newLogFunc(t *testing.T) dqlite.LogFunc { + return func(l dqlite.LogLevel, format string, a ...interface{}) { + format = fmt.Sprintf("%s: %s", l.String(), format) + t.Logf(format, a...) 
} - go server.Serve(listener) - return server, dialer } From cd246474eca2f10dc8fd81619a133a34755eed68 Mon Sep 17 00:00:00 2001 From: Free Ekanayaka <free.ekanay...@canonical.com> Date: Fri, 13 Jul 2018 12:19:34 +0000 Subject: [PATCH 4/7] Wire dqlite server Signed-off-by: Free Ekanayaka <free.ekanay...@canonical.com> --- lxd/cluster/gateway.go | 292 ++++++++++++++++++++++++++++------------- lxd/cluster/gateway_test.go | 14 +- lxd/cluster/heartbeat.go | 2 +- lxd/cluster/heartbeat_test.go | 6 +- lxd/cluster/membership.go | 5 +- lxd/cluster/membership_test.go | 36 +++-- lxd/cluster/raft.go | 10 +- lxd/cluster/raft_test.go | 6 +- lxd/cluster/upgrade.go | 2 +- 9 files changed, 255 insertions(+), 118 deletions(-) diff --git a/lxd/cluster/gateway.go b/lxd/cluster/gateway.go index b37b6db1f..34b9b4f0c 100644 --- a/lxd/cluster/gateway.go +++ b/lxd/cluster/gateway.go @@ -1,7 +1,10 @@ package cluster import ( + "bufio" + "crypto/tls" "fmt" + "io" "net" "net/http" "net/url" @@ -10,17 +13,15 @@ import ( "strconv" "time" - "github.com/CanonicalLtd/dqlite" - "github.com/CanonicalLtd/go-grpc-sql" + "github.com/CanonicalLtd/go-dqlite" "github.com/hashicorp/raft" "github.com/lxc/lxd/lxd/db" "github.com/lxc/lxd/lxd/util" "github.com/lxc/lxd/shared" + "github.com/lxc/lxd/shared/eagain" "github.com/lxc/lxd/shared/logger" "github.com/pkg/errors" "golang.org/x/net/context" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials" ) // NewGateway creates a new Gateway for managing access to the dqlite cluster. @@ -49,6 +50,8 @@ func NewGateway(db *db.Node, cert *shared.CertInfo, options ...Option) (*Gateway ctx: ctx, cancel: cancel, upgradeCh: make(chan struct{}, 16), + acceptCh: make(chan net.Conn), + store: &dqliteServerStore{}, } err := gateway.init() @@ -74,15 +77,16 @@ type Gateway struct { // The gRPC server exposing the dqlite driver created by this // gateway. It's nil if this LXD node is not supposed to be part of the // raft cluster. 
- server *grpc.Server + server *dqlite.Server + acceptCh chan net.Conn - // A dialer that will connect to the gRPC server using an in-memory + // A dialer that will connect to the dqlite server using a loopback // net.Conn. It's non-nil when clustering is not enabled on this LXD // node, and so we don't expose any dqlite or raft network endpoint, // but still we want to use dqlite as backend for the "cluster" // database, to minimize the difference between code paths in // clustering and non-clustering modes. - memoryDial func() (*grpc.ClientConn, error) + memoryDial dqlite.DialFunc // Used when shutting down the daemon to cancel any ongoing gRPC // dialing attempt. @@ -92,6 +96,9 @@ type Gateway struct { // Used to unblock nodes that are waiting for other nodes to upgrade // their version. upgradeCh chan struct{} + + // ServerStore wrapper. + store *dqliteServerStore } // HandlerFuncs returns the HTTP handlers that should be added to the REST API @@ -104,7 +111,7 @@ type Gateway struct { // non-clustered node not available over the network or because it is not a // database node part of the dqlite cluster. 
func (g *Gateway) HandlerFuncs() map[string]http.HandlerFunc { - grpc := func(w http.ResponseWriter, r *http.Request) { + database := func(w http.ResponseWriter, r *http.Request) { if !tlsCheckCert(r, g.cert) { http.Error(w, "403 invalid client certificate", http.StatusForbidden) return @@ -166,7 +173,33 @@ func (g *Gateway) HandlerFuncs() map[string]http.HandlerFunc { return } - g.server.ServeHTTP(w, r) + if r.Header.Get("Upgrade") != "dqlite" { + http.Error(w, "missing or invalid upgrade header", http.StatusBadRequest) + return + } + + hijacker, ok := w.(http.Hijacker) + if !ok { + http.Error(w, "webserver doesn't support hijacking", http.StatusInternalServerError) + return + } + + conn, _, err := hijacker.Hijack() + if err != nil { + message := errors.Wrap(err, "failed to hijack connection").Error() + http.Error(w, message, http.StatusInternalServerError) + return + } + + // Write the status line and upgrade header by hand since w.WriteHeader() + // would fail after Hijack() + data := []byte("HTTP/1.1 101 Switching Protocols\r\nUpgrade: dqlite\r\n\r\n") + if n, err := conn.Write(data); err != nil || n != len(data) { + conn.Close() + return + } + + g.acceptCh <- conn } raft := func(w http.ResponseWriter, r *http.Request) { // If we are not part of the raft cluster, reply with a @@ -205,8 +238,8 @@ func (g *Gateway) HandlerFuncs() map[string]http.HandlerFunc { } return map[string]http.HandlerFunc{ - grpcEndpoint: grpc, - raftEndpoint: raft, + databaseEndpoint: database, + raftEndpoint: raft, } } @@ -222,47 +255,34 @@ func (g *Gateway) IsDatabaseNode() bool { return g.raft != nil } -// Dialer returns a gRPC dial function that can be used to connect to one of -// the dqlite nodes via gRPC. -func (g *Gateway) Dialer() grpcsql.Dialer { - return func() (*grpc.ClientConn, error) { +// DialFunc returns a dial function that can be used to connect to one of the +// dqlite nodes. 
+func (g *Gateway) DialFunc() dqlite.DialFunc { + return func(ctx context.Context, address string) (net.Conn, error) { // Memory connection. if g.memoryDial != nil { - return g.memoryDial() + return g.memoryDial(ctx, address) } - // TODO: should the timeout be configurable? - ctx, cancel := context.WithTimeout(g.ctx, 10*time.Second) - defer cancel() - var err error - for { - // Network connection. - addresses, dbErr := g.cachedRaftNodes() - if dbErr != nil { - return nil, dbErr - } - - for _, address := range addresses { - var conn *grpc.ClientConn - conn, err = grpcNetworkDial(g.ctx, address, g.cert) - if err == nil { - return conn, nil - } - logger.Debugf("Failed to establish gRPC connection with %s: %v", address, err) - } - if ctx.Err() != nil { - return nil, ctx.Err() - } - select { - case <-time.After(250 * time.Millisecond): - continue - case <-ctx.Done(): - return nil, ctx.Err() - } - } + return dqliteNetworkDial(ctx, address, g.cert) } } +// Context returns a cancellation context to pass to dqlite.NewDriver as +// option. +// +// This context gets cancelled by Gateway.Kill() and at that point any +// connection failure won't be retried. +func (g *Gateway) Context() context.Context { + return g.ctx +} + +// ServerStore returns a dqlite server store that can be used to lookup the +// addresses of known database nodes. +func (g *Gateway) ServerStore() dqlite.ServerStore { + return g.store +} + // Kill is an API that the daemon calls before it actually shuts down and calls // Shutdown(). It will abort any ongoing or new attempt to establish a SQL gRPC // connection with the dialer (typically for running some pre-shutdown @@ -275,16 +295,32 @@ func (g *Gateway) Kill() { // Shutdown this gateway, stopping the gRPC server and possibly the raft factory. 
func (g *Gateway) Shutdown() error { logger.Info("Stop database gateway") + + if g.raft != nil { + err := g.raft.Shutdown() + if err != nil { + return errors.Wrap(err, "failed to shutdown raft") + } + } + if g.server != nil { - g.server.Stop() + // Dump the content of the database to disk, so the + // activateifneeded command can inspect it in order to decide + // whether to activate the daemon or not. + dir := filepath.Join(g.db.Dir(), "global") + err := g.server.Dump("db.bin", dir) + if err != nil { + // Just log a warning, since this is not fatal. + logger.Warnf("Failed to dump database to disk: %v", err) + } + + g.server.Close() // Unset the memory dial, since Shutdown() is also called for // switching between in-memory and network mode. g.memoryDial = nil } - if g.raft == nil { - return nil - } - return g.raft.Shutdown() + + return nil } // Reset the gateway, shutting it down and starting against from scratch using @@ -363,7 +399,7 @@ func (g *Gateway) LeaderAddress() (string, error) { } for _, address := range addresses { - url := fmt.Sprintf("https://%s%s", address, grpcEndpoint) + url := fmt.Sprintf("https://%s%s", address, databaseEndpoint) request, err := http.NewRequest("GET", url, nil) if err != nil { return "", err @@ -409,20 +445,26 @@ func (g *Gateway) init() error { // should serve as database node, so create a dqlite driver to be // exposed it over gRPC. if raft != nil { - config := dqlite.DriverConfig{} - driver, err := dqlite.NewDriver(raft.Registry(), raft.Raft(), config) + listener, err := net.Listen("unix", "") if err != nil { - return errors.Wrap(err, "failed to create dqlite driver") + return errors.Wrap(err, "failed to allocate loopback port") } - server := grpcsql.NewServer(driver) + if raft.HandlerFunc() == nil { - // If no raft http handler is set, it means we are in - // single node mode and we don't have a network - // endpoint, so let's spin up a fully in-memory gRPC - // server. 
- listener, dial := util.InMemoryNetwork() - go server.Serve(listener) - g.memoryDial = grpcMemoryDial(dial) + g.memoryDial = dqliteMemoryDial(listener) + g.store.inMemory = dqlite.NewInmemServerStore() + g.store.Set(context.Background(), []dqlite.ServerInfo{{Address: "0"}}) + } else { + go runDqliteProxy(listener, g.acceptCh) + g.store.inMemory = nil + } + + provider := &raftAddressProvider{db: g.db} + server, err := dqlite.NewServer( + raft.Raft(), raft.Registry(), listener, + dqlite.WithServerAddressProvider(provider)) + if err != nil { + return errors.Wrap(err, "failed to create dqlite server") } g.server = server @@ -430,7 +472,11 @@ func (g *Gateway) init() error { } else { g.server = nil g.raft = nil + g.store.inMemory = nil } + + g.store.onDisk = dqlite.NewServerStore(g.db.DB(), "main", "raft_nodes", "address") + return nil } @@ -500,21 +546,15 @@ func (g *Gateway) cachedRaftNodes() ([]string, error) { return addresses, nil } -func grpcNetworkDial(ctx context.Context, addr string, cert *shared.CertInfo) (*grpc.ClientConn, error) { +func dqliteNetworkDial(ctx context.Context, addr string, cert *shared.CertInfo) (net.Conn, error) { config, err := tlsClientConfig(cert) if err != nil { return nil, err } - // The whole attempt should not take more than a few seconds. If the - // context gets cancelled, calling code will typically try against - // another database node, in round robin. - ctx, cancel := context.WithTimeout(ctx, 3*time.Second) - defer cancel() - // Make a probe HEAD request to check if the target node is the leader. 
- url := fmt.Sprintf("https://%s%s", addr, grpcEndpoint) - request, err := http.NewRequest("HEAD", url, nil) + path := fmt.Sprintf("https://%s%s", addr, databaseEndpoint) + request, err := http.NewRequest("HEAD", path, nil) if err != nil { return nil, err } @@ -524,36 +564,68 @@ func grpcNetworkDial(ctx context.Context, addr string, cert *shared.CertInfo) (* if err != nil { return nil, err } + + // If the endpoint does not exists, it means that the target node is + // running version 1 of dqlite protocol. In that case we simply behave + // as the node was at an older LXD version. + if response.StatusCode == http.StatusNotFound { + return nil, db.ErrSomeNodesAreBehind + } + if response.StatusCode != http.StatusOK { return nil, fmt.Errorf(response.Status) } - options := []grpc.DialOption{ - grpc.WithTransportCredentials(credentials.NewTLS(config)), + // Establish the connection + request = &http.Request{ + Method: "POST", + Proto: "HTTP/1.1", + ProtoMajor: 1, + ProtoMinor: 1, + Header: make(http.Header), + Host: addr, } - return grpc.DialContext(ctx, addr, options...) -} + request.URL, err = url.Parse(path) + if err != nil { + return nil, err + } + + request.Header.Set("Upgrade", "dqlite") + request = request.WithContext(ctx) + + deadline, _ := ctx.Deadline() + dialer := &net.Dialer{Timeout: time.Until(deadline)} + + conn, err := tls.DialWithDialer(dialer, "tcp", addr, config) -// Convert a raw in-memory dial function into a gRPC one. -func grpcMemoryDial(dial func() net.Conn) func() (*grpc.ClientConn, error) { - options := []grpc.DialOption{ - grpc.WithInsecure(), - grpc.WithBlock(), - grpc.WithDialer(func(string, time.Duration) (net.Conn, error) { - return dial(), nil - }), + if err := request.Write(conn); err != nil { + return nil, errors.Wrap(err, "sending HTTP request failed") } - return func() (*grpc.ClientConn, error) { - return grpc.Dial("", options...) 
+ + response, err = http.ReadResponse(bufio.NewReader(conn), request) + if err != nil { + return nil, errors.Wrap(err, "failed to read response") } + if response.StatusCode != http.StatusSwitchingProtocols { + return nil, fmt.Errorf("dialing fail: expected status code 101 got %d", response.StatusCode) + } + if response.Header.Get("Upgrade") != "dqlite" { + return nil, fmt.Errorf("missing or unexpected Upgrade header in response") + } + + return conn, err } -// The LXD API endpoint path that gets routed to a gRPC server handler for -// performing SQL queries against the dqlite driver running on this node. -// -// FIXME: figure out if there's a way to configure the gRPC client to add a -// prefix to this url, e.g. /internal/db/protocol.SQL/Conn. -const grpcEndpoint = "/protocol.SQL/Conn" +// Create a dial function that connects to the given listener. +func dqliteMemoryDial(listener net.Listener) dqlite.DialFunc { + return func(ctx context.Context, address string) (net.Conn, error) { + return net.Dial("unix", listener.Addr().String()) + } +} + +// The LXD API endpoint path that gets routed to a dqlite server handler for +// performing SQL queries against the dqlite server running on this node. +const databaseEndpoint = "/internal/database" // Redirect dqlite's logs to our own logger func dqliteLog(configuredLevel string) func(level, message string) { @@ -582,3 +654,41 @@ func dqliteLog(configuredLevel string) func(level, message string) { } } } + +func runDqliteProxy(listener net.Listener, acceptCh chan net.Conn) { + for { + src := <-acceptCh + dst, err := net.Dial("unix", listener.Addr().String()) + if err != nil { + panic(err) + } + go func() { + io.Copy(eagain.Writer{Writer: dst}, eagain.Reader{Reader: src}) + src.Close() + }() + go func() { + io.Copy(eagain.Writer{Writer: src}, eagain.Reader{Reader: dst}) + dst.Close() + }() + } +} + +// Conditionally uses the in-memory or the on-disk server store. 
+type dqliteServerStore struct { + inMemory dqlite.ServerStore + onDisk dqlite.ServerStore +} + +func (s *dqliteServerStore) Get(ctx context.Context) ([]dqlite.ServerInfo, error) { + if s.inMemory != nil { + return s.inMemory.Get(ctx) + } + return s.onDisk.Get(ctx) +} + +func (s *dqliteServerStore) Set(ctx context.Context, servers []dqlite.ServerInfo) error { + if s.inMemory != nil { + return s.inMemory.Set(ctx, servers) + } + return s.onDisk.Set(ctx, servers) +} diff --git a/lxd/cluster/gateway_test.go b/lxd/cluster/gateway_test.go index 71ed46913..48fa86c4b 100644 --- a/lxd/cluster/gateway_test.go +++ b/lxd/cluster/gateway_test.go @@ -10,7 +10,7 @@ import ( "path/filepath" "testing" - "github.com/CanonicalLtd/go-grpc-sql" + "github.com/CanonicalLtd/go-dqlite" "github.com/hashicorp/raft" "github.com/lxc/lxd/lxd/cluster" "github.com/lxc/lxd/lxd/db" @@ -18,6 +18,7 @@ import ( "github.com/lxc/lxd/shared/logging" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "golang.org/x/net/context" ) // Basic creation and shutdown. 
By default, the gateway runs an in-memory gRPC @@ -44,8 +45,8 @@ func TestGateway_Single(t *testing.T) { assert.Equal(t, 404, w.Code, endpoint) } - dialer := gateway.Dialer() - conn, err := dialer() + dial := gateway.DialFunc() + conn, err := dial(context.Background(), "") assert.NoError(t, err) assert.NotNil(t, conn) @@ -66,7 +67,7 @@ func TestGateway_SingleWithNetworkAddress(t *testing.T) { defer server.Close() address := server.Listener.Addr().String() - setRaftRole(t, db, address) + store := setRaftRole(t, db, address) gateway := newGateway(t, db, cert) defer gateway.Shutdown() @@ -75,9 +76,12 @@ func TestGateway_SingleWithNetworkAddress(t *testing.T) { mux.HandleFunc(path, handler) } - driver := grpcsql.NewDriver(gateway.Dialer()) + driver, err := dqlite.NewDriver(store, dqlite.WithDialFunc(gateway.DialFunc())) + require.NoError(t, err) + conn, err := driver.Open("test.db") require.NoError(t, err) + require.NoError(t, conn.Close()) leader, err := gateway.LeaderAddress() diff --git a/lxd/cluster/heartbeat.go b/lxd/cluster/heartbeat.go index 67a27ff58..986505649 100644 --- a/lxd/cluster/heartbeat.go +++ b/lxd/cluster/heartbeat.go @@ -141,7 +141,7 @@ func heartbeatNode(taskCtx context.Context, address string, cert *shared.CertInf if err != nil { return err } - url := fmt.Sprintf("https://%s%s", address, grpcEndpoint) + url := fmt.Sprintf("https://%s%s", address, databaseEndpoint) client := &http.Client{Transport: &http.Transport{TLSClientConfig: config}} buffer := bytes.Buffer{} diff --git a/lxd/cluster/heartbeat_test.go b/lxd/cluster/heartbeat_test.go index b1def2ed1..b6d698ad5 100644 --- a/lxd/cluster/heartbeat_test.go +++ b/lxd/cluster/heartbeat_test.go @@ -6,6 +6,7 @@ import ( "testing" "time" + "github.com/CanonicalLtd/go-dqlite" "github.com/hashicorp/raft" "github.com/lxc/lxd/lxd/cluster" "github.com/lxc/lxd/lxd/db" @@ -249,7 +250,10 @@ func (f *heartbeatFixture) node() (*state.State, *cluster.Gateway, string) { var err error require.NoError(f.t, 
state.Cluster.Close()) - state.Cluster, err = db.OpenCluster("db.bin", gateway.Dialer(), address, "/unused/db/dir") + store := gateway.ServerStore() + dial := gateway.DialFunc() + state.Cluster, err = db.OpenCluster( + "db.bin", store, address, "/unused/db/dir", dqlite.WithDialFunc(dial)) require.NoError(f.t, err) f.gateways[len(f.gateways)] = gateway diff --git a/lxd/cluster/membership.go b/lxd/cluster/membership.go index bbe80c92d..564a374c0 100644 --- a/lxd/cluster/membership.go +++ b/lxd/cluster/membership.go @@ -85,8 +85,8 @@ func Bootstrap(state *state.State, gateway *Gateway, name string) error { return err } - // Shutdown the gateway. This will trash any gRPC SQL connection - // against our in-memory dqlite driver and shutdown the associated raft + // Shutdown the gateway. This will trash any dqlite connection against + // our in-memory dqlite driver and shutdown the associated raft // instance. We also lock regular access to the cluster database since // we don't want any other database code to run while we're // reconfiguring raft. 
@@ -107,6 +107,7 @@ func Bootstrap(state *state.State, gateway *Gateway, name string) error { if err != nil { return errors.Wrap(err, "failed to re-initialize gRPC SQL gateway") } + err = gateway.waitLeadership() if err != nil { return err diff --git a/lxd/cluster/membership_test.go b/lxd/cluster/membership_test.go index b1cbc7c86..97ac52db9 100644 --- a/lxd/cluster/membership_test.go +++ b/lxd/cluster/membership_test.go @@ -7,7 +7,7 @@ import ( "path/filepath" "testing" - "github.com/CanonicalLtd/go-grpc-sql" + "github.com/CanonicalLtd/go-dqlite" "github.com/lxc/lxd/lxd/cluster" "github.com/lxc/lxd/lxd/db" "github.com/lxc/lxd/lxd/state" @@ -123,11 +123,6 @@ func TestBootstrap(t *testing.T) { mux.HandleFunc(path, handler) } - driver := grpcsql.NewDriver(gateway.Dialer()) - conn, err := driver.Open("test.db") - require.NoError(t, err) - require.NoError(t, conn.Close()) - count, err := cluster.Count(state) require.NoError(t, err) assert.Equal(t, 1, count) @@ -254,15 +249,24 @@ func TestJoin(t *testing.T) { } targetAddress := targetServer.Listener.Addr().String() - var err error + require.NoError(t, targetState.Cluster.Close()) - targetState.Cluster, err = db.OpenCluster("db.bin", targetGateway.Dialer(), targetAddress, "/unused/db/dir") + + targetStore := targetGateway.ServerStore() + targetDialFunc := targetGateway.DialFunc() + + var err error + targetState.Cluster, err = db.OpenCluster( + "db.bin", targetStore, targetAddress, "/unused/db/dir", + dqlite.WithDialFunc(targetDialFunc)) require.NoError(t, err) + targetF := &membershipFixtures{t: t, state: targetState} targetF.NetworkAddress(targetAddress) err = cluster.Bootstrap(targetState, targetGateway, "buzz") require.NoError(t, err) + _, err = targetState.Cluster.Networks() // Setup a joining node mux := http.NewServeMux() @@ -282,8 +286,14 @@ func TestJoin(t *testing.T) { } address := server.Listener.Addr().String() + require.NoError(t, state.Cluster.Close()) - state.Cluster, err = db.OpenCluster("db.bin", 
gateway.Dialer(), address, "/unused/db/dir") + + store := gateway.ServerStore() + dialFunc := gateway.DialFunc() + + state.Cluster, err = db.OpenCluster( + "db.bin", store, address, "/unused/db/dir", dqlite.WithDialFunc(dialFunc)) require.NoError(t, err) f := &membershipFixtures{t: t, state: state} @@ -368,7 +378,10 @@ func FLAKY_TestPromote(t *testing.T) { targetAddress := targetServer.Listener.Addr().String() var err error require.NoError(t, targetState.Cluster.Close()) - targetState.Cluster, err = db.OpenCluster("db.bin", targetGateway.Dialer(), targetAddress, "/unused/db/dir") + store := targetGateway.ServerStore() + dialFunc := targetGateway.DialFunc() + targetState.Cluster, err = db.OpenCluster( + "db.bin", store, targetAddress, "/unused/db/dir", dqlite.WithDialFunc(dialFunc)) require.NoError(t, err) targetF := &membershipFixtures{t: t, state: targetState} targetF.NetworkAddress(targetAddress) @@ -397,9 +410,6 @@ func FLAKY_TestPromote(t *testing.T) { mux.HandleFunc(path, handler) } - state.Cluster, err = db.OpenCluster("db.bin", gateway.Dialer(), address, "/unused/db/dir") - require.NoError(t, err) - // Promote the node. targetF.RaftNode(address) // Add the address of the node to be promoted in the leader's db raftNodes := targetF.RaftNodes() diff --git a/lxd/cluster/raft.go b/lxd/cluster/raft.go index 742fa26fd..ca9fc4b32 100644 --- a/lxd/cluster/raft.go +++ b/lxd/cluster/raft.go @@ -14,7 +14,7 @@ import ( "strings" "time" - "github.com/CanonicalLtd/dqlite" + "github.com/CanonicalLtd/go-dqlite" "github.com/CanonicalLtd/raft-http" "github.com/CanonicalLtd/raft-membership" "github.com/boltdb/bolt" @@ -183,7 +183,8 @@ func raftInstanceInit( } // The dqlite registry and FSM. - registry := dqlite.NewRegistry(dir) + registry := dqlite.NewRegistry(strconv.Itoa(serial)) + serial++ fsm := dqlite.NewFSM(registry) // The actual raft instance. 
@@ -214,6 +215,8 @@ func raftInstanceInit( return instance, nil } +var serial = 99 + // Registry returns the dqlite Registry associated with the raft instance. func (i *raftInstance) Registry() *dqlite.Registry { return i.registry @@ -280,7 +283,8 @@ func (i *raftInstance) Shutdown() error { errCh := make(chan error) timer := time.After(timeout) go func() { - errCh <- i.raft.Snapshot().Error() + //errCh <- i.raft.Snapshot().Error() + errCh <- nil }() // In case of error we just log a warning, since this is not really // fatal. diff --git a/lxd/cluster/raft_test.go b/lxd/cluster/raft_test.go index 9e6cfb983..223127da5 100644 --- a/lxd/cluster/raft_test.go +++ b/lxd/cluster/raft_test.go @@ -7,6 +7,7 @@ import ( "testing" "time" + "github.com/CanonicalLtd/go-dqlite" "github.com/CanonicalLtd/raft-test" "github.com/hashicorp/raft" "github.com/lxc/lxd/lxd/cluster" @@ -122,7 +123,7 @@ func newRaft(t *testing.T, db *db.Node, cert *shared.CertInfo) *cluster.RaftInst // address into the raft_nodes table. // // This effectively makes the node act as a database raft node. 
-func setRaftRole(t *testing.T, database *db.Node, address string) { +func setRaftRole(t *testing.T, database *db.Node, address string) *dqlite.DatabaseServerStore { require.NoError(t, database.Transaction(func(tx *db.NodeTx) error { err := tx.UpdateConfig(map[string]string{"core.https_address": address}) if err != nil { @@ -131,6 +132,9 @@ func setRaftRole(t *testing.T, database *db.Node, address string) { _, err = tx.RaftNodeAdd(address) return err })) + + store := dqlite.NewServerStore(database.DB(), "main", "raft_nodes", "address") + return store } // Create a new test HTTP server configured with the given TLS certificate and diff --git a/lxd/cluster/upgrade.go b/lxd/cluster/upgrade.go index 710aff7bd..ce6fcb4bd 100644 --- a/lxd/cluster/upgrade.go +++ b/lxd/cluster/upgrade.go @@ -25,7 +25,7 @@ func NotifyUpgradeCompleted(state *state.State, cert *shared.CertInfo) error { return errors.Wrap(err, "failed to get connection info") } - url := fmt.Sprintf("%s%s", info.Addresses[0], grpcEndpoint) + url := fmt.Sprintf("%s%s", info.Addresses[0], databaseEndpoint) request, err := http.NewRequest("PATCH", url, nil) if err != nil { return errors.Wrap(err, "failed to create database notify upgrade request") From 126b0cc2dfcec8b83dbaa36f62d755173c210860 Mon Sep 17 00:00:00 2001 From: Free Ekanayaka <free.ekanay...@canonical.com> Date: Fri, 13 Jul 2018 12:20:06 +0000 Subject: [PATCH 5/7] Adapt main package to new cluster sub-package API Signed-off-by: Free Ekanayaka <free.ekanay...@canonical.com> --- Makefile | 2 +- lxd/api_cluster.go | 8 ++++++- lxd/api_cluster_test.go | 1 + lxd/daemon.go | 19 +++++++++++----- lxd/daemon_integration_test.go | 7 ++++-- lxd/main_activateifneeded.go | 4 ++-- lxd/response.go | 2 +- shared/logging/testing.go | 2 +- test/includes/lxd.sh | 51 ++++++++++++++++++++++-------------------- test/suites/basic.sh | 6 +++++ test/suites/clustering.sh | 4 ++-- 11 files changed, 67 insertions(+), 39 deletions(-) diff --git a/Makefile b/Makefile index 
76bdb33d4..7efb277fb 100644 --- a/Makefile +++ b/Makefile @@ -9,7 +9,7 @@ POTFILE=po/$(DOMAIN).pot # TODO: use git describe for versioning VERSION=$(shell grep "var Version" shared/version/flex.go | cut -d'"' -f2) ARCHIVE=lxd-$(VERSION).tar -TAGS=$(shell printf "\#include <sqlite3.h>\nvoid main(){int n = SQLITE_CONFIG_REPLICATION;}" | $(CC) -o /dev/null -xc - >/dev/null 2>&1 && echo "-tags libsqlite3") +TAGS=$(shell printf "\#include <sqlite3.h>\nvoid main(){int n = SQLITE_IOERR_NOT_LEADER;}" | $(CC) -o /dev/null -xc - >/dev/null 2>&1 && echo "-tags libsqlite3") .PHONY: default default: diff --git a/lxd/api_cluster.go b/lxd/api_cluster.go index 89f2b5499..0bcfda56e 100644 --- a/lxd/api_cluster.go +++ b/lxd/api_cluster.go @@ -9,6 +9,7 @@ import ( "path/filepath" "strconv" + "github.com/CanonicalLtd/go-dqlite" "github.com/gorilla/mux" lxd "github.com/lxc/lxd/client" "github.com/lxc/lxd/lxd/cluster" @@ -469,7 +470,12 @@ func clusterPutDisable(d *Daemon) Response { if err != nil { return SmartError(err) } - d.cluster, err = db.OpenCluster("db.bin", d.gateway.Dialer(), address, "/unused/db/dir") + store := d.gateway.ServerStore() + d.cluster, err = db.OpenCluster( + "db.bin", store, address, "/unused/db/dir", + dqlite.WithDialFunc(d.gateway.DialFunc()), + dqlite.WithContext(d.gateway.Context()), + ) if err != nil { return SmartError(err) } diff --git a/lxd/api_cluster_test.go b/lxd/api_cluster_test.go index a8411e3dd..8e7bea1a6 100644 --- a/lxd/api_cluster_test.go +++ b/lxd/api_cluster_test.go @@ -109,6 +109,7 @@ func TestCluster_Join(t *testing.T) { // Make the second node join the cluster. 
f.RegisterCertificate(daemons[1], daemons[0], "rusp", "sekret") + address := daemons[0].endpoints.NetworkAddress() cert := string(daemons[0].endpoints.NetworkPublicKey()) client = f.ClientUnix(daemons[1]) diff --git a/lxd/daemon.go b/lxd/daemon.go index 31c7bd0d1..451cc28f8 100644 --- a/lxd/daemon.go +++ b/lxd/daemon.go @@ -17,6 +17,7 @@ import ( "time" "github.com/CanonicalLtd/candidclient" + "github.com/CanonicalLtd/go-dqlite" "github.com/gorilla/mux" "github.com/pkg/errors" "golang.org/x/net/context" @@ -76,9 +77,10 @@ type externalAuth struct { // DaemonConfig holds configuration values for Daemon. type DaemonConfig struct { - Group string // Group name the local unix socket should be chown'ed to - Trace []string // List of sub-systems to trace - RaftLatency float64 // Coarse grain measure of the cluster latency + Group string // Group name the local unix socket should be chown'ed to + Trace []string // List of sub-systems to trace + RaftLatency float64 // Coarse grain measure of the cluster latency + DqliteSetupTimeout time.Duration // How long to wait for the cluster database to be up } // NewDaemon returns a new Daemon object with the given configuration. 
@@ -95,7 +97,8 @@ func NewDaemon(config *DaemonConfig, os *sys.OS) *Daemon { // DefaultDaemonConfig returns a DaemonConfig object with default values/ func DefaultDaemonConfig() *DaemonConfig { return &DaemonConfig{ - RaftLatency: 3.0, + RaftLatency: 3.0, + DqliteSetupTimeout: 36 * time.Hour, // Account for snap refresh lag } } @@ -469,7 +472,13 @@ func (d *Daemon) init() error { for { logger.Info("Initializing global database") dir := filepath.Join(d.os.VarDir, "database") - d.cluster, err = db.OpenCluster("db.bin", d.gateway.Dialer(), address, dir) + store := d.gateway.ServerStore() + d.cluster, err = db.OpenCluster( + "db.bin", store, address, dir, + dqlite.WithDialFunc(d.gateway.DialFunc()), + dqlite.WithContext(d.gateway.Context()), + dqlite.WithConnectionTimeout(d.config.DqliteSetupTimeout), + ) if err == nil { break } diff --git a/lxd/daemon_integration_test.go b/lxd/daemon_integration_test.go index d1b113171..aad69f162 100644 --- a/lxd/daemon_integration_test.go +++ b/lxd/daemon_integration_test.go @@ -2,6 +2,7 @@ package main import ( "testing" + "time" lxd "github.com/lxc/lxd/client" "github.com/lxc/lxd/lxd/sys" @@ -40,7 +41,7 @@ func newDaemon(t *testing.T) (*Daemon, func()) { require.NoError(t, daemon.Init()) cleanup := func() { - require.NoError(t, daemon.Stop()) + daemon.Stop() osCleanup() resetLogger() } @@ -76,6 +77,8 @@ func newDaemons(t *testing.T, n int) ([]*Daemon, func()) { // Create a new DaemonConfig object for testing purposes. 
func newConfig() *DaemonConfig { return &DaemonConfig{ - RaftLatency: 0.8, + RaftLatency: 0.8, + Trace: []string{"dqlite"}, + DqliteSetupTimeout: 10 * time.Second, } } diff --git a/lxd/main_activateifneeded.go b/lxd/main_activateifneeded.go index 708eb57ab..b7a892741 100644 --- a/lxd/main_activateifneeded.go +++ b/lxd/main_activateifneeded.go @@ -5,7 +5,7 @@ import ( "fmt" "os" - "github.com/CanonicalLtd/go-sqlite3" + "github.com/mattn/go-sqlite3" "github.com/spf13/cobra" "github.com/lxc/lxd/client" @@ -71,7 +71,7 @@ func (c *cmdActivateifneeded) Run(cmd *cobra.Command, args []string) error { } d.db = db.ForLegacyPatches(sqldb) - /* Load the configured address the database */ + // Load the configured address from the database address, err := node.HTTPSAddress(d.db) if err != nil { return err diff --git a/lxd/response.go b/lxd/response.go index d01af2850..256341759 100644 --- a/lxd/response.go +++ b/lxd/response.go @@ -11,7 +11,7 @@ import ( "os" "time" - "github.com/CanonicalLtd/go-sqlite3" + "github.com/mattn/go-sqlite3" lxd "github.com/lxc/lxd/client" "github.com/lxc/lxd/lxd/cluster" diff --git a/shared/logging/testing.go b/shared/logging/testing.go index 22c3a9a90..bad3375c9 100644 --- a/shared/logging/testing.go +++ b/shared/logging/testing.go @@ -34,6 +34,6 @@ func (h *testingHandler) Log(r *log.Record) error { } h.t.Logf("%s %s %s%s", r.Time.Format("15:04:05.000"), r.Lvl, r.Msg, ctx) - //fmt.Printf("%s %s %s%s\n", r.Time.Format("15:04:05.000"), r.Lvl, r.Msg, ctx) + return nil } diff --git a/test/includes/lxd.sh b/test/includes/lxd.sh index f424a4811..ee5485822 100644 --- a/test/includes/lxd.sh +++ b/test/includes/lxd.sh @@ -211,30 +211,33 @@ kill_lxd() { check_empty "${daemon_dir}/shmounts/" check_empty "${daemon_dir}/snapshots/" - echo "==> Checking for leftover cluster DB entries" - # FIXME: we should not use the command line sqlite client, since it's - # not compatible with dqlite - check_empty_table "${daemon_dir}/database/global/db.bin" "containers" - 
check_empty_table "${daemon_dir}/database/global/db.bin" "containers_config" - check_empty_table "${daemon_dir}/database/global/db.bin" "containers_devices" - check_empty_table "${daemon_dir}/database/global/db.bin" "containers_devices_config" - check_empty_table "${daemon_dir}/database/global/db.bin" "containers_profiles" - check_empty_table "${daemon_dir}/database/global/db.bin" "images" - check_empty_table "${daemon_dir}/database/global/db.bin" "images_aliases" - check_empty_table "${daemon_dir}/database/global/db.bin" "images_properties" - check_empty_table "${daemon_dir}/database/global/db.bin" "images_source" - check_empty_table "${daemon_dir}/database/global/db.bin" "images_nodes" - check_empty_table "${daemon_dir}/database/global/db.bin" "networks" - check_empty_table "${daemon_dir}/database/global/db.bin" "networks_config" - check_empty_table "${daemon_dir}/database/global/db.bin" "profiles" - check_empty_table "${daemon_dir}/database/global/db.bin" "profiles_config" - check_empty_table "${daemon_dir}/database/global/db.bin" "profiles_devices" - check_empty_table "${daemon_dir}/database/global/db.bin" "profiles_devices_config" - check_empty_table "${daemon_dir}/database/global/db.bin" "storage_pools" - check_empty_table "${daemon_dir}/database/global/db.bin" "storage_pools_nodes" - check_empty_table "${daemon_dir}/database/global/db.bin" "storage_pools_config" - check_empty_table "${daemon_dir}/database/global/db.bin" "storage_volumes" - check_empty_table "${daemon_dir}/database/global/db.bin" "storage_volumes_config" + # Only check for leftover db entries when we're not clustered, since in + # that case the disk dump that we take at shutdown is not guaranteed to + # be fully up-to-date. + if [ ! 
-f "${daemon_dir}/cluster.crt" ]; then + echo "==> Checking for leftover DB entries" + check_empty_table "${daemon_dir}/database/global/db.bin" "containers" + check_empty_table "${daemon_dir}/database/global/db.bin" "containers_config" + check_empty_table "${daemon_dir}/database/global/db.bin" "containers_devices" + check_empty_table "${daemon_dir}/database/global/db.bin" "containers_devices_config" + check_empty_table "${daemon_dir}/database/global/db.bin" "containers_profiles" + check_empty_table "${daemon_dir}/database/global/db.bin" "images" + check_empty_table "${daemon_dir}/database/global/db.bin" "images_aliases" + check_empty_table "${daemon_dir}/database/global/db.bin" "images_properties" + check_empty_table "${daemon_dir}/database/global/db.bin" "images_source" + check_empty_table "${daemon_dir}/database/global/db.bin" "images_nodes" + check_empty_table "${daemon_dir}/database/global/db.bin" "networks" + check_empty_table "${daemon_dir}/database/global/db.bin" "networks_config" + check_empty_table "${daemon_dir}/database/global/db.bin" "profiles" + check_empty_table "${daemon_dir}/database/global/db.bin" "profiles_config" + check_empty_table "${daemon_dir}/database/global/db.bin" "profiles_devices" + check_empty_table "${daemon_dir}/database/global/db.bin" "profiles_devices_config" + check_empty_table "${daemon_dir}/database/global/db.bin" "storage_pools" + check_empty_table "${daemon_dir}/database/global/db.bin" "storage_pools_nodes" + check_empty_table "${daemon_dir}/database/global/db.bin" "storage_pools_config" + check_empty_table "${daemon_dir}/database/global/db.bin" "storage_volumes" + check_empty_table "${daemon_dir}/database/global/db.bin" "storage_volumes_config" + fi fi # teardown storage diff --git a/test/suites/basic.sh b/test/suites/basic.sh index 2fd97141a..dec56abbd 100644 --- a/test/suites/basic.sh +++ b/test/suites/basic.sh @@ -259,6 +259,12 @@ test_basic_usage() { lxc init testimage autostart --force-local lxd activateifneeded --debug 
2>&1 | grep -q -v "activating..." lxc config set autostart boot.autostart true --force-local + + # Restart the daemon, this forces the global database to be dumped to disk. + shutdown_lxd "${LXD_DIR}" + respawn_lxd "${LXD_DIR}" true + lxc stop --force autostart + lxd activateifneeded --debug 2>&1 | grep -q "Daemon has auto-started containers, activating..." lxc config unset autostart boot.autostart --force-local diff --git a/test/suites/clustering.sh b/test/suites/clustering.sh index ab8debbb9..28bd38913 100644 --- a/test/suites/clustering.sh +++ b/test/suites/clustering.sh @@ -138,14 +138,14 @@ test_clustering_membership() { # Rename a node using the pre-existing name. LXD_DIR="${LXD_ONE_DIR}" lxc cluster rename node4 node3 - # Trying to delete a container which is the only one with a copy of + # Trying to delete a node which is the only one with a copy of # an image results in an error LXD_DIR="${LXD_FOUR_DIR}" ensure_import_testimage ! LXD_DIR="${LXD_FOUR_DIR}" lxc cluster remove node3 LXD_DIR="${LXD_TWO_DIR}" lxc image delete testimage # Remove a node gracefully. - LXD_DIR="${LXD_FOUR_DIR}" lxc cluster remove node3 + LXD_DIR="${LXD_ONE_DIR}" lxc cluster remove node3 ! LXD_DIR="${LXD_FOUR_DIR}" lxc cluster list LXD_DIR="${LXD_FIVE_DIR}" lxd shutdown From 34db0d03a571c8d61fb88c9d65561cd8e80bc241 Mon Sep 17 00:00:00 2001 From: Free Ekanayaka <free.ekanay...@canonical.com> Date: Tue, 17 Jul 2018 19:53:51 +0000 Subject: [PATCH 6/7] Drop raft snapshot workaround Signed-off-by: Free Ekanayaka <free.ekanay...@canonical.com> --- lxd/cluster/raft.go | 30 ++---------------------------- 1 file changed, 2 insertions(+), 28 deletions(-) diff --git a/lxd/cluster/raft.go b/lxd/cluster/raft.go index ca9fc4b32..bbb4074aa 100644 --- a/lxd/cluster/raft.go +++ b/lxd/cluster/raft.go @@ -275,30 +275,8 @@ func (i *raftInstance) Shutdown() error { // Invoke raft APIs asynchronously to allow for a timeout. 
timeout := 10 * time.Second - // FIXME/TODO: We take a snapshot before when shutting down the daemon - // so there will be no uncompacted raft logs at the next - // startup. This is a workaround for slow log replay when - // the LXD daemon starts (see #4485). A more proper fix - // should be probably implemented in dqlite. errCh := make(chan error) timer := time.After(timeout) - go func() { - //errCh <- i.raft.Snapshot().Error() - errCh <- nil - }() - // In case of error we just log a warning, since this is not really - // fatal. - select { - case err := <-errCh: - if err != nil && err != raft.ErrNothingNewToSnapshot { - logger.Warnf("Failed to take raft snapshot: %v", err) - } - case <-timer: - logger.Warnf("Timeout waiting for raft to take a snapshot") - } - - errCh = make(chan error) - timer = time.After(timeout) go func() { errCh <- i.raft.Shutdown().Error() }() @@ -406,12 +384,8 @@ func raftConfig(latency float64) *raft.Config { scale(duration) } - // FIXME/TODO: We increase the frequency of snapshots here to keep the - // number of uncompacted raft logs low, and workaround slow - // log replay when the LXD daemon starts (see #4485). A more - // proper fix should be probably implemented in dqlite. 
- config.SnapshotThreshold = 512 - config.TrailingLogs = 128 + config.SnapshotThreshold = 1024 + config.TrailingLogs = 512 return config } From 8107e292f2caf4ee1c1c6b3b646e358c520ccc72 Mon Sep 17 00:00:00 2001 From: Free Ekanayaka <free.ekanay...@canonical.com> Date: Tue, 17 Jul 2018 19:58:13 +0000 Subject: [PATCH 7/7] Fetch containers info in parallel Signed-off-by: Free Ekanayaka <free.ekanay...@canonical.com> --- lxd/containers_get.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/lxd/containers_get.go b/lxd/containers_get.go index a54c8964d..3a4fe94e4 100644 --- a/lxd/containers_get.go +++ b/lxd/containers_get.go @@ -80,6 +80,7 @@ func doContainersGet(d *Daemon, r *http.Request) (interface{}, error) { resultMu.Unlock() } + wg := sync.WaitGroup{} for address, containers := range result { // If this is an internal request from another cluster node, // ignore containers from other nodes, and return only the ones @@ -100,7 +101,9 @@ func doContainersGet(d *Daemon, r *http.Request) (interface{}, error) { // For recursion requests we need to fetch the state of remote // containers from their respective nodes. if recursion && address != "" && !isClusterNotification(r) { - func(address string, containers []string) { + wg.Add(1) + go func(address string, containers []string) { + defer wg.Done() cert := d.endpoints.NetworkCert() cs, err := doContainersGetFromNode(address, cert) @@ -135,6 +138,7 @@ func doContainersGet(d *Daemon, r *http.Request) (interface{}, error) { } } } + wg.Wait() if !recursion { return resultString, nil
_______________________________________________ lxc-devel mailing list lxc-devel@lists.linuxcontainers.org http://lists.linuxcontainers.org/listinfo/lxc-devel