This is an automated email from the ASF dual-hosted git repository.

miaoliyao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/shardingsphere-on-cloud.git


The following commit(s) were added to refs/heads/main by this push:
     new 212627b  feat(storage-node): support register storage node to shardingsphere
     new 2a9bbb5  Merge pull request #357 from Xu-Wentao/storage-node-register
212627b is described below

commit 212627b1418a9a31ff6fe96b12496ab136df4cfe
Author: xuwentao <[email protected]>
AuthorDate: Wed May 10 17:31:50 2023 +0800

    feat(storage-node): support register storage node to shardingsphere
---
 .../cmd/shardingsphere-operator/manager/option.go  |   1 +
 shardingsphere-operator/go.mod                     |   4 +-
 shardingsphere-operator/go.sum                     |   2 -
 .../controllers/storage_ndoe_controller_test.go    | 118 +++++++++++-
 .../pkg/controllers/storage_node_controller.go     | 138 ++++++++++++++
 .../pkg/shardingsphere/mocks/shardingsphere.go     | 107 +++++++++++
 .../pkg/shardingsphere/shardingsphere.go           | 199 +++++++++++++++++++++
 .../shardingsphere/shardingsphere_suite_test.go    |  30 ++++
 .../pkg/shardingsphere/shardingsphere_test.go      | 135 ++++++++++++++
 .../test/e2e/storage_node_controller_test.go       |   6 +
 10 files changed, 735 insertions(+), 5 deletions(-)

diff --git a/shardingsphere-operator/cmd/shardingsphere-operator/manager/option.go b/shardingsphere-operator/cmd/shardingsphere-operator/manager/option.go
index ded3c83..c922c3b 100644
--- a/shardingsphere-operator/cmd/shardingsphere-operator/manager/option.go
+++ b/shardingsphere-operator/cmd/shardingsphere-operator/manager/option.go
@@ -151,6 +151,7 @@ var featureGatesHandlers = map[string]FeatureGateHandler{
                        Scheme:   mgr.GetScheme(),
                        Log:      mgr.GetLogger(),
                        Recorder: 
mgr.GetEventRecorderFor(controllers.StorageNodeControllerName),
+                       Service:  service.NewServiceClient(mgr.GetClient()),
                }
 
                // init aws client if aws credentials are provided
diff --git a/shardingsphere-operator/go.mod b/shardingsphere-operator/go.mod
index 621957a..4babe65 100644
--- a/shardingsphere-operator/go.mod
+++ b/shardingsphere-operator/go.mod
@@ -4,10 +4,12 @@ go 1.19
 
 require (
        bou.ke/monkey v1.0.2
+       github.com/DATA-DOG/go-sqlmock v1.5.0
        github.com/antlr/antlr4 v0.0.0-20181218183524-be58ebffde8e
        github.com/chaos-mesh/chaos-mesh/api v0.0.0-20230410023700-25a841a23cd2
        github.com/database-mesh/golang-sdk v0.0.0-20230420101548-53265cd9883a
        github.com/go-logr/logr v1.2.4
+       github.com/go-sql-driver/mysql v1.7.1
        github.com/golang/mock v1.6.0
        github.com/onsi/ginkgo/v2 v2.8.0
        github.com/onsi/gomega v1.26.0
@@ -25,7 +27,6 @@ require (
 require github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 // 
indirect
 
 require (
-       github.com/DATA-DOG/go-sqlmock v1.5.0 // indirect
        github.com/aws/aws-sdk-go-v2 v1.17.5 // indirect
        github.com/aws/aws-sdk-go-v2/config v1.18.4 // indirect
        github.com/aws/aws-sdk-go-v2/credentials v1.13.4 // indirect
@@ -51,7 +52,6 @@ require (
        github.com/go-openapi/jsonpointer v0.19.6 // indirect
        github.com/go-openapi/jsonreference v0.20.2 // indirect
        github.com/go-openapi/swag v0.22.3 // indirect
-       github.com/go-sql-driver/mysql v1.7.0 // indirect
        github.com/gogo/protobuf v1.3.2 // indirect
        github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // 
indirect
        github.com/golang/protobuf v1.5.3 // indirect
diff --git a/shardingsphere-operator/go.sum b/shardingsphere-operator/go.sum
index 863342f..7f235c4 100644
--- a/shardingsphere-operator/go.sum
+++ b/shardingsphere-operator/go.sum
@@ -95,8 +95,6 @@ github.com/go-openapi/jsonreference v0.20.2 
h1:3sVjiK66+uXK/6oQ8xgcRKcFgQ5KXa2Kv
 github.com/go-openapi/jsonreference v0.20.2/go.mod 
h1:Bl1zwGIM8/wsvqjsOQLJ/SH+En5Ap4rVB5KVcIDZG2k=
 github.com/go-openapi/swag v0.22.3 
h1:yMBqmnQ0gyZvEb/+KzuWZOXgllrXT4SADYbvDaXHv/g=
 github.com/go-openapi/swag v0.22.3/go.mod 
h1:UzaqsxGiab7freDnrUUra0MwWfN/q7tE4j+VcZ0yl14=
-github.com/go-sql-driver/mysql v1.7.0 
h1:ueSltNNllEqE3qcWBTD0iQd3IpL/6U+mJxLkazJ7YPc=
-github.com/go-sql-driver/mysql v1.7.0/go.mod 
h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI=
 github.com/go-sql-driver/mysql v1.7.1 
h1:lUIinVbN1DY0xBg0eMOzmmtGoHwWBbvnWubQUrtU8EI=
 github.com/go-sql-driver/mysql v1.7.1/go.mod 
h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI=
 github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
diff --git a/shardingsphere-operator/pkg/controllers/storage_ndoe_controller_test.go b/shardingsphere-operator/pkg/controllers/storage_ndoe_controller_test.go
index 3015dde..8bca029 100644
--- a/shardingsphere-operator/pkg/controllers/storage_ndoe_controller_test.go
+++ b/shardingsphere-operator/pkg/controllers/storage_ndoe_controller_test.go
@@ -22,9 +22,11 @@ import (
        "time"
 
        
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/api/v1alpha1"
+       
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/kubernetes/service"
        
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/reconcile/storagenode/aws"
        mock_aws 
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/reconcile/storagenode/aws/mocks"
-       apierrors "k8s.io/apimachinery/pkg/api/errors"
+       
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/shardingsphere"
+       mock_shardingsphere 
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/shardingsphere/mocks"
 
        "bou.ke/monkey"
        dbmesh_aws "github.com/database-mesh/golang-sdk/aws"
@@ -33,6 +35,8 @@ import (
        "github.com/golang/mock/gomock"
        . "github.com/onsi/ginkgo/v2"
        . "github.com/onsi/gomega"
+       corev1 "k8s.io/api/core/v1"
+       apierrors "k8s.io/apimachinery/pkg/api/errors"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/apimachinery/pkg/runtime"
        "k8s.io/client-go/tools/record"
@@ -56,6 +60,7 @@ var (
        reconciler *StorageNodeReconciler
        mockCtrl   *gomock.Controller
        mockAws    *mock_aws.MockIRdsClient
+       mockSS     *mock_shardingsphere.MockIServer
 )
 
 func fakeStorageNodeReconciler() {
@@ -64,6 +69,7 @@ func fakeStorageNodeReconciler() {
        scheme := runtime.NewScheme()
        Expect(dbmeshv1alpha1.AddToScheme(scheme)).To(Succeed())
        Expect(v1alpha1.AddToScheme(scheme)).To(Succeed())
+       Expect(corev1.AddToScheme(scheme)).To(Succeed())
        fakeClient = fake.NewClientBuilder().WithScheme(scheme).Build()
 
        sess := dbmesh_aws.NewSessions().SetCredential("AwsRegion", 
"AwsAccessKeyID", "AwsSecretAccessKey").Build()
@@ -72,6 +78,7 @@ func fakeStorageNodeReconciler() {
                Log:      logf.Log,
                Recorder: record.NewFakeRecorder(100),
                AwsRDS:   dbmesh_rds.NewService(sess["AwsRegion"]),
+               Service:  service.NewServiceClient(fakeClient),
        }
 }
 
@@ -380,4 +387,113 @@ var _ = Describe("StorageNode Controller Mock Test", 
func() {
                        Expect(apierrors.IsNotFound(err)).To(BeTrue())
                })
        })
+
+       Context("Test register storage node", func() {
+               BeforeEach(func() {
+                       mockCtrl = gomock.NewController(GinkgoT())
+                       mockSS = mock_shardingsphere.NewMockIServer(mockCtrl)
+                       monkey.Patch(shardingsphere.NewServer, func(_, _ 
string, _ uint, _, _ string) (shardingsphere.IServer, error) {
+                               return mockSS, nil
+                       })
+               })
+               AfterEach(func() {
+                       mockCtrl.Finish()
+               })
+               It("should be successful when storage node is not registered", 
func() {
+                       nodeName := "test register storage node"
+                       cnName := "test-compute-node"
+                       req := ctrl.Request{
+                               NamespacedName: client.ObjectKey{
+                                       Name:      nodeName,
+                                       Namespace: defaultTestNamespace,
+                               },
+                       }
+                       storageNode := &v1alpha1.StorageNode{
+                               ObjectMeta: metav1.ObjectMeta{
+                                       Name:      nodeName,
+                                       Namespace: defaultTestNamespace,
+                                       Annotations: map[string]string{
+                                               
AnnotationKeyRegisterStorageUnitEnabled: "true",
+                                               AnnotationKeyDatabaseName:      
         "sharding_db",
+                                               
AnnotationKeyComputeNodeNamespace:       defaultTestNamespace,
+                                               AnnotationKeyComputeNodeName:   
         cnName,
+                                       },
+                               },
+                               Spec: v1alpha1.StorageNodeSpec{
+                                       DatabaseClassName: defaultTestDBClass,
+                               },
+                       }
+                       ins := &dbmesh_rds.DescInstance{
+                               DBInstanceIdentifier: 
"ins-test-register-storage-node",
+                               DBInstanceStatus:     
v1alpha1.StorageNodeInstanceStatusAvailable,
+                               Endpoint: dbmesh_rds.Endpoint{
+                                       Address: "127.0.0.1",
+                                       Port:    3306,
+                               },
+                       }
+                       cn := &v1alpha1.ComputeNode{
+                               ObjectMeta: metav1.ObjectMeta{
+                                       Name:      cnName,
+                                       Namespace: defaultTestNamespace,
+                               },
+                               Spec: v1alpha1.ComputeNodeSpec{
+                                       Bootstrap: v1alpha1.BootstrapConfig{
+                                               ServerConfig: 
v1alpha1.ServerConfig{
+                                                       Authority: 
v1alpha1.ComputeNodeAuthority{
+                                                               Users: 
[]v1alpha1.ComputeNodeUser{
+                                                                       {
+                                                                               
User:     "root@%",
+                                                                               
Password: "root",
+                                                                       },
+                                                               },
+                                                               Privilege: 
v1alpha1.ComputeNodePrivilege{
+                                                                       Type: 
v1alpha1.AllPermitted,
+                                                               },
+                                                       },
+                                                       Props: 
map[string]string{
+                                                               
ShardingSphereProtocolType: "MySQL",
+                                                       },
+                                               },
+                                       },
+                               },
+                       }
+
+                       svc := &corev1.Service{
+                               ObjectMeta: metav1.ObjectMeta{
+                                       Name:      cnName,
+                                       Namespace: defaultTestNamespace,
+                               },
+                               Spec: corev1.ServiceSpec{
+                                       Ports: []corev1.ServicePort{
+                                               {
+                                                       Name: 
"shardingsphere-proxy",
+                                                       Port: 3306,
+                                               },
+                                       },
+                                       ClusterIP: "127.0.0.1",
+                                       Type:      corev1.ServiceTypeClusterIP,
+                               },
+                       }
+
+                       Expect(fakeClient.Create(ctx, 
storageNode)).Should(Succeed())
+                       Expect(fakeClient.Create(ctx, cn)).Should(Succeed())
+                       Expect(fakeClient.Create(ctx, svc)).Should(Succeed())
+
+                       // mock aws rds client, get available instance
+                       mockAws.EXPECT().GetInstance(gomock.Any(), 
gomock.Any()).Return(ins, nil)
+                       // mock shardingsphere create database
+                       mockSS.EXPECT().CreateDatabase(gomock.Any()).Return(nil)
+                       // mock shardingsphere register storage unit
+                       mockSS.EXPECT().RegisterStorageUnit(gomock.Any(), 
gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), 
gomock.Any()).Return(nil)
+                       // mock shardingsphere close connection
+                       mockSS.EXPECT().Close()
+
+                       _, err := reconciler.Reconcile(ctx, req)
+                       Expect(err).To(BeNil())
+
+                       registeredSN := &v1alpha1.StorageNode{}
+                       Expect(fakeClient.Get(ctx, client.ObjectKey{Name: 
defaultTestStorageNode, Namespace: defaultTestNamespace}, 
registeredSN)).Should(Succeed())
+                       // TODO update storage node register status
+               })
+       })
 })
diff --git a/shardingsphere-operator/pkg/controllers/storage_node_controller.go b/shardingsphere-operator/pkg/controllers/storage_node_controller.go
index b2efb9e..dd6acb9 100644
--- a/shardingsphere-operator/pkg/controllers/storage_node_controller.go
+++ b/shardingsphere-operator/pkg/controllers/storage_node_controller.go
@@ -21,9 +21,12 @@ import (
        "context"
        "fmt"
        "reflect"
+       "strings"
 
        
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/api/v1alpha1"
+       
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/kubernetes/service"
        
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/reconcile/storagenode/aws"
+       
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/shardingsphere"
 
        "github.com/database-mesh/golang-sdk/aws/client/rds"
        dbmeshv1alpha1 
"github.com/database-mesh/golang-sdk/kubernetes/api/v1alpha1"
@@ -31,6 +34,7 @@ import (
        corev1 "k8s.io/api/core/v1"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/apimachinery/pkg/runtime"
+       "k8s.io/apimachinery/pkg/types"
        "k8s.io/client-go/tools/record"
        "k8s.io/utils/strings/slices"
        ctrl "sigs.k8s.io/controller-runtime"
@@ -40,6 +44,13 @@ import (
 const (
        StorageNodeControllerName = "storage-node-controller"
        FinalizerName             = "shardingsphere.apache.org/finalizer"
+
+       AnnotationKeyRegisterStorageUnitEnabled = 
"shardingsphere.apache.org/register-storage-unit-enabled"
+       AnnotationKeyComputeNodeNamespace       = 
"shardingsphere.apache.org/compute-node-namespace"
+       AnnotationKeyComputeNodeName            = 
"shardingsphere.apache.org/compute-node-name"
+       AnnotationKeyDatabaseName               = 
"shardingsphere.apache.org/database-name"
+
+       ShardingSphereProtocolType = "proxy-frontend-database-protocol-type"
 )
 
 // StorageNodeReconciler is a controller for storage nodes
@@ -49,6 +60,8 @@ type StorageNodeReconciler struct {
        Log      logr.Logger
        Recorder record.EventRecorder
        AwsRDS   rds.RDS
+
+       Service service.Service
 }
 
 // Reconcile handles main function of this controller
@@ -156,10 +169,21 @@ func (r *StorageNodeReconciler) reconcile(ctx 
context.Context, dbClass *dbmeshv1
                }
        }
 
+       // register storage unit if needed.
+       if err := r.registerStorageUnit(ctx, node); err != nil {
+               r.Log.Error(err, fmt.Sprintf("unable to register storage unit 
[%s:%s]", node.GetNamespace(), node.GetName()))
+               return ctrl.Result{Requeue: true}, err
+       }
+
        return ctrl.Result{RequeueAfter: defaultRequeueTime}, nil
 }
 
 func (r *StorageNodeReconciler) getDatabaseClass(ctx context.Context, node 
*v1alpha1.StorageNode) (databaseClass *dbmeshv1alpha1.DatabaseClass, err error) 
{
+       if node.Spec.DatabaseClassName == "" {
+               r.Recorder.Event(node, corev1.EventTypeWarning, 
"DatabaseClassNameIsNil", "DatabaseClassName is nil")
+               return nil, fmt.Errorf("DatabaseClassName is nil")
+       }
+
        databaseClass = &dbmeshv1alpha1.DatabaseClass{}
 
        if err := r.Get(ctx, client.ObjectKey{Name: 
node.Spec.DatabaseClassName}, databaseClass); err != nil {
@@ -422,9 +446,123 @@ func (r *StorageNodeReconciler) deleteAWSRDSInstance(ctx 
context.Context, client
        return nil
 }
 
+// registerStorageUnit
+func (r *StorageNodeReconciler) registerStorageUnit(ctx context.Context, node 
*v1alpha1.StorageNode) error {
+       // if register storage unit is not enabled, return
+       if node.Annotations[AnnotationKeyRegisterStorageUnitEnabled] != "true" {
+               return nil
+       }
+       // if node is not ready, return
+       if node.Status.Phase != v1alpha1.StorageNodePhaseReady {
+               return nil
+       }
+
+       if err := validateComputeNodeAnnotations(node); err != nil {
+               return err
+       }
+
+       dbName := node.Annotations[AnnotationKeyDatabaseName]
+
+       ssServer, err := r.getShardingsphereServer(ctx, node)
+       if err != nil {
+               return fmt.Errorf("getShardingsphereServer failed: %w", err)
+       }
+
+       defer ssServer.Close()
+
+       if err := ssServer.CreateDatabase(dbName); err != nil {
+               return fmt.Errorf("create database failed: %w", err)
+       }
+
+       // TODO add cluster
+
+       ins := node.Status.Instances[0]
+       host := ins.Endpoint.Address
+       port := ins.Endpoint.Port
+       username := node.Annotations[dbmeshv1alpha1.AnnotationsMasterUsername]
+       password := 
node.Annotations[dbmeshv1alpha1.AnnotationsMasterUserPassword]
+
+       // TODO how to set ds name?
+       if err := ssServer.RegisterStorageUnit("ds_0", host, uint(port), 
dbName, username, password); err != nil {
+               return fmt.Errorf("register storage node failed: %w", err)
+       }
+
+       return nil
+}
+
+func validateComputeNodeAnnotations(node *v1alpha1.StorageNode) error {
+       requiredAnnos := []string{
+               AnnotationKeyDatabaseName,
+               AnnotationKeyComputeNodeNamespace,
+               AnnotationKeyComputeNodeName,
+       }
+
+       for _, anno := range requiredAnnos {
+               if v, ok := node.Annotations[anno]; !ok || v == "" {
+                       return fmt.Errorf("annotation [%s] is required", anno)
+               }
+       }
+
+       return nil
+}
+
+func (r *StorageNodeReconciler) getShardingsphereServer(ctx context.Context, 
node *v1alpha1.StorageNode) (shardingsphere.IServer, error) {
+       var (
+               driver, host, username, password string
+               port                             uint
+       )
+
+       // get compute node
+       cn := &v1alpha1.ComputeNode{}
+       if err := r.Client.Get(ctx, types.NamespacedName{
+               Name:      node.Annotations[AnnotationKeyComputeNodeName],
+               Namespace: node.Annotations[AnnotationKeyComputeNodeNamespace],
+       }, cn); err != nil {
+               return nil, fmt.Errorf("get compute node failed: %w", err)
+       }
+
+       serverConf := cn.Spec.Bootstrap.ServerConfig
+
+       driver, ok := serverConf.Props[ShardingSphereProtocolType]
+       if !ok || driver == "" {
+               driver = "mysql"
+       }
+       driver = strings.ToLower(driver)
+
+       if len(serverConf.Authority.Users) == 0 {
+               return nil, fmt.Errorf("no user in compute node [%s]", 
cn.Namespace+"/"+cn.Name)
+       }
+
+       username = serverConf.Authority.Users[0].User
+       password = serverConf.Authority.Users[0].Password
+
+       // get service of compute node
+       svc, err := r.Service.GetByNamespacedName(ctx, types.NamespacedName{
+               Name:      node.Annotations[AnnotationKeyComputeNodeName],
+               Namespace: node.Annotations[AnnotationKeyComputeNodeNamespace],
+       })
+
+       if err != nil || svc == nil {
+               return nil, fmt.Errorf("get service failed: %w", err)
+       }
+
+       host = fmt.Sprintf("%s.%s", svc.Name, svc.Namespace)
+
+       port = uint(svc.Spec.Ports[0].Port)
+
+       ssServer, err := shardingsphere.NewServer(driver, host, port, username, 
password)
+       if err != nil {
+               return nil, fmt.Errorf("new shardingsphere server failed: %w", 
err)
+       }
+
+       return ssServer, nil
+}
+
 // SetupWithManager sets up the controller with the Manager
 func (r *StorageNodeReconciler) SetupWithManager(mgr ctrl.Manager) error {
        return ctrl.NewControllerManagedBy(mgr).
                For(&v1alpha1.StorageNode{}).
+               Owns(&v1alpha1.ComputeNode{}).
+               Owns(&corev1.Service{}).
                Complete(r)
 }
diff --git a/shardingsphere-operator/pkg/shardingsphere/mocks/shardingsphere.go b/shardingsphere-operator/pkg/shardingsphere/mocks/shardingsphere.go
new file mode 100644
index 0000000..d2eecb4
--- /dev/null
+++ b/shardingsphere-operator/pkg/shardingsphere/mocks/shardingsphere.go
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Code generated by MockGen. DO NOT EDIT.
+// Source: shardingsphere.go
+
+// Package mock_shardingsphere is a generated GoMock package.
+package mock_shardingsphere
+
+import (
+       reflect "reflect"
+
+       gomock "github.com/golang/mock/gomock"
+)
+
+// MockIServer is a mock of IServer interface.
+type MockIServer struct {
+       ctrl     *gomock.Controller
+       recorder *MockIServerMockRecorder
+}
+
+// MockIServerMockRecorder is the mock recorder for MockIServer.
+type MockIServerMockRecorder struct {
+       mock *MockIServer
+}
+
+// NewMockIServer creates a new mock instance.
+func NewMockIServer(ctrl *gomock.Controller) *MockIServer {
+       mock := &MockIServer{ctrl: ctrl}
+       mock.recorder = &MockIServerMockRecorder{mock}
+       return mock
+}
+
+// EXPECT returns an object that allows the caller to indicate expected use.
+func (m *MockIServer) EXPECT() *MockIServerMockRecorder {
+       return m.recorder
+}
+
+// Close mocks base method.
+func (m *MockIServer) Close() error {
+       m.ctrl.T.Helper()
+       ret := m.ctrl.Call(m, "Close")
+       ret0, _ := ret[0].(error)
+       return ret0
+}
+
+// Close indicates an expected call of Close.
+func (mr *MockIServerMockRecorder) Close() *gomock.Call {
+       mr.mock.ctrl.T.Helper()
+       return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", 
reflect.TypeOf((*MockIServer)(nil).Close))
+}
+
+// CreateDatabase mocks base method.
+func (m *MockIServer) CreateDatabase(dbName string) error {
+       m.ctrl.T.Helper()
+       ret := m.ctrl.Call(m, "CreateDatabase", dbName)
+       ret0, _ := ret[0].(error)
+       return ret0
+}
+
+// CreateDatabase indicates an expected call of CreateDatabase.
+func (mr *MockIServerMockRecorder) CreateDatabase(dbName interface{}) 
*gomock.Call {
+       mr.mock.ctrl.T.Helper()
+       return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateDatabase", 
reflect.TypeOf((*MockIServer)(nil).CreateDatabase), dbName)
+}
+
+// RegisterStorageUnit mocks base method.
+func (m *MockIServer) RegisterStorageUnit(dsName, host string, port uint, 
dbName, user, password string) error {
+       m.ctrl.T.Helper()
+       ret := m.ctrl.Call(m, "RegisterStorageUnit", dsName, host, port, 
dbName, user, password)
+       ret0, _ := ret[0].(error)
+       return ret0
+}
+
+// RegisterStorageUnit indicates an expected call of RegisterStorageUnit.
+func (mr *MockIServerMockRecorder) RegisterStorageUnit(dsName, host, port, 
dbName, user, password interface{}) *gomock.Call {
+       mr.mock.ctrl.T.Helper()
+       return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, 
"RegisterStorageUnit", reflect.TypeOf((*MockIServer)(nil).RegisterStorageUnit), 
dsName, host, port, dbName, user, password)
+}
+
+// UnRegisterStorageUnit mocks base method.
+func (m *MockIServer) UnRegisterStorageUnit(dsName string) error {
+       m.ctrl.T.Helper()
+       ret := m.ctrl.Call(m, "UnRegisterStorageUnit", dsName)
+       ret0, _ := ret[0].(error)
+       return ret0
+}
+
+// UnRegisterStorageUnit indicates an expected call of UnRegisterStorageUnit.
+func (mr *MockIServerMockRecorder) UnRegisterStorageUnit(dsName interface{}) 
*gomock.Call {
+       mr.mock.ctrl.T.Helper()
+       return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, 
"UnRegisterStorageUnit", 
reflect.TypeOf((*MockIServer)(nil).UnRegisterStorageUnit), dsName)
+}
diff --git a/shardingsphere-operator/pkg/shardingsphere/shardingsphere.go b/shardingsphere-operator/pkg/shardingsphere/shardingsphere.go
new file mode 100644
index 0000000..746689a
--- /dev/null
+++ b/shardingsphere-operator/pkg/shardingsphere/shardingsphere.go
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package shardingsphere
+
+import (
+       "database/sql"
+       "fmt"
+
+       _ "github.com/go-sql-driver/mysql"
+)
+
+const (
+       // DistSQLCreateDatabase create database if not exists.
+       DistSQLCreateDatabase = `CREATE DATABASE IF NOT EXISTS %s;`
+       // DistSQLRegisterStorageUnit register database to shardingsphere by 
storage unit name and database info.
+       DistSQLRegisterStorageUnit = `REGISTER STORAGE UNIT IF NOT EXISTS %s 
(HOST="%s",PORT=%d,DB="%s",USER="%s",PASSWORD="%s");`
+       // DistSQLShowRulesUsed show all rules used by storage unit name.
+       DistSQLShowRulesUsed = `SHOW RULES USED STORAGE UNIT %s FROM %s;`
+       // DistSQLUnRegisterStorageUnit unregister database from shardingsphere 
by storage unit name.
+       DistSQLUnRegisterStorageUnit = `UNREGISTER STORAGE UNIT %s;`
+       // DistSQLDropRule drop rule by rule type and rule name.
+       DistSQLDropRule = `DROP %s RULE %s;`
+       // DistSQLDropTable drop table by table name.
+       DistSQLDropTable = `DROP TABLE %s;`
+)
+
+var (
+       ruleTypeMap = map[string]string{}
+)
+
+type Rule struct {
+       Type string
+       Name string
+}
+
+type server struct {
+       db *sql.DB
+}
+
+type IServer interface {
+       CreateDatabase(dbName string) error
+       RegisterStorageUnit(dsName, host string, port uint, dbName, user, 
password string) error
+       UnRegisterStorageUnit(dsName string) error
+       Close() error
+}
+
+var _ IServer = (*server)(nil)
+
+func NewServer(driver, host string, port uint, user, password string) 
(IServer, error) {
+       if driver != "mysql" && driver != "postgres" {
+               return nil, fmt.Errorf("unsupported database driver: %s", 
driver)
+       }
+
+       if host == "" || port == 0 || user == "" || password == "" {
+               return nil, fmt.Errorf("invalid database config, host=%s, 
port=%d, user=%s, password=%s", host, port, user, password)
+       }
+
+       dataSourceName := fmt.Sprintf("%s:%s@tcp(%s:%d)/", user, password, 
host, port)
+
+       db, err := sql.Open(driver, dataSourceName)
+       if err != nil {
+               return nil, fmt.Errorf("open database=%s error: %w", 
dataSourceName, err)
+       }
+
+       // check database connection
+       if err = db.Ping(); err != nil {
+               return nil, fmt.Errorf("ping database=%s error: %w", 
dataSourceName, err)
+       }
+
+       return &server{db: db}, nil
+}
+
+func (s *server) Close() error {
+       return s.db.Close()
+}
+
+func (s *server) CreateDatabase(dbName string) error {
+       distSQL := fmt.Sprintf(DistSQLCreateDatabase, dbName)
+
+       _, err := s.db.Exec(distSQL)
+       if err != nil {
+               return fmt.Errorf("create database error: %w", err)
+       }
+
+       return nil
+}
+
+func (s *server) RegisterStorageUnit(dsName, host string, port uint, dbName, 
user, password string) error {
+       distSQL := fmt.Sprintf(DistSQLRegisterStorageUnit, dsName, host, port, 
dbName, user, password)
+
+       _, err := s.db.Exec(distSQL)
+       if err != nil {
+               return fmt.Errorf("register database error: %w", err)
+       }
+
+       return nil
+}
+
+// getRulesUsed returns all rules used by storage unit name.
+func (s *server) getRulesUsed(dsName, dbName string) (rules []*Rule, err 
error) {
+       rules = make([]*Rule, 0)
+       distSQL := fmt.Sprintf(DistSQLShowRulesUsed, dsName, dbName)
+
+       rows, err := s.db.Query(distSQL)
+       if err != nil {
+               return nil, fmt.Errorf("get rules used error: %w", err)
+       }
+       defer rows.Close()
+
+       for rows.Next() {
+               var ruleT, ruleN string
+               if err := rows.Scan(&ruleT, &ruleN); err != nil {
+                       return nil, fmt.Errorf("scan rules used error: %w", err)
+               }
+               rules = append(rules, &Rule{Type: ruleT, Name: ruleN})
+       }
+
+       if err := rows.Err(); err != nil {
+               return nil, fmt.Errorf("rows error: %w", err)
+       }
+       return rules, nil
+}
+
+func (s *server) UnRegisterStorageUnit(dsName string) error {
+       rules, err := s.getRulesUsed(dsName, "")
+       if err != nil {
+               return fmt.Errorf("get rules used error: %w", err)
+       }
+
+       // TODO DISCUSS: should we drop all tables used by storage unit?
+       // clean all rules and tables used by storage unit
+       tables := map[string]struct{}{}
+       for _, rule := range rules {
+               if err := s.dropRule(rule.Type, rule.Name); err != nil {
+                       return fmt.Errorf("drop rule error: %w", err)
+               }
+               if _, ok := tables[rule.Name]; !ok {
+                       tables[rule.Name] = struct{}{}
+               }
+       }
+
+       for table := range tables {
+               if err := s.dropTable(table); err != nil {
+                       return fmt.Errorf("drop table error: %w", err)
+               }
+       }
+
+       distSQL := fmt.Sprintf(DistSQLUnRegisterStorageUnit, dsName)
+
+       _, err = s.db.Exec(distSQL)
+       if err != nil {
+               return fmt.Errorf("unregister database error: %w", err)
+       }
+
+       return nil
+}
+
+func (s *server) dropRule(ruleType, ruleName string) error {
+       // convert rule type
+       ruleType = ruleTypeMap[ruleType]
+       distSQL := fmt.Sprintf(DistSQLDropRule, ruleType, ruleName)
+       _, err := s.db.Exec(distSQL)
+       if err != nil {
+               return fmt.Errorf("drop rule fail, err: %s", err)
+       }
+       return nil
+}
+
+func (s *server) dropTable(tableName string) error {
+       distSQL := fmt.Sprintf(DistSQLDropTable, tableName)
+       _, err := s.db.Exec(distSQL)
+       if err != nil {
+               return fmt.Errorf("drop table fail, err: %s", err)
+       }
+       return nil
+}
+
+func init() {
+       // init rule type map
+       // implement more rule type if needed
+       ruleTypeMap = map[string]string{
+               "sharding": "SHARDING TABLE",
+       }
+}
diff --git a/shardingsphere-operator/pkg/shardingsphere/shardingsphere_suite_test.go b/shardingsphere-operator/pkg/shardingsphere/shardingsphere_suite_test.go
new file mode 100644
index 0000000..1926834
--- /dev/null
+++ b/shardingsphere-operator/pkg/shardingsphere/shardingsphere_suite_test.go
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package shardingsphere_test
+
+import (
+       "testing"
+
+       . "github.com/onsi/ginkgo/v2"
+       . "github.com/onsi/gomega"
+)
+
+func TestShardingSphere(t *testing.T) {
+       RegisterFailHandler(Fail)
+       RunSpecs(t, "ShardingSphere Suite")
+}
diff --git a/shardingsphere-operator/pkg/shardingsphere/shardingsphere_test.go 
b/shardingsphere-operator/pkg/shardingsphere/shardingsphere_test.go
new file mode 100644
index 0000000..15aa828
--- /dev/null
+++ b/shardingsphere-operator/pkg/shardingsphere/shardingsphere_test.go
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package shardingsphere
+
+import (
+       "database/sql"
+       "fmt"
+       "regexp"
+
+       "bou.ke/monkey"
+       "github.com/DATA-DOG/go-sqlmock"
+       . "github.com/onsi/ginkgo/v2"
+       . "github.com/onsi/gomega"
+)
+
+// Specs for the server implementation, driven through a sqlmock connection so
+// that no real database is contacted.
+var _ = Describe("Test ShardingSphere Server", func() {
+       var (
+               db     *sql.DB
+               dbmock sqlmock.Sqlmock
+               err    error
+               s      IServer
+       )
+       BeforeEach(func() {
+               db, dbmock, err = sqlmock.New()
+               Expect(err).ShouldNot(HaveOccurred())
+               Expect(dbmock).ShouldNot(BeNil())
+
+               // replace sql.Open so that the server under test is handed the
+               // mocked connection
+               monkey.Patch(sql.Open, func(driverName, dataSourceName string) (*sql.DB, error) {
+                       return db, nil
+               })
+
+               s, err = NewServer("mysql", "localhost", uint(3307), "user", "password")
+               Expect(err).ShouldNot(HaveOccurred())
+       })
+
+       AfterEach(func() {
+               monkey.Unpatch(sql.Open)
+
+               // Record whether every expected statement was issued, but assert
+               // only after the connection is closed so cleanup always runs.
+               unmet := dbmock.ExpectationsWereMet()
+               // best-effort cleanup: a close error does not affect the spec outcome
+               _ = db.Close()
+               Expect(unmet).ShouldNot(HaveOccurred())
+       })
+
+       Context("Test Create database", func() {
+               It("should create success", func() {
+                       dbmock.ExpectExec(regexp.QuoteMeta("CREATE DATABASE")).
+                               WillReturnResult(sqlmock.NewResult(1, 1))
+
+                       err = s.CreateDatabase("test_db")
+                       Expect(err).ShouldNot(HaveOccurred())
+               })
+       })
+
+       // test register storage unit
+       Context("Test register storage unit", func() {
+               It("should register success", func() {
+                       // mock db and return register storage unit success
+                       dbmock.ExpectExec(regexp.QuoteMeta("REGISTER STORAGE UNIT")).
+                               WillReturnResult(sqlmock.NewResult(1, 1))
+
+                       // register the storage unit through the server under test
+                       err = s.RegisterStorageUnit("ds_0", "localhost", uint(3307), "sharding_db", "user", "password")
+                       Expect(err).ShouldNot(HaveOccurred())
+               })
+       })
+
+       // test get rules used by storage units
+       Context("Test get rules used by storage units", func() {
+               // should return a sharding table rule named 't_order'.
+               It("should return a sharding table rule named 't_order'", func() {
+                       // mock db and return sharding table rule
+                       dbmock.ExpectQuery(regexp.QuoteMeta("SHOW RULES USED STORAGE UNIT")).
+                               WillReturnRows(sqlmock.NewRows([]string{"type", "name"}).AddRow("sharding", "t_order"))
+
+                       result, err := s.(*server).getRulesUsed("ds_0", "sharding_db")
+                       Expect(err).ShouldNot(HaveOccurred())
+                       Expect(result).Should(Equal([]*Rule{{Type: "sharding", Name: "t_order"}}))
+               })
+       })
+
+       // test drop rule by rule type 'sharding' and rule name 't_order'
+       Context("Test drop rule by rule type 'sharding' and rule name 't_order'", func() {
+               It("should drop success", func() {
+                       // mock db and return drop rule success
+                       dbmock.ExpectExec(regexp.QuoteMeta("DROP SHARDING TABLE RULE")).
+                               WillReturnResult(sqlmock.NewResult(1, 1))
+
+                       err = s.(*server).dropRule("sharding", "t_order")
+                       Expect(err).ShouldNot(HaveOccurred())
+               })
+       })
+
+       Context("Test unregister storage node", func() {
+               It("should unregister success", func() {
+                       // no rule references the storage unit, so only the
+                       // unregister statement is expected after the lookup
+                       dbmock.ExpectQuery(regexp.QuoteMeta("SHOW RULES USED STORAGE UNIT")).
+                               WillReturnRows(sqlmock.NewRows([]string{"type", "name"}))
+                       dbmock.ExpectExec(regexp.QuoteMeta("UNREGISTER STORAGE UNIT")).
+                               WillReturnResult(sqlmock.NewResult(1, 1))
+
+                       err = s.UnRegisterStorageUnit("ds_0")
+                       Expect(err).ShouldNot(HaveOccurred())
+               })
+       })
+})
+
+// Specs meant to be run by hand against a real server. The connection
+// settings below are never assigned in this file, so the spec skips itself
+// unless they are filled in.
+var _ = Describe("Test ShardingSphere Server Manually", func() {
+       var (
+               driver string
+               host   string
+               port   uint
+               user   string
+               pass   string
+       )
+
+       Context("Test create database", func() {
+               It("should create success", func() {
+                       if driver == "" || host == "" || port == 0 || user == "" || pass == "" {
+                               Skip("skip test")
+                       }
+                       dbName := "test_db"
+                       s, err := NewServer(driver, host, port, user, pass)
+                       Expect(err).ShouldNot(HaveOccurred())
+                       err = s.CreateDatabase(dbName)
+                       Expect(err).ShouldNot(HaveOccurred())
+
+                       // drop the database again; a failed cleanup is reported
+                       // instead of leaving residue behind for the next run
+                       _, err = s.(*server).db.Exec(fmt.Sprintf(`DROP DATABASE %s`, dbName))
+                       Expect(err).ShouldNot(HaveOccurred())
+               })
+       })
+})
diff --git a/shardingsphere-operator/test/e2e/storage_node_controller_test.go 
b/shardingsphere-operator/test/e2e/storage_node_controller_test.go
index 0d10692..f3862ce 100644
--- a/shardingsphere-operator/test/e2e/storage_node_controller_test.go
+++ b/shardingsphere-operator/test/e2e/storage_node_controller_test.go
@@ -134,6 +134,12 @@ var _ = Describe("StorageNode Controller Suite Test", 
func() {
                        Expect(k8sClient.Get(ctx, client.ObjectKey{Name: 
nodeName, Namespace: "default"}, getNode)).Should(Succeed())
 
                        // delete storage node
+                       Expect(k8sClient.Delete(ctx, getNode)).Should(Succeed())
+                       Eventually(func() bool {
+                               newSN := &v1alpha1.StorageNode{}
+                               err := k8sClient.Get(ctx, 
client.ObjectKey{Name: nodeName, Namespace: "default"}, newSN)
+                               return err != nil
+                       }, 10*time.Second, 1*time.Second).Should(BeTrue())
                })
        })
 })


Reply via email to