This is an automated email from the ASF dual-hosted git repository.

miaoliyao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/shardingsphere-on-cloud.git


The following commit(s) were added to refs/heads/main by this push:
     new cff9a61  feat(storage-node): support aws aurora automate create and delete
     new 6d54ea4  Merge pull request #389 from Xu-Wentao/sn-aws-aurora
cff9a61 is described below

commit cff9a610a727c0e8fad2facf958a8ccb2f2d6f2c
Author: xuwentao <[email protected]>
AuthorDate: Fri Jun 2 19:08:15 2023 +0800

    feat(storage-node): support aws aurora automate create and delete
---
 shardingsphere-operator/go.mod                     |   2 +-
 shardingsphere-operator/go.sum                     |   6 +-
 .../controllers/storage_ndoe_controller_test.go    | 244 ++++++++++++++++++++-
 .../pkg/controllers/storage_node_controller.go     | 140 +++++++-----
 .../kubernetes/cloudnative-pg/cloudnative-pg.go    |   2 +-
 .../pkg/reconcile/storagenode/aws/aurora.go        |  43 +++-
 .../pkg/reconcile/storagenode/aws/aws.go           |   1 +
 .../pkg/reconcile/storagenode/aws/mocks/aws.go     |  15 ++
 .../pkg/reconcile/storagenode/aws/rdsinstance.go   |   8 +
 9 files changed, 397 insertions(+), 64 deletions(-)
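For readers skimming the patch: the new tests drive the AWS Aurora path through a StorageNode that points at an "aws-aurora" StorageProvider and carries the Aurora cluster identifier as an annotation. The sketch below is illustrative only -- the names and values are invented; the types and the annotation key are taken from the test code further down in this diff:

package example

import (
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

        "github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/api/v1alpha1"
)

// exampleNode mirrors the objects the new tests build: the aws-aurora
// provider is selected via Spec.StorageProviderName, and the Aurora
// cluster identifier travels in the AnnotationsClusterIdentifier annotation.
var exampleNode = v1alpha1.StorageNode{
        ObjectMeta: metav1.ObjectMeta{
                Name:      "my-aurora", // hypothetical name
                Namespace: "default",   // hypothetical namespace
                Annotations: map[string]string{
                        v1alpha1.AnnotationsClusterIdentifier: "my-aurora-cluster",
                },
        },
        Spec: v1alpha1.StorageNodeSpec{
                StorageProviderName: "aws-aurora",
        },
}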

diff --git a/shardingsphere-operator/go.mod b/shardingsphere-operator/go.mod
index 8ff451a..60807e4 100644
--- a/shardingsphere-operator/go.mod
+++ b/shardingsphere-operator/go.mod
@@ -8,7 +8,7 @@ require (
        github.com/antlr/antlr4 v0.0.0-20181218183524-be58ebffde8e
        github.com/chaos-mesh/chaos-mesh/api v0.0.0-20230517110555-afab5b4a7813
        github.com/cloudnative-pg/cloudnative-pg v1.20.0
-       github.com/database-mesh/golang-sdk v0.0.0-20230517034007-f86740cbb78b
+       github.com/database-mesh/golang-sdk v0.0.0-20230605093335-916ac7abc788
        github.com/go-logr/logr v1.2.4
        github.com/go-sql-driver/mysql v1.7.1
        github.com/golang/mock v1.6.0
diff --git a/shardingsphere-operator/go.sum b/shardingsphere-operator/go.sum
index f6df1af..e03b58d 100644
--- a/shardingsphere-operator/go.sum
+++ b/shardingsphere-operator/go.sum
@@ -64,8 +64,10 @@ github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGX
 github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
 github.com/cncf/xds/go v0.0.0-20210312221358-fbca930ec8ed/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
 github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
-github.com/database-mesh/golang-sdk v0.0.0-20230517034007-f86740cbb78b h1:qLK6dB1952pOD2sBNiOBktY9IDmX7Gn/WOG3tEvOw3g=
-github.com/database-mesh/golang-sdk v0.0.0-20230517034007-f86740cbb78b/go.mod h1:yUEdo+aGdROl9oC7A1GeDB9/ubUtV2k73uLL+qC3PC4=
+github.com/database-mesh/golang-sdk v0.0.0-20230605075457-a525bc484e78 h1:442d1dVUQFHuT+KcSW0XtsZpYYwwCBlyJGDujb44vfM=
+github.com/database-mesh/golang-sdk v0.0.0-20230605075457-a525bc484e78/go.mod h1:yUEdo+aGdROl9oC7A1GeDB9/ubUtV2k73uLL+qC3PC4=
+github.com/database-mesh/golang-sdk v0.0.0-20230605093335-916ac7abc788 h1:YEF8BDXHnEiek/EnDVbTCOrVDP7OT3v/R3a8mGM6+vc=
+github.com/database-mesh/golang-sdk v0.0.0-20230605093335-916ac7abc788/go.mod h1:yUEdo+aGdROl9oC7A1GeDB9/ubUtV2k73uLL+qC3PC4=
 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
diff --git a/shardingsphere-operator/pkg/controllers/storage_ndoe_controller_test.go b/shardingsphere-operator/pkg/controllers/storage_ndoe_controller_test.go
index f1c0715..cff4499 100644
--- a/shardingsphere-operator/pkg/controllers/storage_ndoe_controller_test.go
+++ b/shardingsphere-operator/pkg/controllers/storage_ndoe_controller_test.go
@@ -38,6 +38,7 @@ import (
        apierrors "k8s.io/apimachinery/pkg/api/errors"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/apimachinery/pkg/runtime"
+       "k8s.io/apimachinery/pkg/types"
        "k8s.io/client-go/tools/record"
        ctrl "sigs.k8s.io/controller-runtime"
        "sigs.k8s.io/controller-runtime/pkg/client"
@@ -84,7 +85,7 @@ var _ = BeforeEach(func() {
        fakeStorageNodeReconciler()
 })
 
-var _ = Describe("StorageNode Controller Mock Test", func() {
+var _ = Describe("StorageNode Controller Mock Test For AWS Rds Instance", 
func() {
        BeforeEach(func() {
                // mock aws rds client
                mockCtrl = gomock.NewController(GinkgoT())
@@ -238,7 +239,7 @@ var _ = Describe("StorageNode Controller Mock Test", func() {
 
                        Expect(newSN.Status.Phase).To(Equal(v1alpha1.StorageNodePhaseReady))
                        Expect(newSN.Status.Instances).To(HaveLen(1))
-                       Expect(newSN.Status.Instances[0].Status).To(Equal(string(dbmesh_rds.DBInstanceStatusReady)))
+                       Expect(newSN.Status.Instances[0].Status).To(Equal(string(dbmesh_rds.DBInstanceStatusAvailable)))
                })
        })
 
@@ -735,3 +736,242 @@ var _ = Describe("StorageNode Controller Mock Test", func() {
                })
        })
 })
+
+var _ = Describe("StorageNode Controller Mock Test For AWS Aurora", func() {
+       BeforeEach(func() {
+               provider := v1alpha1.StorageProvider{
+                       ObjectMeta: metav1.ObjectMeta{
+                               Name: "aws-aurora",
+                       },
+                       Spec: v1alpha1.StorageProviderSpec{
+                               Provisioner: v1alpha1.ProvisionerAWSAurora,
+                               Parameters:  map[string]string{},
+                       },
+               }
+               Expect(fakeClient.Create(ctx, &provider)).Should(Succeed())
+
+               // mock aws client
+               // mock aws rds client
+               mockCtrl = gomock.NewController(GinkgoT())
+               mockAws = mock_aws.NewMockIRdsClient(mockCtrl)
+
+               monkey.Patch(aws.NewRdsClient, func(rds dbmesh_rds.RDS) aws.IRdsClient {
+                       return mockAws
+               })
+       })
+
+       AfterEach(func() {
+               mockCtrl.Finish()
+               monkey.UnpatchAll()
+       })
+
+       Context("reconcile storage node", func() {
+               It("should success when aws aurora cluster is not exits", func() {
+                       name := "test-aws-aurora-not-exists"
+                       namespacedName := types.NamespacedName{
+                               Name:      name,
+                               Namespace: defaultTestNamespace,
+                       }
+                       storageNode := v1alpha1.StorageNode{
+                               ObjectMeta: metav1.ObjectMeta{
+                                       Name:      name,
+                                       Namespace: defaultTestNamespace,
+                                       Annotations: map[string]string{
+                                               v1alpha1.AnnotationsClusterIdentifier: "test-aws-aurora",
+                                       },
+                               },
+                               Spec: v1alpha1.StorageNodeSpec{
+                                       StorageProviderName: "aws-aurora",
+                               },
+                       }
+                       Expect(fakeClient.Create(ctx, &storageNode)).Should(Succeed())
+
+                       descCluster := &dbmesh_rds.DescCluster{
+                               DBClusterIdentifier: "test-aws-aurora",
+                               PrimaryEndpoint:     "test-aws-aurora.cluster-xxxxxx.us-east-1.rds.amazonaws.com",
+                               ReaderEndpoint:      "test-aws-aurora.cluster-ro-xxxxxx.us-east-1.rds.amazonaws.com",
+                               Port:                3306,
+                               Status:              dbmesh_rds.DBClusterStatusAvailable,
+                       }
+                       descInstance := &dbmesh_rds.DescInstance{
+                               DBInstanceIdentifier: "test-aws-aurora-1",
+                               DBClusterIdentifier:  "test-aws-aurora",
+                               Endpoint: dbmesh_rds.Endpoint{
+                                       Address: "test-aws-aurora-1.cluster-xxxxxx.us-east-1.rds.amazonaws.com",
+                                       Port:    3306,
+                               },
+                               DBInstanceStatus: dbmesh_rds.DBInstanceStatusAvailable,
+                       }
+
+                       // mock aws aurora cluster is not exist
+                       mockAws.EXPECT().GetAuroraCluster(gomock.Any(), gomock.Any()).Return(nil, nil).Times(1)
+                       // mock create aws aurora cluster
+                       mockAws.EXPECT().CreateAuroraCluster(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil)
+                       // mock aws aurora cluster is created
+                       mockAws.EXPECT().GetAuroraCluster(gomock.Any(), gomock.Any()).Return(descCluster, nil).Times(1)
+                       // mock aws instance is created
+                       mockAws.EXPECT().GetInstancesByFilters(gomock.Any(), gomock.Any()).Return([]*dbmesh_rds.DescInstance{descInstance}, nil).Times(1)
+
+                       req := ctrl.Request{NamespacedName: namespacedName}
+                       _, err := reconciler.Reconcile(ctx, req)
+                       Expect(err).To(BeNil())
+                       sn := &v1alpha1.StorageNode{}
+                       Expect(fakeClient.Get(ctx, namespacedName, sn)).Should(Succeed())
+                       Expect(sn.Status.Phase).To(Equal(v1alpha1.StorageNodePhaseReady))
+               })
+
+               It("should success when storage node been delete", func() {
+                       name := "test-aws-aurora-deleted"
+                       namespacedName := types.NamespacedName{
+                               Name:      name,
+                               Namespace: defaultTestNamespace,
+                       }
+                       req := ctrl.Request{NamespacedName: namespacedName}
+                       storageNode := &v1alpha1.StorageNode{
+                               ObjectMeta: metav1.ObjectMeta{
+                                       Name:      name,
+                                       Namespace: defaultTestNamespace,
+                                       Annotations: map[string]string{
+                                               v1alpha1.AnnotationsClusterIdentifier: "test-aws-aurora",
+                                       },
+                                       Finalizers: []string{FinalizerName},
+                               },
+                               Spec: v1alpha1.StorageNodeSpec{
+                                       StorageProviderName: "aws-aurora",
+                               },
+                               Status: v1alpha1.StorageNodeStatus{
+                                       Phase: v1alpha1.StorageNodePhaseReady,
+                                       Cluster: v1alpha1.ClusterStatus{
+                                               Status:          dbmesh_rds.DBClusterStatusAvailable,
+                                               PrimaryEndpoint: v1alpha1.Endpoint{Address: "test-aws-aurora.cluster-xxxxxx.us-east-1.rds.amazonaws.com", Port: 3306},
+                                               ReaderEndpoints: []v1alpha1.Endpoint{{Address: "test-aws-aurora.cluster-ro-xxxxxx.us-east-1.rds.amazonaws.com", Port: 3306}},
+                                       },
+                                       Instances: []v1alpha1.InstanceStatus{
+                                               {
+                                                       Status:   string(dbmesh_rds.DBInstanceStatusAvailable),
+                                                       Endpoint: v1alpha1.Endpoint{Address: "test-aws-aurora-1.cluster-xxxxxx.us-east-1.rds.amazonaws.com", Port: 3306},
+                                               },
+                                       },
+                               },
+                       }
+
+                       Expect(fakeClient.Create(ctx, storageNode)).Should(Succeed())
+
+                       descCluster := &dbmesh_rds.DescCluster{
+                               DBClusterIdentifier: "test-aws-aurora",
+                               PrimaryEndpoint:     "test-aws-aurora.cluster-xxxxxx.us-east-1.rds.amazonaws.com",
+                               ReaderEndpoint:      "test-aws-aurora.cluster-ro-xxxxxx.us-east-1.rds.amazonaws.com",
+                               Port:                3306,
+                               Status:              dbmesh_rds.DBClusterStatusAvailable,
+                       }
+
+                       descInstance := &dbmesh_rds.DescInstance{
+                               DBInstanceIdentifier: "test-aws-aurora-1",
+                               DBClusterIdentifier:  "test-aws-aurora",
+                               Endpoint: dbmesh_rds.Endpoint{
+                                       Address: "test-aws-aurora-1.cluster-xxxxxx.us-east-1.rds.amazonaws.com",
+                                       Port:    3306,
+                               },
+                               DBInstanceStatus: dbmesh_rds.DBInstanceStatusAvailable,
+                       }
+
+                       Expect(fakeClient.Delete(ctx, storageNode)).Should(Succeed())
+
+                       // mock aws aurora is exists
+                       mockAws.EXPECT().GetAuroraCluster(gomock.Any(), gomock.Any()).Return(descCluster, nil).Times(1)
+                       // mock get instances of aws aurora
+                       mockAws.EXPECT().GetInstancesByFilters(gomock.Any(), gomock.Any()).Return([]*dbmesh_rds.DescInstance{descInstance}, nil).Times(1)
+                       // mock delete aws aurora cluster
+                       mockAws.EXPECT().DeleteAuroraCluster(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil)
+
+                       _, err := reconciler.Reconcile(ctx, req)
+                       Expect(err).To(BeNil())
+
+                       Expect(fakeClient.Get(ctx, namespacedName, storageNode)).Should(Succeed())
+                       Expect(storageNode.DeletionTimestamp).NotTo(BeNil())
+                       Expect(storageNode.Status.Phase).To(Equal(v1alpha1.StorageNodePhaseDeleting))
+               })
+
+               It("should be success when storage node is deleting", func() {
+                       name := "test-aws-aurora-deleting"
+                       namespacedName := types.NamespacedName{
+                               Name:      name,
+                               Namespace: defaultTestNamespace,
+                       }
+                       req := ctrl.Request{NamespacedName: namespacedName}
+                       deletionTimestamp := metav1.Now()
+                       storageNode := &v1alpha1.StorageNode{
+                               ObjectMeta: metav1.ObjectMeta{
+                                       Name:      name,
+                                       Namespace: defaultTestNamespace,
+                                       Annotations: map[string]string{
+                                               v1alpha1.AnnotationsClusterIdentifier: "test-aws-aurora",
+                                       },
+                                       Finalizers:        []string{FinalizerName},
+                                       DeletionTimestamp: &deletionTimestamp,
+                               },
+                               Spec: v1alpha1.StorageNodeSpec{
+                                       StorageProviderName: "aws-aurora",
+                               },
+                               Status: v1alpha1.StorageNodeStatus{
+                                       Phase: v1alpha1.StorageNodePhaseDeleting,
+                                       Cluster: v1alpha1.ClusterStatus{
+                                               Status:          dbmesh_rds.DBClusterStatusDeleting,
+                                               PrimaryEndpoint: v1alpha1.Endpoint{Address: "test-aws-aurora.cluster-xxxxxx.us-east-1.rds.amazonaws.com", Port: 3306},
+                                               ReaderEndpoints: []v1alpha1.Endpoint{{Address: "test-aws-aurora.cluster-ro-xxxxxx.us-east-1.rds.amazonaws.com", Port: 3306}},
+                                       },
+                                       Instances: []v1alpha1.InstanceStatus{
+                                               {
+                                                       Status:   string(dbmesh_rds.DBInstanceStatusDeleting),
+                                                       Endpoint: v1alpha1.Endpoint{Address: "test-aws-aurora-1.cluster-xxxxxx.us-east-1.rds.amazonaws.com", Port: 3306},
+                                               },
+                                       },
+                               },
+                       }
+                       Expect(fakeClient.Create(ctx, storageNode)).Should(Succeed())
+
+                       // mock aws aurora is not exists
+                       mockAws.EXPECT().GetAuroraCluster(gomock.Any(), gomock.Any()).Return(nil, nil).Times(1)
+                       // mock get instances of aws aurora is not exists
+                       mockAws.EXPECT().GetInstancesByFilters(gomock.Any(), gomock.Any()).Return(nil, nil).Times(1)
+
+                       _, err := reconciler.Reconcile(ctx, req)
+                       Expect(err).To(BeNil())
+                       Expect(fakeClient.Get(ctx, namespacedName, storageNode)).Should(Succeed())
+                       Expect(storageNode.Status.Phase).To(Equal(v1alpha1.StorageNodePhaseDeleteComplete))
+               })
+
+               It("should be success when storage node is delete completed", func() {
+                       name := "test-aws-aurora-delete-completed"
+                       namespacedName := types.NamespacedName{
+                               Name:      name,
+                               Namespace: defaultTestNamespace,
+                       }
+                       req := ctrl.Request{NamespacedName: namespacedName}
+                       deletionTimestamp := metav1.Now()
+                       storageNode := &v1alpha1.StorageNode{
+                               ObjectMeta: metav1.ObjectMeta{
+                                       Name:      name,
+                                       Namespace: defaultTestNamespace,
+                                       Annotations: map[string]string{
+                                               v1alpha1.AnnotationsClusterIdentifier: "test-aws-aurora",
+                                       },
+                                       Finalizers:        []string{FinalizerName},
+                                       DeletionTimestamp: &deletionTimestamp,
+                               },
+                               Spec: v1alpha1.StorageNodeSpec{
+                                       StorageProviderName: "aws-aurora",
+                               },
+                               Status: v1alpha1.StorageNodeStatus{
+                                       Phase: v1alpha1.StorageNodePhaseDeleteComplete,
+                               },
+                       }
+                       Expect(fakeClient.Create(ctx, storageNode)).Should(Succeed())
+
+                       _, err := reconciler.Reconcile(ctx, req)
+                       Expect(err).To(BeNil())
+                       err = fakeClient.Get(ctx, namespacedName, storageNode)
+                       Expect(apierrors.IsNotFound(err)).To(BeTrue())
+               })
+       })
+})
diff --git a/shardingsphere-operator/pkg/controllers/storage_node_controller.go b/shardingsphere-operator/pkg/controllers/storage_node_controller.go
index 9d051d7..e26c63d 100644
--- a/shardingsphere-operator/pkg/controllers/storage_node_controller.go
+++ b/shardingsphere-operator/pkg/controllers/storage_node_controller.go
@@ -70,7 +70,7 @@ type StorageNodeReconciler struct {
 // +kubebuilder:rbac:groups=shardingsphere.apache.org,resources=storagenodes,verbs=get;list;watch;create;update;patch;delete
 // +kubebuilder:rbac:groups=shardingsphere.apache.org,resources=storagenodes/status,verbs=get;update;patch
 // +kubebuilder:rbac:groups=shardingsphere.apache.org,resources=storagenodes/finalizers,verbs=update
-// +kubebuilder:rbac:groups=core.database-mesh.io,resources=storageProvideres,verbs=get;list;watch
+// +kubebuilder:rbac:groups=core.database-mesh.io,resources=storageProviders,verbs=get;list;watch
 
 // Reconcile handles main function of this controller
 // nolint:gocognit
@@ -84,7 +84,7 @@ func (r *StorageNodeReconciler) Reconcile(ctx context.Context, req ctrl.Request)
        }
 
        // Get storageProvider with storagenode.Spec.StorageProviderName
-       storageProvider, err := r.getstorageProvider(ctx, node)
+       storageProvider, err := r.getStorageProvider(ctx, node)
        if err != nil {
                r.Log.Error(err, fmt.Sprintf("unable to fetch storageProvider 
%s", node.Spec.StorageProviderName))
                return ctrl.Result{Requeue: true}, err
@@ -152,24 +152,27 @@ func (r *StorageNodeReconciler) finalize(ctx context.Context, node *v1alpha1.Sto
 }
 
 func (r *StorageNodeReconciler) reconcile(ctx context.Context, dbClass *v1alpha1.StorageProvider, node *v1alpha1.StorageNode) (ctrl.Result, error) {
+       var err error
        // reconcile storage node with storageProvider
        switch dbClass.Spec.Provisioner {
        case v1alpha1.ProvisionerAWSRDSInstance:
                if err := r.reconcileAwsRdsInstance(ctx, aws.NewRdsClient(r.AwsRDS), node, dbClass); err != nil {
-                       r.Log.Error(err, fmt.Sprintf("unable to reconcile AWS RDS Instance %s/%s, err:%s", node.GetNamespace(), node.GetName(), err.Error()))
                        r.Recorder.Eventf(node, corev1.EventTypeWarning, "Reconcile Failed", fmt.Sprintf("unable to reconcile AWS RDS Instance %s/%s, err:%s", node.GetNamespace(), node.GetName(), err.Error()))
+                       return ctrl.Result{RequeueAfter: defaultRequeueTime}, err
                }
        case v1alpha1.ProvisionerAWSAurora:
                if err := r.reconcileAwsAurora(ctx, aws.NewRdsClient(r.AwsRDS), node, dbClass); err != nil {
                        r.Recorder.Eventf(node, corev1.EventTypeWarning, "Reconcile Failed", fmt.Sprintf("unable to reconcile AWS Aurora %s/%s, err:%s", node.GetNamespace(), node.GetName(), err.Error()))
+                       return ctrl.Result{RequeueAfter: defaultRequeueTime}, err
                }
        case v1alpha1.ProvisionerCloudNativePG:
                if err := r.reconcileCloudNativePG(ctx, node, dbClass); err != nil {
                        r.Recorder.Eventf(node, corev1.EventTypeWarning, "Reconcile Failed", fmt.Sprintf("unable to reconcile CloudNative PG %s/%s, err:%s", node.GetNamespace(), node.GetName(), err.Error()))
+                       return ctrl.Result{RequeueAfter: defaultRequeueTime}, err
                }
        default:
                r.Recorder.Event(node, corev1.EventTypeWarning, "UnsupportedDatabaseProvisioner", fmt.Sprintf("unsupported database provisioner %s", dbClass.Spec.Provisioner))
-               r.Log.Error(nil, fmt.Sprintf("unsupported database provisioner %s", dbClass.Spec.Provisioner))
+               return ctrl.Result{RequeueAfter: defaultRequeueTime}, err
        }
 
        // register storage unit if needed.
@@ -192,7 +195,7 @@ func (r *StorageNodeReconciler) reconcile(ctx context.Context, dbClass *v1alpha1
        return ctrl.Result{RequeueAfter: defaultRequeueTime}, nil
 }
 
-func (r *StorageNodeReconciler) getstorageProvider(ctx context.Context, node *v1alpha1.StorageNode) (storageProvider *v1alpha1.StorageProvider, err error) {
+func (r *StorageNodeReconciler) getStorageProvider(ctx context.Context, node *v1alpha1.StorageNode) (storageProvider *v1alpha1.StorageProvider, err error) {
        if node.Spec.StorageProviderName == "" {
                r.Recorder.Event(node, corev1.EventTypeWarning, "storageProviderNameIsNil", "storageProviderName is nil")
                return nil, fmt.Errorf("storageProviderName is nil")
@@ -226,12 +229,12 @@ func computeDesiredState(status v1alpha1.StorageNodeStatus) v1alpha1.StorageNode
 
        if status.Phase == v1alpha1.StorageNodePhaseDeleting {
                // If the storage node is being deleted, check if all instances are deleted.
-               if len(status.Instances) == 0 {
+               if clusterStatus == "" && len(status.Instances) == 0 {
                        desiredState.Phase = v1alpha1.StorageNodePhaseDeleteComplete
                }
        } else {
                // If the storage node is not being deleted, check if all instances are ready.
-               if (clusterStatus == "" || clusterStatus == "Ready") && allInstancesReady(status.Instances) {
+               if (clusterStatus == "" || clusterStatus == rds.DBClusterStatusAvailable) && allInstancesReady(status.Instances) {
                        desiredState.Phase = v1alpha1.StorageNodePhaseReady
                } else {
                        desiredState.Phase = v1alpha1.StorageNodePhaseNotReady
@@ -249,7 +252,7 @@ func computeNewConditions(desiredState, status v1alpha1.StorageNodeStatus, clust
 
        // Update the cluster ready condition if the cluster status is not empty
        if clusterStatus != "" {
-               if clusterStatus == "Ready" {
+               if clusterStatus == rds.DBClusterStatusAvailable {
                        newSNConditions.UpsertCondition(&v1alpha1.StorageNodeCondition{
                                Type:           v1alpha1.StorageNodeConditionTypeClusterReady,
                                Status:         corev1.ConditionTrue,
@@ -312,7 +315,7 @@ func allInstancesReady(instances []v1alpha1.InstanceStatus) bool {
 
        for idx := range instances {
                instance := &instances[idx]
-               if !(instance.Status == "Ready") {
+               if !(instance.Status == rds.DBClusterStatusAvailable) {
                        return false
                }
        }
@@ -353,81 +356,86 @@ func updateAWSRDSInstanceStatus(node *v1alpha1.StorageNode, instance *rds.DescIn
                return nil
        }
 
-       status := instance.DBInstanceStatus
-       if status == rds.DBInstanceStatusAvailable {
-               status = rds.DBInstanceStatusReady
-       }
-
        instances = append(instances, v1alpha1.InstanceStatus{
                Endpoint: v1alpha1.Endpoint{
                        Address: instance.Endpoint.Address,
                        Port:    instance.Endpoint.Port,
                },
-               Status: string(status),
+               Status: string(instance.DBInstanceStatus),
        })
 
        node.Status.Instances = instances
        return nil
 }
 
-func (r *StorageNodeReconciler) reconcileAwsAurora(ctx context.Context, client aws.IRdsClient, node *v1alpha1.StorageNode, dbClass *v1alpha1.StorageProvider) error {
-       // get instance
-       aurora, err := client.GetAuroraCluster(ctx, node)
+func (r *StorageNodeReconciler) reconcileAwsAurora(ctx context.Context, client aws.IRdsClient, node *v1alpha1.StorageNode, storageProvider *v1alpha1.StorageProvider) error {
+       r.Log.Info("reconcileAwsAurora", "node", node.GetName(), "phase", node.Status.Phase)
+       auroraCluster, err := client.GetAuroraCluster(ctx, node)
        if err != nil {
                return err
        }
-       if aurora == nil {
+
+       if auroraCluster == nil {
                // create instance
-               err = client.CreateAuroraCluster(ctx, node, dbClass.Spec.Parameters)
+               err = client.CreateAuroraCluster(ctx, node, storageProvider.Spec.Parameters)
+               if err != nil {
+                       return err
+               }
+               auroraCluster, err = client.GetAuroraCluster(ctx, node)
                if err != nil {
                        return err
                }
        }
-       // TODO: update storage node status
-       newStatus, err := updateClusterStatus(ctx, node, client, aurora)
-       if err != nil {
-               return err
-       }
-       node.Status.Cluster = newStatus
-       if err := r.Status().Update(ctx, node); err != nil {
-               r.Log.Error(err, fmt.Sprintf("Failed to update cluster status for node %s/%s", node.GetNamespace(), node.GetName()))
+
+       // update storage node status
+       if err := updateClusterStatus(ctx, client, node, auroraCluster); err != nil {
+               return fmt.Errorf("updateClusterStatus failed: %w", err)
        }
-       r.Recorder.Eventf(node, corev1.EventTypeNormal, "Reconcile", "Reconciled Aurora cluster %s, status is %s", aurora.DBClusterIdentifier, aurora.Status)
 
        return nil
 }
 
-func updateClusterStatus(ctx context.Context, node *v1alpha1.StorageNode, client aws.IRdsClient, cluster *rds.DescCluster) (v1alpha1.ClusterStatus, error) {
-       clusterStatus := v1alpha1.ClusterStatus{
-               PrimaryEndpoint: v1alpha1.Endpoint{
-                       Address: cluster.PrimaryEndpoint,
-                       Port:    cluster.Port,
-               },
-       }
-       status := cluster.Status
-       if status == "available" {
-               status = "Ready"
+func updateClusterStatus(ctx context.Context, client aws.IRdsClient, node *v1alpha1.StorageNode, cluster *rds.DescCluster) error {
+       // update cluster status
+       clusterStatus := v1alpha1.ClusterStatus{}
+       if cluster != nil {
+               clusterStatus = v1alpha1.ClusterStatus{
+                       Status: cluster.Status,
+                       PrimaryEndpoint: v1alpha1.Endpoint{
+                               Address: cluster.PrimaryEndpoint,
+                               Port:    cluster.Port,
+                       },
+                       ReaderEndpoints: []v1alpha1.Endpoint{
+                               {
+                                       Address: cluster.ReaderEndpoint,
+                                       Port:    cluster.Port,
+                               },
+                       },
+               }
        }
-       clusterStatus.Status = status
-
-       if len(cluster.ReadReplicaIdentifiers) == 0 {
-               clusterStatus.ReaderEndpoints = []v1alpha1.Endpoint{}
-               return clusterStatus, nil
-       } else {
+       node.Status.Cluster = clusterStatus
 
-               for _, readident := range cluster.ReadReplicaIdentifiers {
-                       instance, err := client.GetInstanceByIdentifier(ctx, readident)
-                       if err != nil {
-                               return clusterStatus, err
-                       }
+       // update instances status
+       identifier := node.Annotations[v1alpha1.AnnotationsClusterIdentifier]
+       filters := map[string][]string{
+               "db-cluster-id": {identifier},
+       }
+       instances, err := client.GetInstancesByFilters(ctx, filters)
+       if err != nil {
+               return fmt.Errorf("GetInstances failed, err:%w", err)
+       }
 
-                       clusterStatus.ReaderEndpoints = append(clusterStatus.ReaderEndpoints, v1alpha1.Endpoint{
+       var instanceStatus []v1alpha1.InstanceStatus
+       for _, instance := range instances {
+               instanceStatus = append(instanceStatus, v1alpha1.InstanceStatus{
+                       Status: string(instance.DBInstanceStatus),
+                       Endpoint: v1alpha1.Endpoint{
                                Address: instance.Endpoint.Address,
                                Port:    instance.Endpoint.Port,
-                       })
-               }
-               return clusterStatus, nil
+                       }})
        }
+       node.Status.Instances = instanceStatus
+       return nil
 }
 
 // deleteDatabaseCluster
@@ -438,8 +446,8 @@ func (r *StorageNodeReconciler) deleteDatabaseCluster(ctx context.Context, node
                        return fmt.Errorf("delete aws rds instance failed: %w", err)
                }
        case v1alpha1.ProvisionerAWSAurora:
-               if err := aws.NewRdsClient(r.AwsRDS).DeleteAuroraCluster(ctx, node, storageProvider); err != nil {
-                       return err
+               if err := r.deleteAWSAurora(ctx, aws.NewRdsClient(r.AwsRDS), node, storageProvider); err != nil {
+                       return fmt.Errorf("delete aws aurora cluster failed: %w", err)
                }
        default:
                return fmt.Errorf("unsupported database provisioner %s", 
storageProvider.Spec.Provisioner)
@@ -469,6 +477,26 @@ func (r *StorageNodeReconciler) deleteAWSRDSInstance(ctx 
context.Context, client
        return nil
 }
 
+func (r *StorageNodeReconciler) deleteAWSAurora(ctx context.Context, client aws.IRdsClient, node *v1alpha1.StorageNode, storageProvider *v1alpha1.StorageProvider) error {
+       auroraCluster, err := client.GetAuroraCluster(ctx, node)
+       if err != nil {
+               return fmt.Errorf("get aurora cluster failed: %w", err)
+       }
+       if auroraCluster != nil && auroraCluster.Status != rds.DBClusterStatusDeleting {
+               if err := client.DeleteAuroraCluster(ctx, node, storageProvider); err != nil {
+                       r.Recorder.Eventf(node, corev1.EventTypeWarning, "DeleteFailed", "Failed to delete aurora cluster %s: %s", node.Annotations[v1alpha1.AnnotationsClusterIdentifier], err.Error())
+                       return err
+               }
+               r.Recorder.Event(node, corev1.EventTypeNormal, "Deleting", fmt.Sprintf("aurora cluster %s is deleting", node.Annotations[v1alpha1.AnnotationsClusterIdentifier]))
+       }
+
+       // update storage node status
+       if err := updateClusterStatus(ctx, client, node, auroraCluster); err != nil {
+               return fmt.Errorf("updateClusterStatus failed: %w", err)
+       }
+       return nil
+}
+
 // registerStorageUnit
 func (r *StorageNodeReconciler) registerStorageUnit(ctx context.Context, node *v1alpha1.StorageNode, dbClass *v1alpha1.StorageProvider) error {
        // if register storage unit is not enabled, return
diff --git a/shardingsphere-operator/pkg/kubernetes/cloudnative-pg/cloudnative-pg.go b/shardingsphere-operator/pkg/kubernetes/cloudnative-pg/cloudnative-pg.go
index 621b3d4..45a9b55 100644
--- a/shardingsphere-operator/pkg/kubernetes/cloudnative-pg/cloudnative-pg.go
+++ b/shardingsphere-operator/pkg/kubernetes/cloudnative-pg/cloudnative-pg.go
@@ -28,7 +28,7 @@ import (
        "sigs.k8s.io/controller-runtime/pkg/client"
 )
 
-// NewCloudNativePG creates a new CloudNativePG client
+// NewCloudNativePGClient creates a new CloudNativePG client
 func NewCloudNativePGClient(c client.Client) CloudNativePG {
        return cloudnativePGClient{
                builder: builder{},
diff --git a/shardingsphere-operator/pkg/reconcile/storagenode/aws/aurora.go b/shardingsphere-operator/pkg/reconcile/storagenode/aws/aurora.go
index 7549192..21f51aa 100644
--- a/shardingsphere-operator/pkg/reconcile/storagenode/aws/aurora.go
+++ b/shardingsphere-operator/pkg/reconcile/storagenode/aws/aurora.go
@@ -20,20 +20,39 @@ package aws
 import (
        "context"
        "errors"
+       "fmt"
 
        "github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/api/v1alpha1"
        "github.com/database-mesh/golang-sdk/aws/client/rds"
-       dbmeshv1alpha1 "github.com/database-mesh/golang-sdk/kubernetes/api/v1alpha1"
 )
 
+// CreateAuroraCluster creates aurora cluster
+// ref: https://docs.aws.amazon.com/zh_cn/AmazonRDS/latest/APIReference/API_CreateDBInstance.html
 func (c *RdsClient) CreateAuroraCluster(ctx context.Context, node *v1alpha1.StorageNode, params map[string]string) error {
        aurora := c.Aurora()
+
+       // set required params
+       aurora.SetDBInstanceClass(params["instanceClass"]).
+               SetEngine(params["engine"]).
+               SetDBClusterIdentifier(params["clusterIdentifier"])
+
+       // set optional params
+       if params["engineVersion"] != "" {
+               aurora.SetEngineVersion(params["engineVersion"])
+       }
+       if params["masterUsername"] != "" {
+               aurora.SetMasterUsername(params["masterUsername"])
+       }
+       if params["masterUserPassword"] != "" {
+               aurora.SetMasterUserPassword(params["masterUserPassword"])
+       }
+
        err := aurora.Create(ctx)
        return err
 }
 
 func (c *RdsClient) GetAuroraCluster(ctx context.Context, node *v1alpha1.StorageNode) (cluster *rds.DescCluster, err error) {
-       identifier, ok := node.Annotations[dbmeshv1alpha1.AnnotationsClusterIdentifier]
+       identifier, ok := node.Annotations[v1alpha1.AnnotationsClusterIdentifier]
        if !ok {
                return nil, errors.New("cluster identifier is empty")
        }
@@ -48,6 +67,26 @@ func (c *RdsClient) GetAuroraCluster(ctx context.Context, node *v1alpha1.Storage
 }
 
 func (c *RdsClient) DeleteAuroraCluster(ctx context.Context, node *v1alpha1.StorageNode, storageProvider *v1alpha1.StorageProvider) error {
+       identifier, ok := node.Annotations[v1alpha1.AnnotationsClusterIdentifier]
+       if !ok {
+               return fmt.Errorf("cluster identifier is empty")
+       }
+       // get instances of aurora cluster
+       filters := map[string][]string{
+               "db-cluster-id": {identifier},
+       }
+       instances, err := c.GetInstancesByFilters(ctx, filters)
+       if err != nil {
+               return fmt.Errorf("get instances failed, %v", err)
+       }
+       // delete instance first
+       for _, ins := range instances {
+               if err := c.DeleteInstance(ctx, node, storageProvider); err != nil {
+                       return fmt.Errorf("delete instance=%s of aurora=%s failed, %v", ins.DBInstanceIdentifier, identifier, err)
+               }
+       }
+       // delete cluster
        aurora := c.Aurora()
+       aurora.SetDBClusterIdentifier(identifier)
        return aurora.Delete(ctx)
 }
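A note on the parameters used above: CreateAuroraCluster now reads its settings from the StorageProvider parameter map. The sketch below lists the keys consumed by the code in this hunk; the values are placeholders for illustration, not defaults shipped by the operator:

package example

// Keys read by RdsClient.CreateAuroraCluster in this patch.
// instanceClass, engine and clusterIdentifier are always applied;
// the remaining keys are only set when they are non-empty.
var auroraParams = map[string]string{
        "instanceClass":      "db.r5.large",       // placeholder, passed to SetDBInstanceClass
        "engine":             "aurora-mysql",      // placeholder, passed to SetEngine
        "clusterIdentifier":  "my-aurora-cluster", // placeholder, passed to SetDBClusterIdentifier
        "engineVersion":      "5.7",               // optional, placeholder
        "masterUsername":     "admin",             // optional, placeholder
        "masterUserPassword": "change-me",         // optional, placeholder
}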
diff --git a/shardingsphere-operator/pkg/reconcile/storagenode/aws/aws.go b/shardingsphere-operator/pkg/reconcile/storagenode/aws/aws.go
index 1a697a5..cf90112 100644
--- a/shardingsphere-operator/pkg/reconcile/storagenode/aws/aws.go
+++ b/shardingsphere-operator/pkg/reconcile/storagenode/aws/aws.go
@@ -36,6 +36,7 @@ type IRdsClient interface {
        CreateInstance(ctx context.Context, node *v1alpha1.StorageNode, params map[string]string) error
        GetInstance(ctx context.Context, node *v1alpha1.StorageNode) (instance *rds.DescInstance, err error)
        GetInstanceByIdentifier(ctx context.Context, identifier string) (*rds.DescInstance, error)
+       GetInstancesByFilters(ctx context.Context, filters map[string][]string) (instances []*rds.DescInstance, err error)
        DeleteInstance(ctx context.Context, node *v1alpha1.StorageNode, storageProvider *v1alpha1.StorageProvider) error
 
        CreateAuroraCluster(ctx context.Context, node *v1alpha1.StorageNode, params map[string]string) error
diff --git a/shardingsphere-operator/pkg/reconcile/storagenode/aws/mocks/aws.go b/shardingsphere-operator/pkg/reconcile/storagenode/aws/mocks/aws.go
index 2d9911c..060125d 100644
--- a/shardingsphere-operator/pkg/reconcile/storagenode/aws/mocks/aws.go
+++ b/shardingsphere-operator/pkg/reconcile/storagenode/aws/mocks/aws.go
@@ -165,6 +165,21 @@ func (mr *MockIRdsClientMockRecorder) GetInstanceByIdentifier(ctx, identifier in
        return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetInstanceByIdentifier", reflect.TypeOf((*MockIRdsClient)(nil).GetInstanceByIdentifier), ctx, identifier)
 }
 
+// GetInstancesByFilters mocks base method.
+func (m *MockIRdsClient) GetInstancesByFilters(ctx context.Context, filters map[string][]string) ([]*rds.DescInstance, error) {
+       m.ctrl.T.Helper()
+       ret := m.ctrl.Call(m, "GetInstancesByFilters", ctx, filters)
+       ret0, _ := ret[0].([]*rds.DescInstance)
+       ret1, _ := ret[1].(error)
+       return ret0, ret1
+}
+
+// GetInstancesByFilters indicates an expected call of GetInstancesByFilters.
+func (mr *MockIRdsClientMockRecorder) GetInstancesByFilters(ctx, filters interface{}) *gomock.Call {
+       mr.mock.ctrl.T.Helper()
+       return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetInstancesByFilters", reflect.TypeOf((*MockIRdsClient)(nil).GetInstancesByFilters), ctx, filters)
+}
+
 // Instance mocks base method.
 func (m *MockIRdsClient) Instance() rds.Instance {
        m.ctrl.T.Helper()
diff --git a/shardingsphere-operator/pkg/reconcile/storagenode/aws/rdsinstance.go b/shardingsphere-operator/pkg/reconcile/storagenode/aws/rdsinstance.go
index 43f347d..7fe5f59 100644
--- a/shardingsphere-operator/pkg/reconcile/storagenode/aws/rdsinstance.go
+++ b/shardingsphere-operator/pkg/reconcile/storagenode/aws/rdsinstance.go
@@ -163,6 +163,14 @@ func (c *RdsClient) GetInstanceByIdentifier(ctx context.Context, identifier stri
        return instance.Describe(ctx)
 }
 
+func (c *RdsClient) GetInstancesByFilters(ctx context.Context, filters map[string][]string) ([]*rds.DescInstance, error) {
+       instance := c.Instance()
+       for k, v := range filters {
+               instance.SetFilter(k, v)
+       }
+       return instance.DescribeAll(ctx)
+}
+
 // DeleteInstance delete rds instance.
 // aws rds instance status doc: https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/accessing-monitoring.html
 func (c *RdsClient) DeleteInstance(ctx context.Context, node *v1alpha1.StorageNode, storageProvider *v1alpha1.StorageProvider) error {
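For context, GetInstancesByFilters is the helper that both updateClusterStatus and DeleteAuroraCluster rely on to enumerate an Aurora cluster's members. Below is a minimal usage sketch, assuming an already-constructed IRdsClient; the "db-cluster-id" filter key is the one the controller code above uses, while the function and variable names here are hypothetical:

package example

import (
        "context"
        "fmt"

        "github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/reconcile/storagenode/aws"
)

// listAuroraMembers prints the instances that belong to one Aurora cluster,
// using the same filter key the controller passes in updateClusterStatus.
func listAuroraMembers(ctx context.Context, client aws.IRdsClient, clusterID string) error {
        instances, err := client.GetInstancesByFilters(ctx, map[string][]string{
                "db-cluster-id": {clusterID},
        })
        if err != nil {
                return fmt.Errorf("list instances of %s failed: %w", clusterID, err)
        }
        for _, ins := range instances {
                fmt.Println(ins.DBInstanceIdentifier, ins.DBInstanceStatus)
        }
        return nil
}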
