This is an automated email from the ASF dual-hosted git repository.
miaoliyao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/shardingsphere-on-cloud.git
The following commit(s) were added to refs/heads/main by this push:
new cd25639 feat(storage-node): unregister storage unit when storage node
be deleted
new c9ba968 Merge pull request #373 from Xu-Wentao/storage-node
cd25639 is described below
commit cd25639cd8c7b29ca9a978b3b18b7ea2026a440f
Author: xuwentao <[email protected]>
AuthorDate: Thu May 18 19:41:12 2023 +0800
feat(storage-node): unregister storage unit when storage node be deleted
---
.../controllers/storage_ndoe_controller_test.go | 76 ++++++++++++++++++++++
.../pkg/controllers/storage_node_controller.go | 32 +++++++++
.../pkg/shardingsphere/shardingsphere.go | 26 +-------
.../test/e2e/storage_node_controller_test.go | 42 ++++++++++--
4 files changed, 145 insertions(+), 31 deletions(-)
diff --git
a/shardingsphere-operator/pkg/controllers/storage_ndoe_controller_test.go
b/shardingsphere-operator/pkg/controllers/storage_ndoe_controller_test.go
index 393c00c..04f01f1 100644
--- a/shardingsphere-operator/pkg/controllers/storage_ndoe_controller_test.go
+++ b/shardingsphere-operator/pkg/controllers/storage_ndoe_controller_test.go
@@ -645,5 +645,81 @@ var _ = Describe("StorageNode Controller Mock Test",
func() {
Expect(reconciler.registerStorageUnit(ctx,
sn)).To(BeNil())
Expect(sn.Status.Registered).To(BeTrue())
})
+
+ Context("Test unregisterStorageUnit", func() {
+ BeforeEach(func() {
+ mockCtrl = gomock.NewController(GinkgoT())
+ mockSS =
mock_shardingsphere.NewMockIServer(mockCtrl)
+ monkey.Patch(shardingsphere.NewServer, func(_,
_ string, _ uint, _, _ string) (shardingsphere.IServer, error) {
+ return mockSS, nil
+ })
+ })
+ AfterEach(func() {
+ mockCtrl.Finish()
+ monkey.UnpatchAll()
+ })
+ It("should be successful when unregister storage unit",
func() {
+ testName := "test-unregister-storage-unit"
+
+ cn := &v1alpha1.ComputeNode{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: testName,
+ Namespace: defaultTestNamespace,
+ },
+ Spec: v1alpha1.ComputeNodeSpec{
+ Bootstrap:
v1alpha1.BootstrapConfig{
+ ServerConfig:
v1alpha1.ServerConfig{
+ Authority:
v1alpha1.ComputeNodeAuthority{
+ Users:
[]v1alpha1.ComputeNodeUser{
+
{
+
User: "root",
+
Password: "root",
+
},
+ },
+ },
+ },
+ },
+ },
+ }
+ svc := &corev1.Service{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: testName,
+ Namespace: defaultTestNamespace,
+ },
+ Spec: corev1.ServiceSpec{
+ Ports: []corev1.ServicePort{
+ {
+ Name:
"http",
+ Protocol: "TCP",
+ Port: 3307,
+ },
+ },
+ },
+ }
+ Expect(fakeClient.Create(ctx,
cn)).Should(Succeed())
+ Expect(fakeClient.Create(ctx,
svc)).Should(Succeed())
+
+ sn := &v1alpha1.StorageNode{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: testName,
+ Namespace: defaultTestNamespace,
+ Annotations: map[string]string{
+
AnnotationKeyLogicDatabaseName: testName,
+
dbmeshv1alpha1.AnnotationsInstanceDBName: testName,
+
AnnotationKeyComputeNodeName: testName,
+
AnnotationKeyComputeNodeNamespace: defaultTestNamespace,
+ },
+ },
+ Status: v1alpha1.StorageNodeStatus{
+ Registered: true,
+ },
+ }
+ Expect(fakeClient.Create(ctx,
sn)).Should(Succeed())
+
+
mockSS.EXPECT().UnRegisterStorageUnit(gomock.Any()).Return(nil)
+ mockSS.EXPECT().Close().Return(nil)
+ Expect(reconciler.unregisterStorageUnit(ctx,
sn)).To(BeNil())
+ })
+ })
})
})
diff --git a/shardingsphere-operator/pkg/controllers/storage_node_controller.go
b/shardingsphere-operator/pkg/controllers/storage_node_controller.go
index 1628e37..b010b8f 100644
--- a/shardingsphere-operator/pkg/controllers/storage_node_controller.go
+++ b/shardingsphere-operator/pkg/controllers/storage_node_controller.go
@@ -118,6 +118,12 @@ func (r *StorageNodeReconciler) finalize(ctx
context.Context, node *v1alpha1.Sto
return ctrl.Result{}, nil
}
+ // Try to unregister storage unit in shardingsphere.
+ if err = r.unregisterStorageUnit(ctx, node); err != nil {
+ r.Log.Error(err, "failed to delete storage unit")
+ return ctrl.Result{RequeueAfter: defaultRequeueTime}, err
+ }
+
if err = r.deleteDatabaseCluster(ctx, node, databaseClass); err != nil {
r.Log.Error(err, "failed to delete database cluster")
return ctrl.Result{RequeueAfter: defaultRequeueTime}, err
@@ -506,6 +512,32 @@ func (r *StorageNodeReconciler) registerStorageUnit(ctx
context.Context, node *v
return nil
}
+func (r *StorageNodeReconciler) unregisterStorageUnit(ctx context.Context,
node *v1alpha1.StorageNode) error {
+ if !node.Status.Registered {
+ return nil
+ }
+ if err := r.validateComputeNodeAnnotations(node); err != nil {
+ return err
+ }
+
+ ssServer, err := r.getShardingsphereServer(ctx, node)
+ if err != nil {
+ return fmt.Errorf("getShardingsphereServer failed: %w", err)
+ }
+
+ defer ssServer.Close()
+
+ // TODO how to set ds name?
+ if err := ssServer.UnRegisterStorageUnit("ds_0"); err != nil {
+ return fmt.Errorf("unregister storage unit failed: %w", err)
+ }
+
+ r.Recorder.Eventf(node, corev1.EventTypeNormal,
"StorageUnitUnRegistered", "StorageUnit of node %s/%s is unregistered",
node.GetNamespace(), node.GetName())
+
+ node.Status.Registered = false
+ return nil
+}
+
func (r *StorageNodeReconciler) validateComputeNodeAnnotations(node
*v1alpha1.StorageNode) error {
requiredAnnos := []string{
AnnotationKeyLogicDatabaseName,
diff --git a/shardingsphere-operator/pkg/shardingsphere/shardingsphere.go
b/shardingsphere-operator/pkg/shardingsphere/shardingsphere.go
index 746689a..9b2a0d9 100644
--- a/shardingsphere-operator/pkg/shardingsphere/shardingsphere.go
+++ b/shardingsphere-operator/pkg/shardingsphere/shardingsphere.go
@@ -39,9 +39,7 @@ const (
DistSQLDropTable = `DROP TABLE %s;`
)
-var (
- ruleTypeMap = map[string]string{}
-)
+var ruleTypeMap = map[string]string{}
type Rule struct {
Type string
@@ -142,22 +140,11 @@ func (s *server) UnRegisterStorageUnit(dsName string)
error {
return fmt.Errorf("get rules used error: %w", err)
}
- // TODO DISCUSS: should we drop all tables used by storage unit?
- // clean all rules and tables used by storage unit
- tables := map[string]struct{}{}
+ // clean all rules used by storage unit
for _, rule := range rules {
if err := s.dropRule(rule.Type, rule.Name); err != nil {
return fmt.Errorf("drop rule error: %w", err)
}
- if _, ok := tables[rule.Name]; !ok {
- tables[rule.Name] = struct{}{}
- }
- }
-
- for table := range tables {
- if err := s.dropTable(table); err != nil {
- return fmt.Errorf("drop table error: %w", err)
- }
}
distSQL := fmt.Sprintf(DistSQLUnRegisterStorageUnit, dsName)
@@ -181,15 +168,6 @@ func (s *server) dropRule(ruleType, ruleName string) error
{
return nil
}
-func (s *server) dropTable(tableName string) error {
- distSQL := fmt.Sprintf(DistSQLDropTable, tableName)
- _, err := s.db.Exec(distSQL)
- if err != nil {
- return fmt.Errorf("drop table fail, err: %s", err)
- }
- return nil
-}
-
func init() {
// init rule type map
// implement more rule type if needed
diff --git a/shardingsphere-operator/test/e2e/storage_node_controller_test.go
b/shardingsphere-operator/test/e2e/storage_node_controller_test.go
index 255ec71..0c1a681 100644
--- a/shardingsphere-operator/test/e2e/storage_node_controller_test.go
+++ b/shardingsphere-operator/test/e2e/storage_node_controller_test.go
@@ -21,6 +21,7 @@ import (
"context"
"database/sql"
"github.com/DATA-DOG/go-sqlmock"
+ apierrors "k8s.io/apimachinery/pkg/api/errors"
"reflect"
"regexp"
"time"
@@ -145,15 +146,13 @@ var _ = Describe("StorageNode Controller Suite Test",
func() {
}, 10*time.Second, 1*time.Second).Should(BeTrue())
})
- It("should register storage unit success", func() {
+ It("should register and unregister storage unit success",
func() {
// mock mysql
db, dbmock, err := sqlmock.New()
Expect(err).Should(Succeed())
Expect(dbmock).ShouldNot(BeNil())
- defer db.Close()
-
// mock rds DescribeDBInstances func returns success
-
monkey.PatchInstanceMethod(reflect.TypeOf(&aws.RdsClient{}), "GetInstance",
func(_ *aws.RdsClient, _ context.Context, _ *v1alpha1.StorageNode)
(*dbmesh_rds.DescInstance, error) {
+ g :=
monkey.PatchInstanceMethod(reflect.TypeOf(&aws.RdsClient{}), "GetInstance",
func(_ *aws.RdsClient, _ context.Context, _ *v1alpha1.StorageNode)
(*dbmesh_rds.DescInstance, error) {
return &dbmesh_rds.DescInstance{
DBInstanceStatus:
dbmesh_rds.DBInstanceStatusAvailable,
Endpoint: dbmesh_rds.Endpoint{
@@ -162,9 +161,15 @@ var _ = Describe("StorageNode Controller Suite Test",
func() {
},
}, nil
})
+
monkey.PatchInstanceMethod(reflect.TypeOf(&aws.RdsClient{}), "DeleteInstance",
func(_ *aws.RdsClient, _ context.Context, _ *v1alpha1.StorageNode, _
*dbmeshv1alpha1.DatabaseClass) error {
+ return nil
+ })
monkey.Patch(sql.Open, func(_ string, _ string)
(*sql.DB, error) {
return db, nil
})
+ monkey.PatchInstanceMethod(reflect.TypeOf(db), "Close",
func(_ *sql.DB) error {
+ return nil
+ })
cn := &v1alpha1.ComputeNode{
ObjectMeta: metav1.ObjectMeta{
@@ -238,20 +243,43 @@ var _ = Describe("StorageNode Controller Suite Test",
func() {
Expect(k8sClient.Create(ctx, cn)).Should(Succeed())
Expect(k8sClient.Create(ctx, node)).Should(Succeed())
- dbmock.ExpectExec(regexp.QuoteMeta("CREATE DATABASE IF
NOT EXISTS")).WillReturnResult(sqlmock.NewResult(0, 0))
+ dbmock.ExpectExec(regexp.QuoteMeta("CREATE DATABASE IF
NOT EXISTS")).WillReturnResult(sqlmock.NewResult(1, 1))
dbmock.ExpectExec(regexp.QuoteMeta("REGISTER STORAGE
UNIT IF NOT EXISTS")).WillReturnResult(sqlmock.NewResult(0, 0))
Eventually(func() v1alpha1.StorageNodePhaseStatus {
newSN := &v1alpha1.StorageNode{}
Expect(k8sClient.Get(ctx,
client.ObjectKey{Name: nodeName, Namespace: "default"},
newSN)).Should(Succeed())
return newSN.Status.Phase
- }, 10, 1).Should(Equal(v1alpha1.StorageNodePhaseReady))
+ }, 20, 2).Should(Equal(v1alpha1.StorageNodePhaseReady))
Eventually(func() bool {
newSN := &v1alpha1.StorageNode{}
Expect(k8sClient.Get(ctx,
client.ObjectKey{Name: nodeName, Namespace: "default"},
newSN)).Should(Succeed())
return newSN.Status.Registered
- }, 10, 1).Should(BeTrue())
+ }, 20, 2).Should(BeTrue())
+
+ // delete storage node
+ Expect(k8sClient.Delete(ctx, node)).Should(Succeed())
+
+ dbmock.ExpectQuery(regexp.QuoteMeta("SHOW RULES USED
STORAGE UNIT")).WillReturnRows(sqlmock.NewRows([]string{"type",
"name"}).AddRow("sharding", "t_order"))
+ dbmock.ExpectExec("DROP SHARDING TABLE
RULE").WillReturnResult(sqlmock.NewResult(1, 1))
+ dbmock.ExpectExec(regexp.QuoteMeta("UNREGISTER STORAGE
UNIT")).WillReturnResult(sqlmock.NewResult(0, 0))
+ Eventually(func() v1alpha1.StorageNodePhaseStatus {
+ newSN := &v1alpha1.StorageNode{}
+ Expect(k8sClient.Get(ctx,
client.ObjectKey{Name: nodeName, Namespace: "default"},
newSN)).Should(Succeed())
+ return newSN.Status.Phase
+ }, 20,
2).Should(Equal(v1alpha1.StorageNodePhaseDeleting))
+
+ g.Unpatch()
+
monkey.PatchInstanceMethod(reflect.TypeOf(&aws.RdsClient{}), "GetInstance",
func(_ *aws.RdsClient, _ context.Context, _ *v1alpha1.StorageNode)
(*dbmesh_rds.DescInstance, error) {
+ return nil, nil
+ })
+
+ Eventually(func() bool {
+ newSN := &v1alpha1.StorageNode{}
+ err := k8sClient.Get(ctx,
client.ObjectKey{Name: nodeName, Namespace: "default"}, newSN)
+ return apierrors.IsNotFound(err)
+ }, 20, 2).Should(BeTrue())
})
})
})