This is an automated email from the ASF dual-hosted git repository.

miaoliyao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/shardingsphere-on-cloud.git


The following commit(s) were added to refs/heads/main by this push:
     new bd4c5bf  feat: storage node support aws rds cluster
     new 4656bf7  Merge pull request #402 from Xu-Wentao/sn-aws-rds-cluster
bd4c5bf is described below

commit bd4c5bf83f9274238dcc1c92cc786eb630a22f7a
Author: xuwentao <[email protected]>
AuthorDate: Wed Jun 7 20:27:23 2023 +0800

    feat: storage node support aws rds cluster
---
 .../api/v1alpha1/storage_node_types.go             |   7 +-
 .../api/v1alpha1/storageprovider_types.go          |  19 +-
 shardingsphere-operator/go.mod                     |   2 +-
 shardingsphere-operator/go.sum                     |  10 +-
 .../controllers/storage_ndoe_controller_test.go    | 464 ++++++++++++++++++++-
 .../pkg/controllers/storage_node_controller.go     |  73 +++-
 .../pkg/reconcile/storagenode/aws/aurora.go        |   2 +-
 .../pkg/reconcile/storagenode/aws/aws.go           |   4 +
 .../pkg/reconcile/storagenode/aws/mocks/aws.go     |  43 ++
 .../pkg/reconcile/storagenode/aws/rdscluster.go    | 197 +++++++++
 .../test/e2e/storage_node_controller_test.go       |   6 +-
 11 files changed, 792 insertions(+), 35 deletions(-)

diff --git a/shardingsphere-operator/api/v1alpha1/storage_node_types.go 
b/shardingsphere-operator/api/v1alpha1/storage_node_types.go
index e92d78a..1dc1d97 100644
--- a/shardingsphere-operator/api/v1alpha1/storage_node_types.go
+++ b/shardingsphere-operator/api/v1alpha1/storage_node_types.go
@@ -120,10 +120,9 @@ type StorageNodeSpec struct {
        // if not set, will NOT create database
        Schema string `json:"schema"`
        // +optional
-       // only for cluster provider like AWS RDS Cluster/ AWS Aurora Cluster
-       // The Default value is 1 for cluster provider
-       // will not be effective for single instance, instance will always be 1 
for single instance
-       // Example: 2, means 2 instances in the cluster(1 primary + 1 reader)
+       // Only for aws aurora storage provider right now. And the default 
value is 1.
+       // aws rds instance is always 1.
+       // aws rds cluster will auto create 3 instances(1 primary and 2 
replicas).
        // +kubebuilder:default=1
        Replicas int32 `json:"replicas"`
 }
diff --git a/shardingsphere-operator/api/v1alpha1/storageprovider_types.go 
b/shardingsphere-operator/api/v1alpha1/storageprovider_types.go
index 8cd579f..02cb15f 100644
--- a/shardingsphere-operator/api/v1alpha1/storageprovider_types.go
+++ b/shardingsphere-operator/api/v1alpha1/storageprovider_types.go
@@ -25,15 +25,16 @@ import (
 // NOTE: json tags are required.  Any new fields you add must have json tags 
for the fields to be serialized.
 
 const (
-       AnnotationsVPCSecurityGroupIds = 
"storageproviders.shardingsphere.apache.org/vpc-security-group-ids"
-       AnnotationsSubnetGroupName     = 
"storageproviders.shardingsphere.apache.org/vpc-subnet-group-name"
-       AnnotationsAvailabilityZones   = 
"storageproviders.shardingsphere.apache.org/availability-zones"
-       AnnotationsClusterIdentifier   = 
"storageproviders.shardingsphere.apache.org/cluster-identifier"
-       AnnotationsInstanceIdentifier  = 
"storageproviders.shardingsphere.apache.org/instance-identifier"
-       AnnotationsInstanceDBName      = 
"storageproviders.shardingsphere.apache.org/instance-db-name"
-       AnnotationsSnapshotIdentifier  = 
"storageproviders.shardingsphere.apache.org/snapshot-identifier"
-       AnnotationsMasterUsername      = 
"storageproviders.shardingsphere.apache.org/master-username"
-       AnnotationsMasterUserPassword  = 
"storageproviders.shardingsphere.apache.org/master-user-password"
+       AnnotationsVPCSecurityGroupIds     = 
"storageproviders.shardingsphere.apache.org/vpc-security-group-ids"
+       AnnotationsSubnetGroupName         = 
"storageproviders.shardingsphere.apache.org/vpc-subnet-group-name"
+       AnnotationsAvailabilityZones       = 
"storageproviders.shardingsphere.apache.org/availability-zones"
+       AnnotationsClusterIdentifier       = 
"storageproviders.shardingsphere.apache.org/cluster-identifier"
+       AnnotationsInstanceIdentifier      = 
"storageproviders.shardingsphere.apache.org/instance-identifier"
+       AnnotationsInstanceDBName          = 
"storageproviders.shardingsphere.apache.org/instance-db-name"
+       AnnotationsSnapshotIdentifier      = 
"storageproviders.shardingsphere.apache.org/snapshot-identifier"
+       AnnotationsMasterUsername          = 
"storageproviders.shardingsphere.apache.org/master-username"
+       AnnotationsMasterUserPassword      = 
"storageproviders.shardingsphere.apache.org/master-user-password"
+       AnnotationsFinalSnapshotIdentifier = 
"storageproviders.shardingsphere.apache.org/final-snapshot-identifier"
 
        ProvisionerAWSRDSInstance = 
"storageproviders.shardingsphere.apache.org/aws-rds-instance"
        ProvisionerAWSRDSCluster  = 
"storageproviders.shardingsphere.apache.org/aws-rds-cluster"
diff --git a/shardingsphere-operator/go.mod b/shardingsphere-operator/go.mod
index e321341..277acdc 100644
--- a/shardingsphere-operator/go.mod
+++ b/shardingsphere-operator/go.mod
@@ -8,7 +8,7 @@ require (
        github.com/antlr/antlr4 v0.0.0-20181218183524-be58ebffde8e
        github.com/chaos-mesh/chaos-mesh/api v0.0.0-20230517110555-afab5b4a7813
        github.com/cloudnative-pg/cloudnative-pg v1.20.0
-       github.com/database-mesh/golang-sdk v0.0.0-20230607082802-81e878c57517
+       github.com/database-mesh/golang-sdk v0.0.0-20230608051131-717115b848ac
        github.com/go-logr/logr v1.2.4
        github.com/go-sql-driver/mysql v1.7.1
        github.com/golang/mock v1.6.0
diff --git a/shardingsphere-operator/go.sum b/shardingsphere-operator/go.sum
index 905c132..06ec0b2 100644
--- a/shardingsphere-operator/go.sum
+++ b/shardingsphere-operator/go.sum
@@ -64,12 +64,10 @@ github.com/cncf/udpa/go 
v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGX
 github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod 
h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
 github.com/cncf/xds/go v0.0.0-20210312221358-fbca930ec8ed/go.mod 
h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
 github.com/creack/pty v1.1.9/go.mod 
h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
-github.com/database-mesh/golang-sdk v0.0.0-20230606100535-23037381e4fb 
h1:p3tpHo24HjA7rW/JMjD9/6klWAVMn4fIefHXKgggVAg=
-github.com/database-mesh/golang-sdk v0.0.0-20230606100535-23037381e4fb/go.mod 
h1:yUEdo+aGdROl9oC7A1GeDB9/ubUtV2k73uLL+qC3PC4=
-github.com/database-mesh/golang-sdk v0.0.0-20230607065347-ac17e15902a6 
h1:qm4zHib+RLX02dX+V8jpY5Zti0GBBZxoIBEceu/AuCQ=
-github.com/database-mesh/golang-sdk v0.0.0-20230607065347-ac17e15902a6/go.mod 
h1:yUEdo+aGdROl9oC7A1GeDB9/ubUtV2k73uLL+qC3PC4=
-github.com/database-mesh/golang-sdk v0.0.0-20230607082802-81e878c57517 
h1:qS2ZfFpV+Vq3um8WYxWkgTpuoYT9ki63H2tA/Tb9SWw=
-github.com/database-mesh/golang-sdk v0.0.0-20230607082802-81e878c57517/go.mod 
h1:yUEdo+aGdROl9oC7A1GeDB9/ubUtV2k73uLL+qC3PC4=
+github.com/database-mesh/golang-sdk v0.0.0-20230608033816-2b0eafd08bb8 
h1:ykLnu04rJx6qLn4QL0XcWpL3AoIGYDYLqRX7qpPB+Rw=
+github.com/database-mesh/golang-sdk v0.0.0-20230608033816-2b0eafd08bb8/go.mod 
h1:yUEdo+aGdROl9oC7A1GeDB9/ubUtV2k73uLL+qC3PC4=
+github.com/database-mesh/golang-sdk v0.0.0-20230608051131-717115b848ac 
h1:XxVNCAoMMSaDgKrPKV6jhXsknsXnHJ9PphpE/0DQldY=
+github.com/database-mesh/golang-sdk v0.0.0-20230608051131-717115b848ac/go.mod 
h1:yUEdo+aGdROl9oC7A1GeDB9/ubUtV2k73uLL+qC3PC4=
 github.com/davecgh/go-spew v1.1.0/go.mod 
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/davecgh/go-spew v1.1.1 
h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
 github.com/davecgh/go-spew v1.1.1/go.mod 
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
diff --git 
a/shardingsphere-operator/pkg/controllers/storage_ndoe_controller_test.go 
b/shardingsphere-operator/pkg/controllers/storage_ndoe_controller_test.go
index fadd47a..f3632a5 100644
--- a/shardingsphere-operator/pkg/controllers/storage_ndoe_controller_test.go
+++ b/shardingsphere-operator/pkg/controllers/storage_ndoe_controller_test.go
@@ -799,7 +799,7 @@ var _ = Describe("StorageNode Controller Mock Test For AWS 
Aurora", func() {
                                PrimaryEndpoint:     
"test-aws-aurora.cluster-xxxxxx.us-east-1.rds.amazonaws.com",
                                ReaderEndpoint:      
"test-aws-aurora.cluster-ro-xxxxxx.us-east-1.rds.amazonaws.com",
                                Port:                3306,
-                               Status:              
dbmesh_rds.DBClusterStatusAvailable,
+                               Status:              
string(dbmesh_rds.DBClusterStatusAvailable),
                        }
                        descInstance := &dbmesh_rds.DescInstance{
                                DBInstanceIdentifier: "test-aws-aurora-1",
@@ -850,7 +850,7 @@ var _ = Describe("StorageNode Controller Mock Test For AWS 
Aurora", func() {
                                Status: v1alpha1.StorageNodeStatus{
                                        Phase: v1alpha1.StorageNodePhaseReady,
                                        Cluster: v1alpha1.ClusterStatus{
-                                               Status:          
dbmesh_rds.DBClusterStatusAvailable,
+                                               Status:          
string(dbmesh_rds.DBClusterStatusAvailable),
                                                PrimaryEndpoint: 
v1alpha1.Endpoint{Address: 
"test-aws-aurora.cluster-xxxxxx.us-east-1.rds.amazonaws.com", Port: 3306},
                                                ReaderEndpoints: 
[]v1alpha1.Endpoint{{Address: 
"test-aws-aurora.cluster-ro-xxxxxx.us-east-1.rds.amazonaws.com", Port: 3306}},
                                        },
@@ -870,7 +870,7 @@ var _ = Describe("StorageNode Controller Mock Test For AWS 
Aurora", func() {
                                PrimaryEndpoint:     
"test-aws-aurora.cluster-xxxxxx.us-east-1.rds.amazonaws.com",
                                ReaderEndpoint:      
"test-aws-aurora.cluster-ro-xxxxxx.us-east-1.rds.amazonaws.com",
                                Port:                3306,
-                               Status:              
dbmesh_rds.DBClusterStatusAvailable,
+                               Status:              
string(dbmesh_rds.DBClusterStatusAvailable),
                        }
 
                        descInstance := &dbmesh_rds.DescInstance{
@@ -924,7 +924,7 @@ var _ = Describe("StorageNode Controller Mock Test For AWS 
Aurora", func() {
                                Status: v1alpha1.StorageNodeStatus{
                                        Phase: 
v1alpha1.StorageNodePhaseDeleting,
                                        Cluster: v1alpha1.ClusterStatus{
-                                               Status:          
dbmesh_rds.DBClusterStatusDeleting,
+                                               Status:          
string(dbmesh_rds.DBClusterStatusDeleting),
                                                PrimaryEndpoint: 
v1alpha1.Endpoint{Address: 
"test-aws-aurora.cluster-xxxxxx.us-east-1.rds.amazonaws.com", Port: 3306},
                                                ReaderEndpoints: 
[]v1alpha1.Endpoint{{Address: 
"test-aws-aurora.cluster-ro-xxxxxx.us-east-1.rds.amazonaws.com", Port: 3306}},
                                        },
@@ -1007,7 +1007,7 @@ var _ = Describe("StorageNode Controller Mock Test For 
AWS Aurora", func() {
                                Status: v1alpha1.StorageNodeStatus{
                                        Phase: v1alpha1.StorageNodePhaseReady,
                                        Cluster: v1alpha1.ClusterStatus{
-                                               Status:          
dbmesh_rds.DBClusterStatusAvailable,
+                                               Status:          
string(dbmesh_rds.DBClusterStatusAvailable),
                                                PrimaryEndpoint: 
v1alpha1.Endpoint{Address: 
"test-aws-aurora.cluster-xxxxxx.us-east-1.rds.amazonaws.com", Port: 3306},
                                                ReaderEndpoints: 
[]v1alpha1.Endpoint{{Address: 
"test-aws-aurora.cluster-ro-xxxxxx.us-east-1.rds.amazonaws.com", Port: 3306}},
                                        },
@@ -1056,7 +1056,7 @@ var _ = Describe("StorageNode Controller Mock Test For 
AWS Aurora", func() {
                                DBClusterIdentifier: "test-aws-aurora",
                                PrimaryEndpoint:     
"test-aws-aurora.cluster-xxxxxx.us-east-1.rds.amazonaws.com",
                                Port:                int32(3306),
-                               Status:              
dbmesh_rds.DBClusterStatusAvailable,
+                               Status:              
string(dbmesh_rds.DBClusterStatusAvailable),
                        }, nil).Times(1)
                        // mock get instances of aws aurora are available
                        mockAws.EXPECT().GetInstancesByFilters(gomock.Any(), 
gomock.Any()).Return([]*dbmesh_rds.DescInstance{
@@ -1145,7 +1145,7 @@ var _ = Describe("StorageNode Controller Mock Test For 
AWS Aurora", func() {
                                DBClusterIdentifier: "test-aws-aurora",
                                PrimaryEndpoint:     
"test-aws-aurora.cluster-xxxxxx.us-east-1.rds.amazonaws.com",
                                Port:                int32(3306),
-                               Status:              
dbmesh_rds.DBClusterStatusAvailable,
+                               Status:              
string(dbmesh_rds.DBClusterStatusAvailable),
                                DBClusterMembers: []dbmesh_rds.ClusterMember{
                                        {DBInstanceIdentifier: 
"test-aws-aurora-instance-0"},
                                        {DBInstanceIdentifier: 
"test-aws-aurora-instance-1"},
@@ -1183,5 +1183,455 @@ var _ = Describe("StorageNode Controller Mock Test For 
AWS Aurora", func() {
                        Expect(storageNode.Status.Registered).To(BeFalse())
                })
        })
+})
+
+var _ = Describe("StorageNode Controller Mock Test For AWS RDS Cluster", 
func() {
+       var provider *v1alpha1.StorageProvider
+       var providerName = "aws-rds-cluster"
+       BeforeEach(func() {
+               provider = &v1alpha1.StorageProvider{
+                       ObjectMeta: metav1.ObjectMeta{
+                               Name: providerName,
+                       },
+                       Spec: v1alpha1.StorageProviderSpec{
+                               Provisioner: v1alpha1.ProvisionerAWSRDSCluster,
+                               Parameters: map[string]string{
+                                       "engine":             "mysql",
+                                       "engineVersion":      "5.7",
+                                       "masterUsername":     "root",
+                                       "masterUserPassword": "root",
+                                       "allocatedStorage":   "20",
+                               },
+                       },
+               }
+               Expect(fakeClient.Create(ctx, provider)).Should(Succeed())
+
+               // mock aws client
+               // mock aws rds client
+               mockCtrl = gomock.NewController(GinkgoT())
+               mockAws = mock_aws.NewMockIRdsClient(mockCtrl)
+               monkey.Patch(aws.NewRdsClient, func(rds dbmesh_rds.RDS) 
aws.IRdsClient {
+                       return mockAws
+               })
+               mockSS = mock_shardingsphere.NewMockIServer(mockCtrl)
+               monkey.Patch(shardingsphere.NewServer, func(_, _ string, _ 
uint, _, _ string) (shardingsphere.IServer, error) {
+                       return mockSS, nil
+               })
+       })
+
+       AfterEach(func() {
+               mockCtrl.Finish()
+               monkey.UnpatchAll()
+       })
+
+       Context("reconcile storage node", func() {
+               It("should success when aws rds cluster is not exits", func() {
+                       name := "test-aws-rds-cluster-not-exists"
+                       namespacedName := types.NamespacedName{
+                               Name:      name,
+                               Namespace: defaultTestNamespace,
+                       }
+                       storageNode := v1alpha1.StorageNode{
+                               ObjectMeta: metav1.ObjectMeta{
+                                       Name:      name,
+                                       Namespace: defaultTestNamespace,
+                                       Annotations: map[string]string{
+                                               
v1alpha1.AnnotationsClusterIdentifier: "test-aws-rds-cluster",
+                                       },
+                               },
+                               Spec: v1alpha1.StorageNodeSpec{
+                                       StorageProviderName: providerName,
+                               },
+                       }
+                       Expect(fakeClient.Create(ctx, 
&storageNode)).Should(Succeed())
+
+                       descCluster := &dbmesh_rds.DescCluster{
+                               DBClusterIdentifier: "test-aws-rds-cluster",
+                               PrimaryEndpoint:     
"test-aws-rds-cluster.cluster-xxxxxx.us-east-1.rds.amazonaws.com",
+                               ReaderEndpoint:      
"test-aws-rds-cluster.cluster-ro-xxxxxx.us-east-1.rds.amazonaws.com",
+                               Port:                3306,
+                               Status:              
string(dbmesh_rds.DBClusterStatusAvailable),
+                       }
+                       descInstance := &dbmesh_rds.DescInstance{
+                               DBInstanceIdentifier: 
"test-aws-rds-cluster-instance-1",
+                               DBClusterIdentifier:  "test-aws-rds-cluster",
+                               Endpoint: dbmesh_rds.Endpoint{
+                                       Address: 
"test-aws-rds-cluster-instance-1.cluster-xxxxxx.us-east-1.rds.amazonaws.com",
+                                       Port:    3306,
+                               },
+                               DBInstanceStatus: 
dbmesh_rds.DBInstanceStatusAvailable,
+                       }
+
+                       // mock aws rds cluster is not exist
+                       mockAws.EXPECT().GetRDSCluster(gomock.Any(), 
gomock.Any()).Return(nil, nil).Times(1)
+                       // mock create aws aurora cluster
+                       mockAws.EXPECT().CreateRDSCluster(gomock.Any(), 
gomock.Any(), gomock.Any()).Return(nil)
+                       // mock aws aurora cluster is created
+                       mockAws.EXPECT().GetRDSCluster(gomock.Any(), 
gomock.Any()).Return(descCluster, nil).Times(1)
+                       // mock aws instance is created
+                       mockAws.EXPECT().GetInstancesByFilters(gomock.Any(), 
gomock.Any()).Return([]*dbmesh_rds.DescInstance{descInstance}, nil).Times(1)
+
+                       req := ctrl.Request{NamespacedName: namespacedName}
+                       _, err := reconciler.Reconcile(ctx, req)
+                       Expect(err).To(BeNil())
+                       sn := &v1alpha1.StorageNode{}
+                       Expect(fakeClient.Get(ctx, namespacedName, 
sn)).Should(Succeed())
+                       
Expect(sn.Status.Phase).To(Equal(v1alpha1.StorageNodePhaseReady))
+               })
+
+               It("should success when storage node been delete", func() {
+                       name := "test-aws-rds-cluster-deleted"
+                       namespacedName := types.NamespacedName{
+                               Name:      name,
+                               Namespace: defaultTestNamespace,
+                       }
+                       req := ctrl.Request{NamespacedName: namespacedName}
+                       storageNode := &v1alpha1.StorageNode{
+                               ObjectMeta: metav1.ObjectMeta{
+                                       Name:      name,
+                                       Namespace: defaultTestNamespace,
+                                       Annotations: map[string]string{
+                                               
v1alpha1.AnnotationsClusterIdentifier: "test-aws-rds-cluster",
+                                       },
+                                       Finalizers: []string{FinalizerName},
+                               },
+                               Spec: v1alpha1.StorageNodeSpec{
+                                       StorageProviderName: providerName,
+                               },
+                               Status: v1alpha1.StorageNodeStatus{
+                                       Phase: v1alpha1.StorageNodePhaseReady,
+                                       Cluster: v1alpha1.ClusterStatus{
+                                               Status:          
string(dbmesh_rds.DBClusterStatusAvailable),
+                                               PrimaryEndpoint: 
v1alpha1.Endpoint{Address: 
"test-aws-rds-cluster.cluster-xxxxxx.us-east-1.rds.amazonaws.com", Port: 3306},
+                                               ReaderEndpoints: 
[]v1alpha1.Endpoint{{Address: 
"test-aws-rds-cluster.cluster-ro-xxxxxx.us-east-1.rds.amazonaws.com", Port: 
3306}},
+                                       },
+                                       Instances: []v1alpha1.InstanceStatus{
+                                               {
+                                                       Status:   
string(dbmesh_rds.DBInstanceStatusAvailable),
+                                                       Endpoint: 
v1alpha1.Endpoint{Address: 
"test-aws-rds-cluster-instance-1.cluster-xxxxxx.us-east-1.rds.amazonaws.com", 
Port: 3306},
+                                               },
+                                       },
+                               },
+                       }
+
+                       Expect(fakeClient.Create(ctx, 
storageNode)).Should(Succeed())
+
+                       descCluster := &dbmesh_rds.DescCluster{
+                               DBClusterIdentifier: "test-aws-rds-cluster",
+                               PrimaryEndpoint:     
"test-aws-rds-cluster.cluster-xxxxxx.us-east-1.rds.amazonaws.com",
+                               ReaderEndpoint:      
"test-aws-rds-cluster.cluster-ro-xxxxxx.us-east-1.rds.amazonaws.com",
+                               Port:                3306,
+                               Status:              
string(dbmesh_rds.DBClusterStatusAvailable),
+                       }
+
+                       descInstance := &dbmesh_rds.DescInstance{
+                               DBInstanceIdentifier: "test-aws-rds-cluster-1",
+                               DBClusterIdentifier:  "test-aws-rds-cluster",
+                               Endpoint: dbmesh_rds.Endpoint{
+                                       Address: 
"test-aws-rds-cluster-1.cluster-xxxxxx.us-east-1.rds.amazonaws.com",
+                                       Port:    3306,
+                               },
+                               DBInstanceStatus: 
dbmesh_rds.DBInstanceStatusAvailable,
+                       }
+
+                       Expect(fakeClient.Delete(ctx, 
storageNode)).Should(Succeed())
+
+                       // mock aws rds cluster is exists
+                       mockAws.EXPECT().GetRDSCluster(gomock.Any(), 
gomock.Any()).Return(descCluster, nil).Times(1)
+                       // mock get instances of aws rds cluster
+                       mockAws.EXPECT().GetInstancesByFilters(gomock.Any(), 
gomock.Any()).Return([]*dbmesh_rds.DescInstance{descInstance}, nil).Times(1)
+                       // mock delete aws rds cluster
+                       mockAws.EXPECT().DeleteRDSCluster(gomock.Any(), 
gomock.Any(), gomock.Any()).Return(nil)
+
+                       _, err := reconciler.Reconcile(ctx, req)
+                       Expect(err).To(BeNil())
+
+                       Expect(fakeClient.Get(ctx, namespacedName, 
storageNode)).Should(Succeed())
+                       Expect(storageNode.DeletionTimestamp).NotTo(BeNil())
+                       
Expect(storageNode.Status.Phase).To(Equal(v1alpha1.StorageNodePhaseDeleting))
+               })
+
+               It("should be success when storage node is deleting", func() {
+                       name := "test-aws-rds-cluster-deleting"
+                       namespacedName := types.NamespacedName{
+                               Name:      name,
+                               Namespace: defaultTestNamespace,
+                       }
+                       req := ctrl.Request{NamespacedName: namespacedName}
+                       deletionTimestamp := metav1.Now()
+                       storageNode := &v1alpha1.StorageNode{
+                               ObjectMeta: metav1.ObjectMeta{
+                                       Name:      name,
+                                       Namespace: defaultTestNamespace,
+                                       Annotations: map[string]string{
+                                               
v1alpha1.AnnotationsClusterIdentifier: "test-aws-rds-cluster",
+                                       },
+                                       Finalizers:        
[]string{FinalizerName},
+                                       DeletionTimestamp: &deletionTimestamp,
+                               },
+                               Spec: v1alpha1.StorageNodeSpec{
+                                       StorageProviderName: providerName,
+                               },
+                               Status: v1alpha1.StorageNodeStatus{
+                                       Phase: 
v1alpha1.StorageNodePhaseDeleting,
+                                       Cluster: v1alpha1.ClusterStatus{
+                                               Status:          
string(dbmesh_rds.DBClusterStatusDeleting),
+                                               PrimaryEndpoint: 
v1alpha1.Endpoint{Address: 
"test-aws-rds-cluster.cluster-xxxxxx.us-east-1.rds.amazonaws.com", Port: 3306},
+                                               ReaderEndpoints: 
[]v1alpha1.Endpoint{{Address: 
"test-aws-rds-cluster.cluster-ro-xxxxxx.us-east-1.rds.amazonaws.com", Port: 
3306}},
+                                       },
+                                       Instances: []v1alpha1.InstanceStatus{
+                                               {
+                                                       Status:   
string(dbmesh_rds.DBInstanceStatusDeleting),
+                                                       Endpoint: 
v1alpha1.Endpoint{Address: 
"test-aws-rds-cluster-1.cluster-xxxxxx.us-east-1.rds.amazonaws.com", Port: 
3306},
+                                               },
+                                       },
+                               },
+                       }
+                       Expect(fakeClient.Create(ctx, 
storageNode)).Should(Succeed())
+
+                       // mock aws rds cluster is not exists
+                       mockAws.EXPECT().GetRDSCluster(gomock.Any(), 
gomock.Any()).Return(nil, nil).Times(1)
+                       // mock get instances of aws rds cluster is not exists
+                       mockAws.EXPECT().GetInstancesByFilters(gomock.Any(), 
gomock.Any()).Return(nil, nil).Times(1)
+
+                       _, err := reconciler.Reconcile(ctx, req)
+                       Expect(err).To(BeNil())
+                       Expect(fakeClient.Get(ctx, namespacedName, 
storageNode)).Should(Succeed())
+                       
Expect(storageNode.Status.Phase).To(Equal(v1alpha1.StorageNodePhaseDeleteComplete))
+               })
 
+               It("should be success when storage node is delete completed", 
func() {
+                       name := "test-aws-rds-cluster-delete-completed"
+                       namespacedName := types.NamespacedName{
+                               Name:      name,
+                               Namespace: defaultTestNamespace,
+                       }
+                       req := ctrl.Request{NamespacedName: namespacedName}
+                       deletionTimestamp := metav1.Now()
+                       storageNode := &v1alpha1.StorageNode{
+                               ObjectMeta: metav1.ObjectMeta{
+                                       Name:      name,
+                                       Namespace: defaultTestNamespace,
+                                       Annotations: map[string]string{
+                                               
v1alpha1.AnnotationsClusterIdentifier: "test-aws-rds-cluster",
+                                       },
+                                       Finalizers:        
[]string{FinalizerName},
+                                       DeletionTimestamp: &deletionTimestamp,
+                               },
+                               Spec: v1alpha1.StorageNodeSpec{
+                                       StorageProviderName: providerName,
+                               },
+                               Status: v1alpha1.StorageNodeStatus{
+                                       Phase: 
v1alpha1.StorageNodePhaseDeleteComplete,
+                               },
+                       }
+                       Expect(fakeClient.Create(ctx, 
storageNode)).Should(Succeed())
+
+                       _, err := reconciler.Reconcile(ctx, req)
+                       Expect(err).To(BeNil())
+                       err = fakeClient.Get(ctx, namespacedName, storageNode)
+                       Expect(apierrors.IsNotFound(err)).To(BeTrue())
+               })
+
+               It("should be success when storage node is ready for register", 
func() {
+                       name := "test-aws-rds-cluster-ready-for-register"
+                       namespacedName := types.NamespacedName{
+                               Name:      name,
+                               Namespace: defaultTestNamespace,
+                       }
+                       req := ctrl.Request{NamespacedName: namespacedName}
+                       storageNode := &v1alpha1.StorageNode{
+                               ObjectMeta: metav1.ObjectMeta{
+                                       Name:      name,
+                                       Namespace: defaultTestNamespace,
+                                       Annotations: map[string]string{
+                                               
v1alpha1.AnnotationsClusterIdentifier:   "test-aws-rds-cluster",
+                                               
AnnotationKeyRegisterStorageUnitEnabled: "true",
+                                               AnnotationKeyLogicDatabaseName: 
         "test-logic-db",
+                                               
v1alpha1.AnnotationsInstanceDBName:      "test-instance-db",
+                                               AnnotationKeyComputeNodeName:   
         "test-compute-node",
+                                       },
+                               },
+                               Spec: v1alpha1.StorageNodeSpec{
+                                       StorageProviderName: providerName,
+                               },
+                               Status: v1alpha1.StorageNodeStatus{
+                                       Phase: v1alpha1.StorageNodePhaseReady,
+                                       Cluster: v1alpha1.ClusterStatus{
+                                               Status:          
string(dbmesh_rds.DBClusterStatusAvailable),
+                                               PrimaryEndpoint: 
v1alpha1.Endpoint{Address: 
"test-aws-rds-cluster.cluster-xxxxxx.us-east-1.rds.amazonaws.com", Port: 3306},
+                                               ReaderEndpoints: 
[]v1alpha1.Endpoint{{Address: 
"test-aws-rds-cluster.cluster-ro-xxxxxx.us-east-1.rds.amazonaws.com", Port: 
3306}},
+                                       },
+                                       Instances: []v1alpha1.InstanceStatus{
+                                               {
+                                                       Status:   
string(dbmesh_rds.DBInstanceStatusAvailable),
+                                                       Endpoint: 
v1alpha1.Endpoint{Address: 
"test-aws-rds-cluster-1.cluster-xxxxxx.us-east-1.rds.amazonaws.com", Port: 
3306},
+                                               },
+                                       },
+                               },
+                       }
+                       cn := &v1alpha1.ComputeNode{
+                               ObjectMeta: metav1.ObjectMeta{
+                                       Name:      "test-compute-node",
+                                       Namespace: defaultTestNamespace,
+                               },
+                               Spec: v1alpha1.ComputeNodeSpec{
+                                       Bootstrap: v1alpha1.BootstrapConfig{
+                                               ServerConfig: 
v1alpha1.ServerConfig{
+                                                       Authority: 
v1alpha1.ComputeNodeAuthority{
+                                                               Users: 
[]v1alpha1.ComputeNodeUser{{User: "test-user", Password: "test-password"}},
+                                                       },
+                                                       Mode:  
v1alpha1.ComputeNodeServerMode{},
+                                                       Props: nil,
+                                               },
+                                       },
+                               },
+                       }
+                       svc := &corev1.Service{
+                               ObjectMeta: metav1.ObjectMeta{
+                                       Name:      "test-compute-node",
+                                       Namespace: defaultTestNamespace,
+                               },
+                               Spec: corev1.ServiceSpec{
+                                       Ports: []corev1.ServicePort{
+                                               {Name: "http", Protocol: "TCP", 
Port: 3307},
+                                       },
+                               },
+                       }
+                       Expect(fakeClient.Create(ctx, 
storageNode)).Should(Succeed())
+                       Expect(fakeClient.Create(ctx, cn)).Should(Succeed())
+                       Expect(fakeClient.Create(ctx, svc)).Should(Succeed())
+
+                       // mock aws rds cluster is available
+                       mockAws.EXPECT().GetRDSCluster(gomock.Any(), 
gomock.Any()).Return(&dbmesh_rds.DescCluster{
+                               DBClusterIdentifier: "test-aws-rds-cluster",
+                               PrimaryEndpoint:     
"test-aws-rds-cluster.cluster-xxxxxx.us-east-1.rds.amazonaws.com",
+                               Port:                int32(3306),
+                               Status:              
string(dbmesh_rds.DBClusterStatusAvailable),
+                       }, nil).Times(1)
+                       // mock get instances of aws aurora are available
+                       mockAws.EXPECT().GetInstancesByFilters(gomock.Any(), 
gomock.Any()).Return([]*dbmesh_rds.DescInstance{
+                               {
+                                       DBInstanceIdentifier: 
"test-aws-rds-cluster-instance-0",
+                                       DBInstanceStatus:     
dbmesh_rds.DBInstanceStatusAvailable,
+                                       Endpoint:             
dbmesh_rds.Endpoint{Address: 
"test-aws-rds-cluster-1.cluster-xxxxxx.us-east-1.rds.amazonaws.com", Port: 
3306},
+                               },
+                       }, nil).Times(1)
+
+                       host, port, username, password := 
getDatasourceInfoFromCluster(storageNode, provider)
+
+                       // mock shardingsphere
+                       
mockSS.EXPECT().CreateDatabase(gomock.Any()).Return(nil).Times(1)
+                       mockSS.EXPECT().RegisterStorageUnit("test-logic-db", 
getDSName(storageNode), host, uint(port), "test-instance-db", username, 
password).Return(nil).Times(1)
+                       mockSS.EXPECT().Close().Return(nil).Times(1)
+
+                       _, err := reconciler.Reconcile(ctx, req)
+                       Expect(err).To(BeNil())
+
+                       err = fakeClient.Get(ctx, namespacedName, storageNode)
+                       Expect(storageNode.Status.Registered).To(BeTrue())
+               })
+
+               It("should be success unregistered when storage node is 
deleting", func() {
+                       snName := "test-aws-rds-cluster-unregistered"
+                       namespacedName := types.NamespacedName{
+                               Name:      snName,
+                               Namespace: defaultTestNamespace,
+                       }
+                       req := ctrl.Request{NamespacedName: namespacedName}
+
+                       storageNode := &v1alpha1.StorageNode{
+                               ObjectMeta: metav1.ObjectMeta{
+                                       Name:      snName,
+                                       Namespace: defaultTestNamespace,
+                                       Annotations: map[string]string{
+                                               
v1alpha1.AnnotationsClusterIdentifier:   "test-aws-rds-cluster",
+                                               
AnnotationKeyRegisterStorageUnitEnabled: "true",
+                                               AnnotationKeyLogicDatabaseName: 
         "test-logic-db",
+                                               
v1alpha1.AnnotationsInstanceDBName:      "test-instance-db",
+                                               AnnotationKeyComputeNodeName:   
         "test-compute-node",
+                                       },
+                               },
+                               Spec: v1alpha1.StorageNodeSpec{
+                                       StorageProviderName: providerName,
+                                       Replicas:            2,
+                               },
+                               Status: v1alpha1.StorageNodeStatus{
+                                       Phase:      
v1alpha1.StorageNodePhaseReady,
+                                       Registered: true,
+                               },
+                       }
+                       cn := &v1alpha1.ComputeNode{
+                               ObjectMeta: metav1.ObjectMeta{
+                                       Name:      "test-compute-node",
+                                       Namespace: defaultTestNamespace,
+                               },
+                               Spec: v1alpha1.ComputeNodeSpec{
+                                       Bootstrap: v1alpha1.BootstrapConfig{
+                                               ServerConfig: 
v1alpha1.ServerConfig{
+                                                       Authority: 
v1alpha1.ComputeNodeAuthority{
+                                                               Users: 
[]v1alpha1.ComputeNodeUser{{User: "test-user", Password: "test-password"}},
+                                                       },
+                                               },
+                                       },
+                               },
+                       }
+                       svc := &corev1.Service{
+                               ObjectMeta: metav1.ObjectMeta{
+                                       Name:      "test-compute-node",
+                                       Namespace: defaultTestNamespace,
+                               },
+                               Spec: corev1.ServiceSpec{
+                                       Ports: []corev1.ServicePort{
+                                               {Name: "http", Protocol: "TCP", 
Port: 3307},
+                                       },
+                               },
+                       }
+                       Expect(fakeClient.Create(ctx, 
storageNode)).Should(Succeed())
+                       Expect(fakeClient.Create(ctx, cn)).Should(Succeed())
+                       Expect(fakeClient.Create(ctx, svc)).Should(Succeed())
+
+                       // mock aws rds cluster is available
+                       mockAws.EXPECT().GetRDSCluster(gomock.Any(), 
gomock.Any()).Return(&dbmesh_rds.DescCluster{
+                               DBClusterIdentifier: "test-aws-rds-cluster",
+                               PrimaryEndpoint:     
"test-aws-rds-cluster.cluster-xxxxxx.us-east-1.rds.amazonaws.com",
+                               Port:                int32(3306),
+                               Status:              
string(dbmesh_rds.DBClusterStatusAvailable),
+                               DBClusterMembers: []dbmesh_rds.ClusterMember{
+                                       {DBInstanceIdentifier: 
"test-aws-rds-cluster-instance-0"},
+                                       {DBInstanceIdentifier: 
"test-aws-rds-cluster-instance-1"},
+                               },
+                       }, nil).Times(2)
+                       // mock get instances of aws aurora are available
+                       mockAws.EXPECT().GetInstancesByFilters(gomock.Any(), 
gomock.Any()).Return([]*dbmesh_rds.DescInstance{
+                               {
+                                       DBInstanceIdentifier: 
"test-aws-rds-cluster-instance-0",
+                                       DBInstanceStatus:     
dbmesh_rds.DBInstanceStatusAvailable,
+                                       Endpoint:             
dbmesh_rds.Endpoint{Address: 
"test-aws-rds-cluster-1.cluster-xxxxxx.us-east-1.rds.amazonaws.com", Port: 
3306},
+                               },
+                               {
+                                       DBInstanceIdentifier: 
"test-aws-rds-cluster-instance-1",
+                                       DBInstanceStatus:     
dbmesh_rds.DBInstanceStatusAvailable,
+                                       Endpoint:             
dbmesh_rds.Endpoint{Address: 
"test-aws-rds-cluster-2.cluster-xxxxxx.us-east-1.rds.amazonaws.com", Port: 
3306},
+                               },
+                       }, nil).Times(2)
+
+                       _, err := reconciler.Reconcile(ctx, req)
+                       Expect(err).To(BeNil())
+
+                       Expect(fakeClient.Delete(ctx, 
storageNode)).Should(Succeed())
+                       // mock shardingsphere
+                       mockSS.EXPECT().UnRegisterStorageUnit("test-logic-db", 
getDSName(storageNode)).Return(nil).Times(1)
+                       mockSS.EXPECT().Close().Return(nil).Times(1)
+
+                       // mock delete aws rds cluster
+                       mockAws.EXPECT().DeleteRDSCluster(gomock.Any(), 
gomock.Any(), gomock.Any()).Return(nil).Times(1)
+
+                       _, err = reconciler.Reconcile(ctx, req)
+                       Expect(err).To(BeNil())
+
+                       err = fakeClient.Get(ctx, namespacedName, storageNode)
+                       Expect(storageNode.Status.Registered).To(BeFalse())
+               })
+       })
 })
diff --git a/shardingsphere-operator/pkg/controllers/storage_node_controller.go 
b/shardingsphere-operator/pkg/controllers/storage_node_controller.go
index 178042c..39aa7d1 100644
--- a/shardingsphere-operator/pkg/controllers/storage_node_controller.go
+++ b/shardingsphere-operator/pkg/controllers/storage_node_controller.go
@@ -164,6 +164,11 @@ func (r *StorageNodeReconciler) reconcile(ctx 
context.Context, storageProvider *
                        r.Recorder.Eventf(node, corev1.EventTypeWarning, 
"Reconcile Failed", fmt.Sprintf("unable to reconcile AWS RDS Instance %s/%s, 
err:%s", node.GetNamespace(), node.GetName(), err.Error()))
                        return ctrl.Result{RequeueAfter: defaultRequeueTime}, 
err
                }
+       case v1alpha1.ProvisionerAWSRDSCluster:
+               if err := r.reconcileAwsRDSCluster(ctx, 
aws.NewRdsClient(r.AwsRDS), node, storageProvider); err != nil {
+                       r.Recorder.Eventf(node, corev1.EventTypeWarning, 
"Reconcile Failed", fmt.Sprintf("unable to reconcile AWS RDS Cluster %s/%s, 
err:%s", node.GetNamespace(), node.GetName(), err.Error()))
+                       return ctrl.Result{RequeueAfter: defaultRequeueTime}, 
err
+               }
        case v1alpha1.ProvisionerAWSAurora:
                if err := r.reconcileAwsAurora(ctx, aws.NewRdsClient(r.AwsRDS), 
node, storageProvider); err != nil {
                        r.Recorder.Eventf(node, corev1.EventTypeWarning, 
"Reconcile Failed", fmt.Sprintf("unable to reconcile AWS Aurora %s/%s, err:%s", 
node.GetNamespace(), node.GetName(), err.Error()))
@@ -215,7 +220,9 @@ func (r *StorageNodeReconciler) getStorageProvider(ctx 
context.Context, node *v1
 
        // check provisioner
        // aws-like provisioner need aws rds client
-       if storageProvider.Spec.Provisioner == 
v1alpha1.ProvisionerAWSRDSInstance || storageProvider.Spec.Provisioner == 
v1alpha1.ProvisionerAWSAurora {
+       if storageProvider.Spec.Provisioner == 
v1alpha1.ProvisionerAWSRDSInstance ||
+               storageProvider.Spec.Provisioner == 
v1alpha1.ProvisionerAWSAurora ||
+               storageProvider.Spec.Provisioner == 
v1alpha1.ProvisionerAWSRDSCluster {
                if r.AwsRDS == nil {
                        r.Recorder.Event(node, corev1.EventTypeWarning, 
"AwsRdsClientIsNil", "aws rds client is nil, please check your aws credentials")
                        return nil, fmt.Errorf("aws rds client is nil, please 
check your aws credentials")
@@ -238,7 +245,7 @@ func computeDesiredState(status v1alpha1.StorageNodeStatus) 
v1alpha1.StorageNode
                }
        } else {
                // If the storage node is not being deleted, check if all 
instances are ready.
-               if (clusterStatus == "" || clusterStatus == 
rds.DBClusterStatusAvailable) && allInstancesReady(status.Instances) {
+               if (clusterStatus == "" || clusterStatus == 
string(rds.DBClusterStatusAvailable)) && allInstancesReady(status.Instances) {
                        desiredState.Phase = v1alpha1.StorageNodePhaseReady
                } else {
                        desiredState.Phase = v1alpha1.StorageNodePhaseNotReady
@@ -256,7 +263,7 @@ func computeNewConditions(desiredState, status 
v1alpha1.StorageNodeStatus, clust
 
        // Update the cluster ready condition if the cluster status is not empty
        if clusterStatus != "" {
-               if clusterStatus == rds.DBClusterStatusAvailable {
+               if clusterStatus == string(rds.DBClusterStatusAvailable) {
                        
newSNConditions.UpsertCondition(&v1alpha1.StorageNodeCondition{
                                Type:           
v1alpha1.StorageNodeConditionTypeClusterReady,
                                Status:         corev1.ConditionTrue,
@@ -372,6 +379,34 @@ func updateAWSRDSInstanceStatus(node 
*v1alpha1.StorageNode, instance *rds.DescIn
        return nil
 }
 
+func (r *StorageNodeReconciler) reconcileAwsRDSCluster(ctx context.Context, 
client aws.IRdsClient, node *v1alpha1.StorageNode, storageProvider 
*v1alpha1.StorageProvider) error {
+       ac, err := client.GetRDSCluster(ctx, node)
+       if err != nil {
+               return err
+       }
+
+       if ac == nil {
+               // create instance
+               err = client.CreateRDSCluster(ctx, node, 
storageProvider.Spec.Parameters)
+               if err != nil {
+                       return err
+               }
+               ac, err = client.GetRDSCluster(ctx, node)
+               if err != nil {
+                       return err
+               }
+       }
+
+       // TODO: reconcile instance of aurora
+
+       // update storage node status
+       if err := updateClusterStatus(ctx, client, node, ac); err != nil {
+               return fmt.Errorf("updateClusterStatus failed: %w", err)
+       }
+
+       return nil
+}
+
 func (r *StorageNodeReconciler) reconcileAwsAurora(ctx context.Context, client 
aws.IRdsClient, node *v1alpha1.StorageNode, storageProvider 
*v1alpha1.StorageProvider) error {
        r.Log.Info("reconcileAwsAurora", "node", node.GetName(), "phase", 
node.Status.Phase)
        ac, err := client.GetAuroraCluster(ctx, node)
@@ -451,6 +486,10 @@ func (r *StorageNodeReconciler) deleteDatabaseCluster(ctx 
context.Context, node
                if err := r.deleteAWSRDSInstance(ctx, 
aws.NewRdsClient(r.AwsRDS), node, storageProvider); err != nil {
                        return fmt.Errorf("delete aws rds instance failed: %w", 
err)
                }
+       case v1alpha1.ProvisionerAWSRDSCluster:
+               if err := r.deleteAWSRDSCluster(ctx, 
aws.NewRdsClient(r.AwsRDS), node, storageProvider); err != nil {
+                       return fmt.Errorf("delete aws rds cluster failed: %w", 
err)
+               }
        case v1alpha1.ProvisionerAWSAurora:
                if err := r.deleteAWSAurora(ctx, aws.NewRdsClient(r.AwsRDS), 
node, storageProvider); err != nil {
                        return fmt.Errorf("delete aws aurora cluster failed: 
%w", err)
@@ -487,6 +526,32 @@ func (r *StorageNodeReconciler) deleteAWSRDSInstance(ctx 
context.Context, client
        return nil
 }
 
+// nolint:dupl
+func (r *StorageNodeReconciler) deleteAWSRDSCluster(ctx context.Context, 
client aws.IRdsClient, node *v1alpha1.StorageNode, storageProvider 
*v1alpha1.StorageProvider) error {
+       if node.Annotations[v1alpha1.AnnotationsClusterIdentifier] == "" {
+               return nil
+       }
+
+       cluster, err := client.GetRDSCluster(ctx, node)
+       if err != nil {
+               return fmt.Errorf("get rds cluster failed: %w", err)
+       }
+       if cluster != nil && cluster.Status != 
string(rds.DBClusterStatusDeleting) {
+               if err := client.DeleteRDSCluster(ctx, node, storageProvider); 
err != nil {
+                       r.Recorder.Eventf(node, corev1.EventTypeWarning, 
"DeleteFailed", "Failed to delete rds cluster %s: %s", 
node.Annotations[v1alpha1.AnnotationsClusterIdentifier], err.Error())
+                       return err
+               }
+               r.Recorder.Event(node, corev1.EventTypeNormal, "Deleting", 
fmt.Sprintf("rds cluster %s is deleting", 
node.Annotations[v1alpha1.AnnotationsClusterIdentifier]))
+       }
+
+       // update storage node status
+       if err := updateClusterStatus(ctx, client, node, cluster); err != nil {
+               return fmt.Errorf("updateClusterStatus failed: %w", err)
+       }
+       return nil
+}
+
+// nolint:dupl
 func (r *StorageNodeReconciler) deleteAWSAurora(ctx context.Context, client 
aws.IRdsClient, node *v1alpha1.StorageNode, storageProvider 
*v1alpha1.StorageProvider) error {
        if node.Annotations[v1alpha1.AnnotationsClusterIdentifier] == "" {
                return nil
@@ -496,7 +561,7 @@ func (r *StorageNodeReconciler) deleteAWSAurora(ctx 
context.Context, client aws.
        if err != nil {
                return fmt.Errorf("get aurora cluster failed: %w", err)
        }
-       if auroraCluster != nil && auroraCluster.Status != 
rds.DBClusterStatusDeleting {
+       if auroraCluster != nil && auroraCluster.Status != 
string(rds.DBClusterStatusDeleting) {
                if err := client.DeleteAuroraCluster(ctx, node, 
storageProvider); err != nil {
                        r.Recorder.Eventf(node, corev1.EventTypeWarning, 
"DeleteFailed", "Failed to delete aurora cluster %s: %s", 
node.Annotations[v1alpha1.AnnotationsClusterIdentifier], err.Error())
                        return err
diff --git a/shardingsphere-operator/pkg/reconcile/storagenode/aws/aurora.go 
b/shardingsphere-operator/pkg/reconcile/storagenode/aws/aurora.go
index 90df91e..42e3cbe 100644
--- a/shardingsphere-operator/pkg/reconcile/storagenode/aws/aurora.go
+++ b/shardingsphere-operator/pkg/reconcile/storagenode/aws/aurora.go
@@ -120,8 +120,8 @@ func (c *RdsClient) DeleteAuroraCluster(ctx 
context.Context, node *v1alpha1.Stor
        case v1alpha1.StorageReclaimPolicyRetain:
                
aurora.SetDeleteAutomateBackups(false).SetSkipFinalSnapshot(true)
        case v1alpha1.StorageReclaimPolicyDeleteWithFinalSnapshot:
+               // TODO set final snapshot name
                
aurora.SetDeleteAutomateBackups(true).SetSkipFinalSnapshot(false)
        }
-
        return aurora.Delete(ctx)
 }
diff --git a/shardingsphere-operator/pkg/reconcile/storagenode/aws/aws.go 
b/shardingsphere-operator/pkg/reconcile/storagenode/aws/aws.go
index cf90112..0fb62d4 100644
--- a/shardingsphere-operator/pkg/reconcile/storagenode/aws/aws.go
+++ b/shardingsphere-operator/pkg/reconcile/storagenode/aws/aws.go
@@ -39,6 +39,10 @@ type IRdsClient interface {
        GetInstancesByFilters(ctx context.Context, filters map[string][]string) 
(instances []*rds.DescInstance, err error)
        DeleteInstance(ctx context.Context, node *v1alpha1.StorageNode, 
storageProvider *v1alpha1.StorageProvider) error
 
+       CreateRDSCluster(ctx context.Context, node *v1alpha1.StorageNode, 
params map[string]string) error
+       GetRDSCluster(ctx context.Context, node *v1alpha1.StorageNode) (cluster 
*rds.DescCluster, err error)
+       DeleteRDSCluster(ctx context.Context, node *v1alpha1.StorageNode, 
storageProvider *v1alpha1.StorageProvider) error
+
        CreateAuroraCluster(ctx context.Context, node *v1alpha1.StorageNode, 
params map[string]string) error
        GetAuroraCluster(ctx context.Context, node *v1alpha1.StorageNode) 
(cluster *rds.DescCluster, err error)
        DeleteAuroraCluster(ctx context.Context, node *v1alpha1.StorageNode, 
storageProvider *v1alpha1.StorageProvider) error
diff --git a/shardingsphere-operator/pkg/reconcile/storagenode/aws/mocks/aws.go 
b/shardingsphere-operator/pkg/reconcile/storagenode/aws/mocks/aws.go
index 060125d..30fc045 100644
--- a/shardingsphere-operator/pkg/reconcile/storagenode/aws/mocks/aws.go
+++ b/shardingsphere-operator/pkg/reconcile/storagenode/aws/mocks/aws.go
@@ -92,6 +92,20 @@ func (mr *MockIRdsClientMockRecorder) CreateInstance(ctx, 
node, params interface
        return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateInstance", 
reflect.TypeOf((*MockIRdsClient)(nil).CreateInstance), ctx, node, params)
 }
 
+// CreateRDSCluster mocks base method.
+func (m *MockIRdsClient) CreateRDSCluster(ctx context.Context, node 
*v1alpha1.StorageNode, params map[string]string) error {
+       m.ctrl.T.Helper()
+       ret := m.ctrl.Call(m, "CreateRDSCluster", ctx, node, params)
+       ret0, _ := ret[0].(error)
+       return ret0
+}
+
+// CreateRDSCluster indicates an expected call of CreateRDSCluster.
+func (mr *MockIRdsClientMockRecorder) CreateRDSCluster(ctx, node, params 
interface{}) *gomock.Call {
+       mr.mock.ctrl.T.Helper()
+       return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, 
"CreateRDSCluster", reflect.TypeOf((*MockIRdsClient)(nil).CreateRDSCluster), 
ctx, node, params)
+}
+
 // DeleteAuroraCluster mocks base method.
 func (m *MockIRdsClient) DeleteAuroraCluster(ctx context.Context, node 
*v1alpha1.StorageNode, storageProvider *v1alpha1.StorageProvider) error {
        m.ctrl.T.Helper()
@@ -120,6 +134,20 @@ func (mr *MockIRdsClientMockRecorder) DeleteInstance(ctx, 
node, storageProvider
        return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteInstance", 
reflect.TypeOf((*MockIRdsClient)(nil).DeleteInstance), ctx, node, 
storageProvider)
 }
 
+// DeleteRDSCluster mocks base method.
+func (m *MockIRdsClient) DeleteRDSCluster(ctx context.Context, node 
*v1alpha1.StorageNode, storageProvider *v1alpha1.StorageProvider) error {
+       m.ctrl.T.Helper()
+       ret := m.ctrl.Call(m, "DeleteRDSCluster", ctx, node, storageProvider)
+       ret0, _ := ret[0].(error)
+       return ret0
+}
+
+// DeleteRDSCluster indicates an expected call of DeleteRDSCluster.
+func (mr *MockIRdsClientMockRecorder) DeleteRDSCluster(ctx, node, 
storageProvider interface{}) *gomock.Call {
+       mr.mock.ctrl.T.Helper()
+       return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, 
"DeleteRDSCluster", reflect.TypeOf((*MockIRdsClient)(nil).DeleteRDSCluster), 
ctx, node, storageProvider)
+}
+
 // GetAuroraCluster mocks base method.
 func (m *MockIRdsClient) GetAuroraCluster(ctx context.Context, node 
*v1alpha1.StorageNode) (*rds.DescCluster, error) {
        m.ctrl.T.Helper()
@@ -180,6 +208,21 @@ func (mr *MockIRdsClientMockRecorder) 
GetInstancesByFilters(ctx, filters interfa
        return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, 
"GetInstancesByFilters", 
reflect.TypeOf((*MockIRdsClient)(nil).GetInstancesByFilters), ctx, filters)
 }
 
+// GetRDSCluster mocks base method.
+func (m *MockIRdsClient) GetRDSCluster(ctx context.Context, node 
*v1alpha1.StorageNode) (*rds.DescCluster, error) {
+       m.ctrl.T.Helper()
+       ret := m.ctrl.Call(m, "GetRDSCluster", ctx, node)
+       ret0, _ := ret[0].(*rds.DescCluster)
+       ret1, _ := ret[1].(error)
+       return ret0, ret1
+}
+
+// GetRDSCluster indicates an expected call of GetRDSCluster.
+func (mr *MockIRdsClientMockRecorder) GetRDSCluster(ctx, node interface{}) 
*gomock.Call {
+       mr.mock.ctrl.T.Helper()
+       return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetRDSCluster", 
reflect.TypeOf((*MockIRdsClient)(nil).GetRDSCluster), ctx, node)
+}
+
 // Instance mocks base method.
 func (m *MockIRdsClient) Instance() rds.Instance {
        m.ctrl.T.Helper()
diff --git 
a/shardingsphere-operator/pkg/reconcile/storagenode/aws/rdscluster.go 
b/shardingsphere-operator/pkg/reconcile/storagenode/aws/rdscluster.go
new file mode 100644
index 0000000..ae278ff
--- /dev/null
+++ b/shardingsphere-operator/pkg/reconcile/storagenode/aws/rdscluster.go
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package aws
+
+import (
+       "context"
+       "errors"
+       "fmt"
+       "strconv"
+       "strings"
+
+       
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/api/v1alpha1"
+       "github.com/database-mesh/golang-sdk/aws/client/rds"
+)
+
+func validateCreateRDSClusterParams(node *v1alpha1.StorageNode, paramsPtr 
*map[string]string) error {
+       requiredParams := map[string]string{
+               "instanceClass":      "instance class is empty",
+               "engine":             "engine is empty",
+               "engineVersion":      "engine version is empty",
+               "clusterIdentifier":  "cluster identifier is empty",
+               "masterUsername":     "master username is empty",
+               "masterUserPassword": "master user password is empty",
+               "allocatedStorage":   "allocated storage is empty",
+               "iops":               "iops is empty",
+               "storageType":        "storage type is empty",
+       }
+
+       params := *paramsPtr
+       if v, ok := node.Annotations[v1alpha1.AnnotationsClusterIdentifier]; 
!ok || v == "" {
+               return errors.New("cluster identifier is empty")
+       } else {
+               params["clusterIdentifier"] = v
+       }
+
+       if len(params["clusterIdentifier"]) > 50 {
+               return errors.New("cluster identifier is too long, max length 
is 50")
+       }
+
+       for k, v := range requiredParams {
+               if val, ok := params[k]; !ok || val == "" {
+                       return fmt.Errorf(v)
+               }
+       }
+
+       // valid mysql engine version
+       if params["engine"] == "mysql" {
+               version := strings.Split(params["engineVersion"], ".")[0]
+               if version != "8" {
+                       return fmt.Errorf("mysql engine version is not 
supported, only support 8.x")
+               }
+       }
+
+       if params["storageType"] != "io1" {
+               return fmt.Errorf("storage type is not supported, only support 
io1")
+       }
+
+       return nil
+}
+
+func getAllocatedStorage(allocatedStorageStr string) (allocatedStorage int, 
err error) {
+       allocatedStorage, err = strconv.Atoi(allocatedStorageStr)
+       if err != nil {
+               return 0, fmt.Errorf("allocated storage is not a number, %v", 
err)
+       }
+       if allocatedStorage < 100 || allocatedStorage > 65536 {
+               return 0, fmt.Errorf("allocated storage is out of range, min is 
100, max is 65536")
+       }
+       return allocatedStorage, nil
+}
+
+func getIOPS(iopsStr string) (iops int, err error) {
+       iops, err = strconv.Atoi(iopsStr)
+       if err != nil {
+               return 0, fmt.Errorf("iops is not a number, %v", err)
+       }
+       if iops < 1000 || iops > 256000 {
+               return 0, fmt.Errorf("iops is out of range, min is 1000, max is 
256000")
+       }
+       return iops, nil
+}
+
+// CreateRDSCluster creates rds cluster
+// ref: 
https://docs.aws.amazon.com/zh_cn/AmazonRDS/latest/APIReference/API_CreateDBInstance.html
+func (c *RdsClient) CreateRDSCluster(ctx context.Context, node 
*v1alpha1.StorageNode, params map[string]string) error {
+       if err := validateCreateRDSClusterParams(node, &params); err != nil {
+               return err
+       }
+
+       cc := c.Cluster()
+
+       allocatedStorage, err := getAllocatedStorage(params["allocatedStorage"])
+       if err != nil {
+               return err
+       }
+
+       iops, err := getIOPS(params["iops"])
+       if err != nil {
+               return err
+       }
+
+       if iops > allocatedStorage*50 || iops*2 < allocatedStorage {
+               return fmt.Errorf("the IOPS to GiB ratio must be between 0.5 
and 50, current iops is %d, allocated storage is %d", iops, allocatedStorage)
+       }
+
+       cc.SetEngine(params["engine"]).
+               SetEngineVersion(params["engineVersion"]).
+               SetDBClusterIdentifier(params["clusterIdentifier"]).
+               SetMasterUsername(params["masterUsername"]).
+               SetMasterUserPassword(params["masterUserPassword"]).
+               SetAllocatedStorage(int32(allocatedStorage)).
+               SetDBClusterInstanceClass(params["instanceClass"]).
+               SetIOPS(int32(iops)).
+               SetStorageType(params["storageType"])
+
+       if v, ok := node.Annotations[v1alpha1.AnnotationsInstanceDBName]; ok && 
v != "" {
+               cc.SetDatabaseName(v)
+       }
+
+       if v, ok := params["publicAccessible"]; ok && v == "false" {
+               cc.SetPublicAccessible(false)
+       } else {
+               cc.SetPublicAccessible(true)
+       }
+
+       if params["vpcSecurityGroupIds"] != "" {
+               
cc.SetVpcSecurityGroupIds(strings.Split(params["vpcSecurityGroupIds"], ","))
+       }
+
+       if err := cc.Create(ctx); err != nil {
+               return fmt.Errorf("create rds cluster failed, %v", err)
+       }
+
+       return nil
+}
+
+func (c *RdsClient) GetRDSCluster(ctx context.Context, node 
*v1alpha1.StorageNode) (cluster *rds.DescCluster, err error) {
+       identifier, ok := 
node.Annotations[v1alpha1.AnnotationsClusterIdentifier]
+       if !ok {
+               return nil, errors.New("cluster identifier is empty")
+       }
+
+       cc := c.Cluster()
+       cc.SetDBClusterIdentifier(identifier)
+       return cc.Describe(ctx)
+}
+
+func (c *RdsClient) DeleteRDSCluster(ctx context.Context, node 
*v1alpha1.StorageNode, storageProvider *v1alpha1.StorageProvider) error {
+       identifier, ok := 
node.Annotations[v1alpha1.AnnotationsClusterIdentifier]
+       if !ok {
+               return fmt.Errorf("cluster identifier is empty")
+       }
+
+       cc := c.Cluster()
+       cc.SetDBClusterIdentifier(identifier)
+
+       cluster, err := cc.Describe(ctx)
+       if err != nil {
+               return fmt.Errorf("describe rds cluster failed, %v", err)
+       }
+       if cluster == nil || cluster.Status == 
string(rds.DBClusterStatusDeleting) {
+               return nil
+       }
+
+       switch storageProvider.Spec.ReclaimPolicy {
+       case v1alpha1.StorageReclaimPolicyDelete:
+               cc.SetSkipFinalSnapshot(true)
+       case v1alpha1.StorageReclaimPolicyDeleteWithFinalSnapshot:
+               if v, ok := 
node.Annotations[v1alpha1.AnnotationsFinalSnapshotIdentifier]; !ok || v == "" {
+                       return fmt.Errorf("final snapshot identifier is empty")
+               }
+               if cluster.Status != string(rds.DBClusterStatusAvailable) {
+                       return fmt.Errorf("rds cluster is not available, can 
not delete with final snapshot")
+               }
+               
cc.SetFinalDBSnapshotIdentifier(node.Annotations[v1alpha1.AnnotationsFinalSnapshotIdentifier])
+               cc.SetSkipFinalSnapshot(false)
+       case v1alpha1.StorageReclaimPolicyRetain:
+               return fmt.Errorf("rds cluster does not support retain policy")
+       }
+
+       return cc.Delete(ctx)
+}
diff --git a/shardingsphere-operator/test/e2e/storage_node_controller_test.go 
b/shardingsphere-operator/test/e2e/storage_node_controller_test.go
index f6744b7..47508ff 100644
--- a/shardingsphere-operator/test/e2e/storage_node_controller_test.go
+++ b/shardingsphere-operator/test/e2e/storage_node_controller_test.go
@@ -324,7 +324,7 @@ var _ = Describe("StorageNode Controller Suite Test For AWS 
Aurora Cluster", fun
                        
monkey.PatchInstanceMethod(reflect.TypeOf(&aws.RdsClient{}), 
"GetAuroraCluster", func(_ *aws.RdsClient, _ context.Context, _ 
*v1alpha1.StorageNode) (*dbmesh_rds.DescCluster, error) {
                                return &dbmesh_rds.DescCluster{
                                        DBClusterIdentifier: clusterIdentifier,
-                                       Status:              
dbmesh_rds.DBClusterStatusCreating,
+                                       Status:              
string(dbmesh_rds.DBClusterStatusCreating),
                                        PrimaryEndpoint:     
"test-primary-endpoint",
                                        ReaderEndpoint:      
"test-reader-endpoint",
                                        Port:                3306,
@@ -359,7 +359,7 @@ var _ = Describe("StorageNode Controller Suite Test For AWS 
Aurora Cluster", fun
                                newSN := &v1alpha1.StorageNode{}
                                Expect(k8sClient.Get(ctx, 
client.ObjectKey{Name: snName, Namespace: "default"}, newSN)).Should(Succeed())
                                return newSN.Status.Cluster.Status
-                       }, time.Second*10, 
time.Millisecond*250).Should(Equal(dbmesh_rds.DBClusterStatusCreating))
+                       }, time.Second*10, 
time.Millisecond*250).Should(Equal(string(dbmesh_rds.DBClusterStatusCreating)))
                })
 
                It("should success when cluster is available", func() {
@@ -367,7 +367,7 @@ var _ = Describe("StorageNode Controller Suite Test For AWS 
Aurora Cluster", fun
                        
monkey.PatchInstanceMethod(reflect.TypeOf(&aws.RdsClient{}), 
"GetAuroraCluster", func(_ *aws.RdsClient, _ context.Context, _ 
*v1alpha1.StorageNode) (*dbmesh_rds.DescCluster, error) {
                                return &dbmesh_rds.DescCluster{
                                        DBClusterIdentifier: clusterIdentifier,
-                                       Status:              
dbmesh_rds.DBClusterStatusAvailable,
+                                       Status:              
string(dbmesh_rds.DBClusterStatusAvailable),
                                        PrimaryEndpoint:     
"test-primary-endpoint",
                                        ReaderEndpoint:      
"test-reader-endpoint",
                                        Port:                3306,

Reply via email to