This is an automated email from the ASF dual-hosted git repository.

miaoliyao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/shardingsphere-on-cloud.git


The following commit(s) were added to refs/heads/main by this push:
     new cd25639  feat(storage-node): unregister storage unit when storage node be deleted
     new c9ba968  Merge pull request #373 from Xu-Wentao/storage-node
cd25639 is described below

commit cd25639cd8c7b29ca9a978b3b18b7ea2026a440f
Author: xuwentao <[email protected]>
AuthorDate: Thu May 18 19:41:12 2023 +0800

    feat(storage-node): unregister storage unit when storage node be deleted
---
 .../controllers/storage_ndoe_controller_test.go    | 76 ++++++++++++++++++++++
 .../pkg/controllers/storage_node_controller.go     | 32 +++++++++
 .../pkg/shardingsphere/shardingsphere.go           | 26 +-------
 .../test/e2e/storage_node_controller_test.go       | 42 ++++++++++--
 4 files changed, 145 insertions(+), 31 deletions(-)

diff --git a/shardingsphere-operator/pkg/controllers/storage_ndoe_controller_test.go b/shardingsphere-operator/pkg/controllers/storage_ndoe_controller_test.go
index 393c00c..04f01f1 100644
--- a/shardingsphere-operator/pkg/controllers/storage_ndoe_controller_test.go
+++ b/shardingsphere-operator/pkg/controllers/storage_ndoe_controller_test.go
@@ -645,5 +645,81 @@ var _ = Describe("StorageNode Controller Mock Test", func() {
                        Expect(reconciler.registerStorageUnit(ctx, 
sn)).To(BeNil())
                        Expect(sn.Status.Registered).To(BeTrue())
                })
+
+               Context("Test unregisterStorageUnit", func() {
+                       BeforeEach(func() {
+                               mockCtrl = gomock.NewController(GinkgoT())
+                               mockSS = 
mock_shardingsphere.NewMockIServer(mockCtrl)
+                               monkey.Patch(shardingsphere.NewServer, func(_, 
_ string, _ uint, _, _ string) (shardingsphere.IServer, error) {
+                                       return mockSS, nil
+                               })
+                       })
+                       AfterEach(func() {
+                               mockCtrl.Finish()
+                               monkey.UnpatchAll()
+                       })
+                       It("should be successful when unregister storage unit", 
func() {
+                               testName := "test-unregister-storage-unit"
+
+                               cn := &v1alpha1.ComputeNode{
+                                       ObjectMeta: metav1.ObjectMeta{
+                                               Name:      testName,
+                                               Namespace: defaultTestNamespace,
+                                       },
+                                       Spec: v1alpha1.ComputeNodeSpec{
+                                               Bootstrap: 
v1alpha1.BootstrapConfig{
+                                                       ServerConfig: 
v1alpha1.ServerConfig{
+                                                               Authority: 
v1alpha1.ComputeNodeAuthority{
+                                                                       Users: 
[]v1alpha1.ComputeNodeUser{
+                                                                               
{
+                                                                               
        User:     "root",
+                                                                               
        Password: "root",
+                                                                               
},
+                                                                       },
+                                                               },
+                                                       },
+                                               },
+                                       },
+                               }
+                               svc := &corev1.Service{
+                                       ObjectMeta: metav1.ObjectMeta{
+                                               Name:      testName,
+                                               Namespace: defaultTestNamespace,
+                                       },
+                                       Spec: corev1.ServiceSpec{
+                                               Ports: []corev1.ServicePort{
+                                                       {
+                                                               Name:     
"http",
+                                                               Protocol: "TCP",
+                                                               Port:     3307,
+                                                       },
+                                               },
+                                       },
+                               }
+                               Expect(fakeClient.Create(ctx, 
cn)).Should(Succeed())
+                               Expect(fakeClient.Create(ctx, 
svc)).Should(Succeed())
+
+                               sn := &v1alpha1.StorageNode{
+                                       ObjectMeta: metav1.ObjectMeta{
+                                               Name:      testName,
+                                               Namespace: defaultTestNamespace,
+                                               Annotations: map[string]string{
+                                                       
AnnotationKeyLogicDatabaseName:           testName,
+                                                       
dbmeshv1alpha1.AnnotationsInstanceDBName: testName,
+                                                       
AnnotationKeyComputeNodeName:             testName,
+                                                       
AnnotationKeyComputeNodeNamespace:        defaultTestNamespace,
+                                               },
+                                       },
+                                       Status: v1alpha1.StorageNodeStatus{
+                                               Registered: true,
+                                       },
+                               }
+                               Expect(fakeClient.Create(ctx, 
sn)).Should(Succeed())
+
+                               
mockSS.EXPECT().UnRegisterStorageUnit(gomock.Any()).Return(nil)
+                               mockSS.EXPECT().Close().Return(nil)
+                               Expect(reconciler.unregisterStorageUnit(ctx, 
sn)).To(BeNil())
+                       })
+               })
        })
 })
diff --git a/shardingsphere-operator/pkg/controllers/storage_node_controller.go b/shardingsphere-operator/pkg/controllers/storage_node_controller.go
index 1628e37..b010b8f 100644
--- a/shardingsphere-operator/pkg/controllers/storage_node_controller.go
+++ b/shardingsphere-operator/pkg/controllers/storage_node_controller.go
@@ -118,6 +118,12 @@ func (r *StorageNodeReconciler) finalize(ctx context.Context, node *v1alpha1.Sto
                return ctrl.Result{}, nil
        }
 
+       // Try to unregister storage unit in shardingsphere.
+       if err = r.unregisterStorageUnit(ctx, node); err != nil {
+               r.Log.Error(err, "failed to delete storage unit")
+               return ctrl.Result{RequeueAfter: defaultRequeueTime}, err
+       }
+
        if err = r.deleteDatabaseCluster(ctx, node, databaseClass); err != nil {
                r.Log.Error(err, "failed to delete database cluster")
                return ctrl.Result{RequeueAfter: defaultRequeueTime}, err
@@ -506,6 +512,32 @@ func (r *StorageNodeReconciler) registerStorageUnit(ctx context.Context, node *v
        return nil
 }
 
+func (r *StorageNodeReconciler) unregisterStorageUnit(ctx context.Context, 
node *v1alpha1.StorageNode) error {
+       if !node.Status.Registered {
+               return nil
+       }
+       if err := r.validateComputeNodeAnnotations(node); err != nil {
+               return err
+       }
+
+       ssServer, err := r.getShardingsphereServer(ctx, node)
+       if err != nil {
+               return fmt.Errorf("getShardingsphereServer failed: %w", err)
+       }
+
+       defer ssServer.Close()
+
+       // TODO how to set ds name?
+       if err := ssServer.UnRegisterStorageUnit("ds_0"); err != nil {
+               return fmt.Errorf("unregister storage unit failed: %w", err)
+       }
+
+       r.Recorder.Eventf(node, corev1.EventTypeNormal, 
"StorageUnitUnRegistered", "StorageUnit of node %s/%s is unregistered", 
node.GetNamespace(), node.GetName())
+
+       node.Status.Registered = false
+       return nil
+}
+
 func (r *StorageNodeReconciler) validateComputeNodeAnnotations(node 
*v1alpha1.StorageNode) error {
        requiredAnnos := []string{
                AnnotationKeyLogicDatabaseName,
diff --git a/shardingsphere-operator/pkg/shardingsphere/shardingsphere.go b/shardingsphere-operator/pkg/shardingsphere/shardingsphere.go
index 746689a..9b2a0d9 100644
--- a/shardingsphere-operator/pkg/shardingsphere/shardingsphere.go
+++ b/shardingsphere-operator/pkg/shardingsphere/shardingsphere.go
@@ -39,9 +39,7 @@ const (
        DistSQLDropTable = `DROP TABLE %s;`
 )
 
-var (
-       ruleTypeMap = map[string]string{}
-)
+var ruleTypeMap = map[string]string{}
 
 type Rule struct {
        Type string
@@ -142,22 +140,11 @@ func (s *server) UnRegisterStorageUnit(dsName string) error {
                return fmt.Errorf("get rules used error: %w", err)
        }
 
-       // TODO DISCUSS: should we drop all tables used by storage unit?
-       // clean all rules and tables used by storage unit
-       tables := map[string]struct{}{}
+       // clean all rules used by storage unit
        for _, rule := range rules {
                if err := s.dropRule(rule.Type, rule.Name); err != nil {
                        return fmt.Errorf("drop rule error: %w", err)
                }
-               if _, ok := tables[rule.Name]; !ok {
-                       tables[rule.Name] = struct{}{}
-               }
-       }
-
-       for table := range tables {
-               if err := s.dropTable(table); err != nil {
-                       return fmt.Errorf("drop table error: %w", err)
-               }
        }
 
        distSQL := fmt.Sprintf(DistSQLUnRegisterStorageUnit, dsName)
@@ -181,15 +168,6 @@ func (s *server) dropRule(ruleType, ruleName string) error {
        return nil
 }
 
-func (s *server) dropTable(tableName string) error {
-       distSQL := fmt.Sprintf(DistSQLDropTable, tableName)
-       _, err := s.db.Exec(distSQL)
-       if err != nil {
-               return fmt.Errorf("drop table fail, err: %s", err)
-       }
-       return nil
-}
-
 func init() {
        // init rule type map
        // implement more rule type if needed
diff --git a/shardingsphere-operator/test/e2e/storage_node_controller_test.go b/shardingsphere-operator/test/e2e/storage_node_controller_test.go
index 255ec71..0c1a681 100644
--- a/shardingsphere-operator/test/e2e/storage_node_controller_test.go
+++ b/shardingsphere-operator/test/e2e/storage_node_controller_test.go
@@ -21,6 +21,7 @@ import (
        "context"
        "database/sql"
        "github.com/DATA-DOG/go-sqlmock"
+       apierrors "k8s.io/apimachinery/pkg/api/errors"
        "reflect"
        "regexp"
        "time"
@@ -145,15 +146,13 @@ var _ = Describe("StorageNode Controller Suite Test", func() {
                        }, 10*time.Second, 1*time.Second).Should(BeTrue())
                })
 
-               It("should register storage unit success", func() {
+               It("should register and unregister storage unit success", 
func() {
                        // mock mysql
                        db, dbmock, err := sqlmock.New()
                        Expect(err).Should(Succeed())
                        Expect(dbmock).ShouldNot(BeNil())
-                       defer db.Close()
-
                        // mock rds DescribeDBInstances func returns success
-                       
monkey.PatchInstanceMethod(reflect.TypeOf(&aws.RdsClient{}), "GetInstance", 
func(_ *aws.RdsClient, _ context.Context, _ *v1alpha1.StorageNode) 
(*dbmesh_rds.DescInstance, error) {
+                       g := 
monkey.PatchInstanceMethod(reflect.TypeOf(&aws.RdsClient{}), "GetInstance", 
func(_ *aws.RdsClient, _ context.Context, _ *v1alpha1.StorageNode) 
(*dbmesh_rds.DescInstance, error) {
                                return &dbmesh_rds.DescInstance{
                                        DBInstanceStatus: 
dbmesh_rds.DBInstanceStatusAvailable,
                                        Endpoint: dbmesh_rds.Endpoint{
@@ -162,9 +161,15 @@ var _ = Describe("StorageNode Controller Suite Test", func() {
                                        },
                                }, nil
                        })
+                       
monkey.PatchInstanceMethod(reflect.TypeOf(&aws.RdsClient{}), "DeleteInstance", 
func(_ *aws.RdsClient, _ context.Context, _ *v1alpha1.StorageNode, _ 
*dbmeshv1alpha1.DatabaseClass) error {
+                               return nil
+                       })
                        monkey.Patch(sql.Open, func(_ string, _ string) 
(*sql.DB, error) {
                                return db, nil
                        })
+                       monkey.PatchInstanceMethod(reflect.TypeOf(db), "Close", 
func(_ *sql.DB) error {
+                               return nil
+                       })
 
                        cn := &v1alpha1.ComputeNode{
                                ObjectMeta: metav1.ObjectMeta{
@@ -238,20 +243,43 @@ var _ = Describe("StorageNode Controller Suite Test", func() {
                        Expect(k8sClient.Create(ctx, cn)).Should(Succeed())
                        Expect(k8sClient.Create(ctx, node)).Should(Succeed())
 
-                       dbmock.ExpectExec(regexp.QuoteMeta("CREATE DATABASE IF 
NOT EXISTS")).WillReturnResult(sqlmock.NewResult(0, 0))
+                       dbmock.ExpectExec(regexp.QuoteMeta("CREATE DATABASE IF 
NOT EXISTS")).WillReturnResult(sqlmock.NewResult(1, 1))
                        dbmock.ExpectExec(regexp.QuoteMeta("REGISTER STORAGE 
UNIT IF NOT EXISTS")).WillReturnResult(sqlmock.NewResult(0, 0))
 
                        Eventually(func() v1alpha1.StorageNodePhaseStatus {
                                newSN := &v1alpha1.StorageNode{}
                                Expect(k8sClient.Get(ctx, 
client.ObjectKey{Name: nodeName, Namespace: "default"}, 
newSN)).Should(Succeed())
                                return newSN.Status.Phase
-                       }, 10, 1).Should(Equal(v1alpha1.StorageNodePhaseReady))
+                       }, 20, 2).Should(Equal(v1alpha1.StorageNodePhaseReady))
 
                        Eventually(func() bool {
                                newSN := &v1alpha1.StorageNode{}
                                Expect(k8sClient.Get(ctx, 
client.ObjectKey{Name: nodeName, Namespace: "default"}, 
newSN)).Should(Succeed())
                                return newSN.Status.Registered
-                       }, 10, 1).Should(BeTrue())
+                       }, 20, 2).Should(BeTrue())
+
+                       // delete storage node
+                       Expect(k8sClient.Delete(ctx, node)).Should(Succeed())
+
+                       dbmock.ExpectQuery(regexp.QuoteMeta("SHOW RULES USED 
STORAGE UNIT")).WillReturnRows(sqlmock.NewRows([]string{"type", 
"name"}).AddRow("sharding", "t_order"))
+                       dbmock.ExpectExec("DROP SHARDING TABLE 
RULE").WillReturnResult(sqlmock.NewResult(1, 1))
+                       dbmock.ExpectExec(regexp.QuoteMeta("UNREGISTER STORAGE 
UNIT")).WillReturnResult(sqlmock.NewResult(0, 0))
+                       Eventually(func() v1alpha1.StorageNodePhaseStatus {
+                               newSN := &v1alpha1.StorageNode{}
+                               Expect(k8sClient.Get(ctx, 
client.ObjectKey{Name: nodeName, Namespace: "default"}, 
newSN)).Should(Succeed())
+                               return newSN.Status.Phase
+                       }, 20, 
2).Should(Equal(v1alpha1.StorageNodePhaseDeleting))
+
+                       g.Unpatch()
+                       
monkey.PatchInstanceMethod(reflect.TypeOf(&aws.RdsClient{}), "GetInstance", 
func(_ *aws.RdsClient, _ context.Context, _ *v1alpha1.StorageNode) 
(*dbmesh_rds.DescInstance, error) {
+                               return nil, nil
+                       })
+
+                       Eventually(func() bool {
+                               newSN := &v1alpha1.StorageNode{}
+                               err := k8sClient.Get(ctx, 
client.ObjectKey{Name: nodeName, Namespace: "default"}, newSN)
+                               return apierrors.IsNotFound(err)
+                       }, 20, 2).Should(BeTrue())
                })
        })
 })

Reply via email to