This is an automated email from the ASF dual-hosted git repository.
miaoliyao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/shardingsphere-on-cloud.git
The following commit(s) were added to refs/heads/main by this push:
new 212627b feat(storage-node): support register storage node to
shardingsphere
new 2a9bbb5 Merge pull request #357 from Xu-Wentao/storage-node-register
212627b is described below
commit 212627b1418a9a31ff6fe96b12496ab136df4cfe
Author: xuwentao <[email protected]>
AuthorDate: Wed May 10 17:31:50 2023 +0800
feat(storage-node): support register storage node to shardingsphere
---
.../cmd/shardingsphere-operator/manager/option.go | 1 +
shardingsphere-operator/go.mod | 4 +-
shardingsphere-operator/go.sum | 2 -
.../controllers/storage_ndoe_controller_test.go | 118 +++++++++++-
.../pkg/controllers/storage_node_controller.go | 138 ++++++++++++++
.../pkg/shardingsphere/mocks/shardingsphere.go | 107 +++++++++++
.../pkg/shardingsphere/shardingsphere.go | 199 +++++++++++++++++++++
.../shardingsphere/shardingsphere_suite_test.go | 30 ++++
.../pkg/shardingsphere/shardingsphere_test.go | 135 ++++++++++++++
.../test/e2e/storage_node_controller_test.go | 6 +
10 files changed, 735 insertions(+), 5 deletions(-)
diff --git
a/shardingsphere-operator/cmd/shardingsphere-operator/manager/option.go
b/shardingsphere-operator/cmd/shardingsphere-operator/manager/option.go
index ded3c83..c922c3b 100644
--- a/shardingsphere-operator/cmd/shardingsphere-operator/manager/option.go
+++ b/shardingsphere-operator/cmd/shardingsphere-operator/manager/option.go
@@ -151,6 +151,7 @@ var featureGatesHandlers = map[string]FeatureGateHandler{
Scheme: mgr.GetScheme(),
Log: mgr.GetLogger(),
Recorder:
mgr.GetEventRecorderFor(controllers.StorageNodeControllerName),
+ Service: service.NewServiceClient(mgr.GetClient()),
}
// init aws client if aws credentials are provided
diff --git a/shardingsphere-operator/go.mod b/shardingsphere-operator/go.mod
index 621957a..4babe65 100644
--- a/shardingsphere-operator/go.mod
+++ b/shardingsphere-operator/go.mod
@@ -4,10 +4,12 @@ go 1.19
require (
bou.ke/monkey v1.0.2
+ github.com/DATA-DOG/go-sqlmock v1.5.0
github.com/antlr/antlr4 v0.0.0-20181218183524-be58ebffde8e
github.com/chaos-mesh/chaos-mesh/api v0.0.0-20230410023700-25a841a23cd2
github.com/database-mesh/golang-sdk v0.0.0-20230420101548-53265cd9883a
github.com/go-logr/logr v1.2.4
+ github.com/go-sql-driver/mysql v1.7.1
github.com/golang/mock v1.6.0
github.com/onsi/ginkgo/v2 v2.8.0
github.com/onsi/gomega v1.26.0
@@ -25,7 +27,6 @@ require (
require github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 //
indirect
require (
- github.com/DATA-DOG/go-sqlmock v1.5.0 // indirect
github.com/aws/aws-sdk-go-v2 v1.17.5 // indirect
github.com/aws/aws-sdk-go-v2/config v1.18.4 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.13.4 // indirect
@@ -51,7 +52,6 @@ require (
github.com/go-openapi/jsonpointer v0.19.6 // indirect
github.com/go-openapi/jsonreference v0.20.2 // indirect
github.com/go-openapi/swag v0.22.3 // indirect
- github.com/go-sql-driver/mysql v1.7.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da //
indirect
github.com/golang/protobuf v1.5.3 // indirect
diff --git a/shardingsphere-operator/go.sum b/shardingsphere-operator/go.sum
index 863342f..7f235c4 100644
--- a/shardingsphere-operator/go.sum
+++ b/shardingsphere-operator/go.sum
@@ -95,8 +95,6 @@ github.com/go-openapi/jsonreference v0.20.2
h1:3sVjiK66+uXK/6oQ8xgcRKcFgQ5KXa2Kv
github.com/go-openapi/jsonreference v0.20.2/go.mod
h1:Bl1zwGIM8/wsvqjsOQLJ/SH+En5Ap4rVB5KVcIDZG2k=
github.com/go-openapi/swag v0.22.3
h1:yMBqmnQ0gyZvEb/+KzuWZOXgllrXT4SADYbvDaXHv/g=
github.com/go-openapi/swag v0.22.3/go.mod
h1:UzaqsxGiab7freDnrUUra0MwWfN/q7tE4j+VcZ0yl14=
-github.com/go-sql-driver/mysql v1.7.0
h1:ueSltNNllEqE3qcWBTD0iQd3IpL/6U+mJxLkazJ7YPc=
-github.com/go-sql-driver/mysql v1.7.0/go.mod
h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI=
github.com/go-sql-driver/mysql v1.7.1
h1:lUIinVbN1DY0xBg0eMOzmmtGoHwWBbvnWubQUrtU8EI=
github.com/go-sql-driver/mysql v1.7.1/go.mod
h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
diff --git
a/shardingsphere-operator/pkg/controllers/storage_ndoe_controller_test.go
b/shardingsphere-operator/pkg/controllers/storage_ndoe_controller_test.go
index 3015dde..8bca029 100644
--- a/shardingsphere-operator/pkg/controllers/storage_ndoe_controller_test.go
+++ b/shardingsphere-operator/pkg/controllers/storage_ndoe_controller_test.go
@@ -22,9 +22,11 @@ import (
"time"
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/api/v1alpha1"
+
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/kubernetes/service"
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/reconcile/storagenode/aws"
mock_aws
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/reconcile/storagenode/aws/mocks"
- apierrors "k8s.io/apimachinery/pkg/api/errors"
+
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/shardingsphere"
+ mock_shardingsphere
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/shardingsphere/mocks"
"bou.ke/monkey"
dbmesh_aws "github.com/database-mesh/golang-sdk/aws"
@@ -33,6 +35,8 @@ import (
"github.com/golang/mock/gomock"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
+ corev1 "k8s.io/api/core/v1"
+ apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/record"
@@ -56,6 +60,7 @@ var (
reconciler *StorageNodeReconciler
mockCtrl *gomock.Controller
mockAws *mock_aws.MockIRdsClient
+ mockSS *mock_shardingsphere.MockIServer
)
func fakeStorageNodeReconciler() {
@@ -64,6 +69,7 @@ func fakeStorageNodeReconciler() {
scheme := runtime.NewScheme()
Expect(dbmeshv1alpha1.AddToScheme(scheme)).To(Succeed())
Expect(v1alpha1.AddToScheme(scheme)).To(Succeed())
+ Expect(corev1.AddToScheme(scheme)).To(Succeed())
fakeClient = fake.NewClientBuilder().WithScheme(scheme).Build()
sess := dbmesh_aws.NewSessions().SetCredential("AwsRegion",
"AwsAccessKeyID", "AwsSecretAccessKey").Build()
@@ -72,6 +78,7 @@ func fakeStorageNodeReconciler() {
Log: logf.Log,
Recorder: record.NewFakeRecorder(100),
AwsRDS: dbmesh_rds.NewService(sess["AwsRegion"]),
+ Service: service.NewServiceClient(fakeClient),
}
}
@@ -380,4 +387,113 @@ var _ = Describe("StorageNode Controller Mock Test",
func() {
Expect(apierrors.IsNotFound(err)).To(BeTrue())
})
})
+
+ Context("Test register storage node", func() {
+ BeforeEach(func() {
+ mockCtrl = gomock.NewController(GinkgoT())
+ mockSS = mock_shardingsphere.NewMockIServer(mockCtrl)
+ monkey.Patch(shardingsphere.NewServer, func(_, _
string, _ uint, _, _ string) (shardingsphere.IServer, error) {
+ return mockSS, nil
+ })
+ })
+ AfterEach(func() {
+ mockCtrl.Finish()
+ })
+ It("should be successful when storage node is not registered",
func() {
+ nodeName := "test register storage node"
+ cnName := "test-compute-node"
+ req := ctrl.Request{
+ NamespacedName: client.ObjectKey{
+ Name: nodeName,
+ Namespace: defaultTestNamespace,
+ },
+ }
+ storageNode := &v1alpha1.StorageNode{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: nodeName,
+ Namespace: defaultTestNamespace,
+ Annotations: map[string]string{
+
AnnotationKeyRegisterStorageUnitEnabled: "true",
+ AnnotationKeyDatabaseName:
"sharding_db",
+
AnnotationKeyComputeNodeNamespace: defaultTestNamespace,
+ AnnotationKeyComputeNodeName:
cnName,
+ },
+ },
+ Spec: v1alpha1.StorageNodeSpec{
+ DatabaseClassName: defaultTestDBClass,
+ },
+ }
+ ins := &dbmesh_rds.DescInstance{
+ DBInstanceIdentifier:
"ins-test-register-storage-node",
+ DBInstanceStatus:
v1alpha1.StorageNodeInstanceStatusAvailable,
+ Endpoint: dbmesh_rds.Endpoint{
+ Address: "127.0.0.1",
+ Port: 3306,
+ },
+ }
+ cn := &v1alpha1.ComputeNode{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: cnName,
+ Namespace: defaultTestNamespace,
+ },
+ Spec: v1alpha1.ComputeNodeSpec{
+ Bootstrap: v1alpha1.BootstrapConfig{
+ ServerConfig:
v1alpha1.ServerConfig{
+ Authority:
v1alpha1.ComputeNodeAuthority{
+ Users:
[]v1alpha1.ComputeNodeUser{
+ {
+
User: "root@%",
+
Password: "root",
+ },
+ },
+ Privilege:
v1alpha1.ComputeNodePrivilege{
+ Type:
v1alpha1.AllPermitted,
+ },
+ },
+ Props:
map[string]string{
+
ShardingSphereProtocolType: "MySQL",
+ },
+ },
+ },
+ },
+ }
+
+ svc := &corev1.Service{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: cnName,
+ Namespace: defaultTestNamespace,
+ },
+ Spec: corev1.ServiceSpec{
+ Ports: []corev1.ServicePort{
+ {
+ Name:
"shardingsphere-proxy",
+ Port: 3306,
+ },
+ },
+ ClusterIP: "127.0.0.1",
+ Type: corev1.ServiceTypeClusterIP,
+ },
+ }
+
+ Expect(fakeClient.Create(ctx,
storageNode)).Should(Succeed())
+ Expect(fakeClient.Create(ctx, cn)).Should(Succeed())
+ Expect(fakeClient.Create(ctx, svc)).Should(Succeed())
+
+ // mock aws rds client, get available instance
+ mockAws.EXPECT().GetInstance(gomock.Any(),
gomock.Any()).Return(ins, nil)
+ // mock shardingsphere create database
+ mockSS.EXPECT().CreateDatabase(gomock.Any()).Return(nil)
+ // mock shardingsphere register storage unit
+ mockSS.EXPECT().RegisterStorageUnit(gomock.Any(),
gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(),
gomock.Any()).Return(nil)
+ // mock shardingsphere close connection
+ mockSS.EXPECT().Close()
+
+ _, err := reconciler.Reconcile(ctx, req)
+ Expect(err).To(BeNil())
+
+ registeredSN := &v1alpha1.StorageNode{}
+ Expect(fakeClient.Get(ctx, client.ObjectKey{Name:
defaultTestStorageNode, Namespace: defaultTestNamespace},
registeredSN)).Should(Succeed())
+ // TODO update storage node register status
+ })
+ })
})
diff --git a/shardingsphere-operator/pkg/controllers/storage_node_controller.go
b/shardingsphere-operator/pkg/controllers/storage_node_controller.go
index b2efb9e..dd6acb9 100644
--- a/shardingsphere-operator/pkg/controllers/storage_node_controller.go
+++ b/shardingsphere-operator/pkg/controllers/storage_node_controller.go
@@ -21,9 +21,12 @@ import (
"context"
"fmt"
"reflect"
+ "strings"
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/api/v1alpha1"
+
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/kubernetes/service"
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/reconcile/storagenode/aws"
+
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/shardingsphere"
"github.com/database-mesh/golang-sdk/aws/client/rds"
dbmeshv1alpha1
"github.com/database-mesh/golang-sdk/kubernetes/api/v1alpha1"
@@ -31,6 +34,7 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
+ "k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
"k8s.io/utils/strings/slices"
ctrl "sigs.k8s.io/controller-runtime"
@@ -40,6 +44,13 @@ import (
const (
StorageNodeControllerName = "storage-node-controller"
FinalizerName = "shardingsphere.apache.org/finalizer"
+
+ AnnotationKeyRegisterStorageUnitEnabled =
"shardingsphere.apache.org/register-storage-unit-enabled"
+ AnnotationKeyComputeNodeNamespace =
"shardingsphere.apache.org/compute-node-namespace"
+ AnnotationKeyComputeNodeName =
"shardingsphere.apache.org/compute-node-name"
+ AnnotationKeyDatabaseName =
"shardingsphere.apache.org/database-name"
+
+ ShardingSphereProtocolType = "proxy-frontend-database-protocol-type"
)
// StorageNodeReconciler is a controller for storage nodes
@@ -49,6 +60,8 @@ type StorageNodeReconciler struct {
Log logr.Logger
Recorder record.EventRecorder
AwsRDS rds.RDS
+
+ Service service.Service
}
// Reconcile handles main function of this controller
@@ -156,10 +169,21 @@ func (r *StorageNodeReconciler) reconcile(ctx
context.Context, dbClass *dbmeshv1
}
}
+ // register storage unit if needed.
+ if err := r.registerStorageUnit(ctx, node); err != nil {
+ r.Log.Error(err, fmt.Sprintf("unable to register storage unit
[%s:%s]", node.GetNamespace(), node.GetName()))
+ return ctrl.Result{Requeue: true}, err
+ }
+
return ctrl.Result{RequeueAfter: defaultRequeueTime}, nil
}
func (r *StorageNodeReconciler) getDatabaseClass(ctx context.Context, node
*v1alpha1.StorageNode) (databaseClass *dbmeshv1alpha1.DatabaseClass, err error)
{
+ if node.Spec.DatabaseClassName == "" {
+ r.Recorder.Event(node, corev1.EventTypeWarning,
"DatabaseClassNameIsNil", "DatabaseClassName is nil")
+ return nil, fmt.Errorf("DatabaseClassName is nil")
+ }
+
databaseClass = &dbmeshv1alpha1.DatabaseClass{}
if err := r.Get(ctx, client.ObjectKey{Name:
node.Spec.DatabaseClassName}, databaseClass); err != nil {
@@ -422,9 +446,123 @@ func (r *StorageNodeReconciler) deleteAWSRDSInstance(ctx
context.Context, client
return nil
}
+// registerStorageUnit
+func (r *StorageNodeReconciler) registerStorageUnit(ctx context.Context, node
*v1alpha1.StorageNode) error {
+ // if register storage unit is not enabled, return
+ if node.Annotations[AnnotationKeyRegisterStorageUnitEnabled] != "true" {
+ return nil
+ }
+ // if node is not ready, return
+ if node.Status.Phase != v1alpha1.StorageNodePhaseReady {
+ return nil
+ }
+
+ if err := validateComputeNodeAnnotations(node); err != nil {
+ return err
+ }
+
+ dbName := node.Annotations[AnnotationKeyDatabaseName]
+
+ ssServer, err := r.getShardingsphereServer(ctx, node)
+ if err != nil {
+ return fmt.Errorf("getShardingsphereServer failed: %w", err)
+ }
+
+ defer ssServer.Close()
+
+ if err := ssServer.CreateDatabase(dbName); err != nil {
+ return fmt.Errorf("create database failed: %w", err)
+ }
+
+ // TODO add cluster
+
+ ins := node.Status.Instances[0]
+ host := ins.Endpoint.Address
+ port := ins.Endpoint.Port
+ username := node.Annotations[dbmeshv1alpha1.AnnotationsMasterUsername]
+ password :=
node.Annotations[dbmeshv1alpha1.AnnotationsMasterUserPassword]
+
+ // TODO how to set ds name?
+ if err := ssServer.RegisterStorageUnit("ds_0", host, uint(port),
dbName, username, password); err != nil {
+ return fmt.Errorf("register storage node failed: %w", err)
+ }
+
+ return nil
+}
+
+func validateComputeNodeAnnotations(node *v1alpha1.StorageNode) error {
+ requiredAnnos := []string{
+ AnnotationKeyDatabaseName,
+ AnnotationKeyComputeNodeNamespace,
+ AnnotationKeyComputeNodeName,
+ }
+
+ for _, anno := range requiredAnnos {
+ if v, ok := node.Annotations[anno]; !ok || v == "" {
+ return fmt.Errorf("annotation [%s] is required", anno)
+ }
+ }
+
+ return nil
+}
+
+func (r *StorageNodeReconciler) getShardingsphereServer(ctx context.Context,
node *v1alpha1.StorageNode) (shardingsphere.IServer, error) {
+ var (
+ driver, host, username, password string
+ port uint
+ )
+
+ // get compute node
+ cn := &v1alpha1.ComputeNode{}
+ if err := r.Client.Get(ctx, types.NamespacedName{
+ Name: node.Annotations[AnnotationKeyComputeNodeName],
+ Namespace: node.Annotations[AnnotationKeyComputeNodeNamespace],
+ }, cn); err != nil {
+ return nil, fmt.Errorf("get compute node failed: %w", err)
+ }
+
+ serverConf := cn.Spec.Bootstrap.ServerConfig
+
+ driver, ok := serverConf.Props[ShardingSphereProtocolType]
+ if !ok || driver == "" {
+ driver = "mysql"
+ }
+ driver = strings.ToLower(driver)
+
+ if len(serverConf.Authority.Users) == 0 {
+ return nil, fmt.Errorf("no user in compute node [%s]",
cn.Namespace+"/"+cn.Name)
+ }
+
+ username = serverConf.Authority.Users[0].User
+ password = serverConf.Authority.Users[0].Password
+
+ // get service of compute node
+ svc, err := r.Service.GetByNamespacedName(ctx, types.NamespacedName{
+ Name: node.Annotations[AnnotationKeyComputeNodeName],
+ Namespace: node.Annotations[AnnotationKeyComputeNodeNamespace],
+ })
+
+ if err != nil || svc == nil {
+ return nil, fmt.Errorf("get service failed: %w", err)
+ }
+
+ host = fmt.Sprintf("%s.%s", svc.Name, svc.Namespace)
+
+ port = uint(svc.Spec.Ports[0].Port)
+
+ ssServer, err := shardingsphere.NewServer(driver, host, port, username,
password)
+ if err != nil {
+ return nil, fmt.Errorf("new shardingsphere server failed: %w",
err)
+ }
+
+ return ssServer, nil
+}
+
// SetupWithManager sets up the controller with the Manager
func (r *StorageNodeReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&v1alpha1.StorageNode{}).
+ Owns(&v1alpha1.ComputeNode{}).
+ Owns(&corev1.Service{}).
Complete(r)
}
diff --git a/shardingsphere-operator/pkg/shardingsphere/mocks/shardingsphere.go
b/shardingsphere-operator/pkg/shardingsphere/mocks/shardingsphere.go
new file mode 100644
index 0000000..d2eecb4
--- /dev/null
+++ b/shardingsphere-operator/pkg/shardingsphere/mocks/shardingsphere.go
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Code generated by MockGen. DO NOT EDIT.
+// Source: shardingsphere.go
+
+// Package mock_shardingsphere is a generated GoMock package.
+package mock_shardingsphere
+
+import (
+ reflect "reflect"
+
+ gomock "github.com/golang/mock/gomock"
+)
+
+// MockIServer is a mock of IServer interface.
+type MockIServer struct {
+ ctrl *gomock.Controller
+ recorder *MockIServerMockRecorder
+}
+
+// MockIServerMockRecorder is the mock recorder for MockIServer.
+type MockIServerMockRecorder struct {
+ mock *MockIServer
+}
+
+// NewMockIServer creates a new mock instance.
+func NewMockIServer(ctrl *gomock.Controller) *MockIServer {
+ mock := &MockIServer{ctrl: ctrl}
+ mock.recorder = &MockIServerMockRecorder{mock}
+ return mock
+}
+
+// EXPECT returns an object that allows the caller to indicate expected use.
+func (m *MockIServer) EXPECT() *MockIServerMockRecorder {
+ return m.recorder
+}
+
+// Close mocks base method.
+func (m *MockIServer) Close() error {
+ m.ctrl.T.Helper()
+ ret := m.ctrl.Call(m, "Close")
+ ret0, _ := ret[0].(error)
+ return ret0
+}
+
+// Close indicates an expected call of Close.
+func (mr *MockIServerMockRecorder) Close() *gomock.Call {
+ mr.mock.ctrl.T.Helper()
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close",
reflect.TypeOf((*MockIServer)(nil).Close))
+}
+
+// CreateDatabase mocks base method.
+func (m *MockIServer) CreateDatabase(dbName string) error {
+ m.ctrl.T.Helper()
+ ret := m.ctrl.Call(m, "CreateDatabase", dbName)
+ ret0, _ := ret[0].(error)
+ return ret0
+}
+
+// CreateDatabase indicates an expected call of CreateDatabase.
+func (mr *MockIServerMockRecorder) CreateDatabase(dbName interface{})
*gomock.Call {
+ mr.mock.ctrl.T.Helper()
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateDatabase",
reflect.TypeOf((*MockIServer)(nil).CreateDatabase), dbName)
+}
+
+// RegisterStorageUnit mocks base method.
+func (m *MockIServer) RegisterStorageUnit(dsName, host string, port uint,
dbName, user, password string) error {
+ m.ctrl.T.Helper()
+ ret := m.ctrl.Call(m, "RegisterStorageUnit", dsName, host, port,
dbName, user, password)
+ ret0, _ := ret[0].(error)
+ return ret0
+}
+
+// RegisterStorageUnit indicates an expected call of RegisterStorageUnit.
+func (mr *MockIServerMockRecorder) RegisterStorageUnit(dsName, host, port,
dbName, user, password interface{}) *gomock.Call {
+ mr.mock.ctrl.T.Helper()
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock,
"RegisterStorageUnit", reflect.TypeOf((*MockIServer)(nil).RegisterStorageUnit),
dsName, host, port, dbName, user, password)
+}
+
+// UnRegisterStorageUnit mocks base method.
+func (m *MockIServer) UnRegisterStorageUnit(dsName string) error {
+ m.ctrl.T.Helper()
+ ret := m.ctrl.Call(m, "UnRegisterStorageUnit", dsName)
+ ret0, _ := ret[0].(error)
+ return ret0
+}
+
+// UnRegisterStorageUnit indicates an expected call of UnRegisterStorageUnit.
+func (mr *MockIServerMockRecorder) UnRegisterStorageUnit(dsName interface{})
*gomock.Call {
+ mr.mock.ctrl.T.Helper()
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock,
"UnRegisterStorageUnit",
reflect.TypeOf((*MockIServer)(nil).UnRegisterStorageUnit), dsName)
+}
diff --git a/shardingsphere-operator/pkg/shardingsphere/shardingsphere.go
b/shardingsphere-operator/pkg/shardingsphere/shardingsphere.go
new file mode 100644
index 0000000..746689a
--- /dev/null
+++ b/shardingsphere-operator/pkg/shardingsphere/shardingsphere.go
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package shardingsphere
+
+import (
+ "database/sql"
+ "fmt"
+
+ _ "github.com/go-sql-driver/mysql"
+)
+
+const (
+ // DistSQLCreateDatabase create database if not exists.
+ DistSQLCreateDatabase = `CREATE DATABASE IF NOT EXISTS %s;`
+ // DistSQLRegisterStorageUnit register database to shardingsphere by
storage unit name and database info.
+ DistSQLRegisterStorageUnit = `REGISTER STORAGE UNIT IF NOT EXISTS %s
(HOST="%s",PORT=%d,DB="%s",USER="%s",PASSWORD="%s");`
+ // DistSQLShowRulesUsed show all rules used by storage unit name.
+ DistSQLShowRulesUsed = `SHOW RULES USED STORAGE UNIT %s FROM %s;`
+ // DistSQLUnRegisterStorageUnit unregister database from shardingsphere
by storage unit name.
+ DistSQLUnRegisterStorageUnit = `UNREGISTER STORAGE UNIT %s;`
+ // DistSQLDropRule drop rule by rule type and rule name.
+ DistSQLDropRule = `DROP %s RULE %s;`
+ // DistSQLDropTable drop table by table name.
+ DistSQLDropTable = `DROP TABLE %s;`
+)
+
+var (
+ ruleTypeMap = map[string]string{}
+)
+
+type Rule struct {
+ Type string
+ Name string
+}
+
+type server struct {
+ db *sql.DB
+}
+
+type IServer interface {
+ CreateDatabase(dbName string) error
+ RegisterStorageUnit(dsName, host string, port uint, dbName, user,
password string) error
+ UnRegisterStorageUnit(dsName string) error
+ Close() error
+}
+
+var _ IServer = (*server)(nil)
+
+func NewServer(driver, host string, port uint, user, password string)
(IServer, error) {
+ if driver != "mysql" && driver != "postgres" {
+ return nil, fmt.Errorf("unsupported database driver: %s",
driver)
+ }
+
+ if host == "" || port == 0 || user == "" || password == "" {
+ return nil, fmt.Errorf("invalid database config, host=%s,
port=%d, user=%s, password=%s", host, port, user, password)
+ }
+
+ dataSourceName := fmt.Sprintf("%s:%s@tcp(%s:%d)/", user, password,
host, port)
+
+ db, err := sql.Open(driver, dataSourceName)
+ if err != nil {
+ return nil, fmt.Errorf("open database=%s error: %w",
dataSourceName, err)
+ }
+
+ // check database connection
+ if err = db.Ping(); err != nil {
+ return nil, fmt.Errorf("ping database=%s error: %w",
dataSourceName, err)
+ }
+
+ return &server{db: db}, nil
+}
+
+func (s *server) Close() error {
+ return s.db.Close()
+}
+
+func (s *server) CreateDatabase(dbName string) error {
+ distSQL := fmt.Sprintf(DistSQLCreateDatabase, dbName)
+
+ _, err := s.db.Exec(distSQL)
+ if err != nil {
+ return fmt.Errorf("create database error: %w", err)
+ }
+
+ return nil
+}
+
+func (s *server) RegisterStorageUnit(dsName, host string, port uint, dbName,
user, password string) error {
+ distSQL := fmt.Sprintf(DistSQLRegisterStorageUnit, dsName, host, port,
dbName, user, password)
+
+ _, err := s.db.Exec(distSQL)
+ if err != nil {
+ return fmt.Errorf("register database error: %w", err)
+ }
+
+ return nil
+}
+
+// getRulesUsed returns all rules used by storage unit name.
+func (s *server) getRulesUsed(dsName, dbName string) (rules []*Rule, err
error) {
+ rules = make([]*Rule, 0)
+ distSQL := fmt.Sprintf(DistSQLShowRulesUsed, dsName, dbName)
+
+ rows, err := s.db.Query(distSQL)
+ if err != nil {
+ return nil, fmt.Errorf("get rules used error: %w", err)
+ }
+ defer rows.Close()
+
+ for rows.Next() {
+ var ruleT, ruleN string
+ if err := rows.Scan(&ruleT, &ruleN); err != nil {
+ return nil, fmt.Errorf("scan rules used error: %w", err)
+ }
+ rules = append(rules, &Rule{Type: ruleT, Name: ruleN})
+ }
+
+ if err := rows.Err(); err != nil {
+ return nil, fmt.Errorf("rows error: %w", err)
+ }
+ return rules, nil
+}
+
+func (s *server) UnRegisterStorageUnit(dsName string) error {
+ rules, err := s.getRulesUsed(dsName, "")
+ if err != nil {
+ return fmt.Errorf("get rules used error: %w", err)
+ }
+
+ // TODO DISCUSS: should we drop all tables used by storage unit?
+ // clean all rules and tables used by storage unit
+ tables := map[string]struct{}{}
+ for _, rule := range rules {
+ if err := s.dropRule(rule.Type, rule.Name); err != nil {
+ return fmt.Errorf("drop rule error: %w", err)
+ }
+ if _, ok := tables[rule.Name]; !ok {
+ tables[rule.Name] = struct{}{}
+ }
+ }
+
+ for table := range tables {
+ if err := s.dropTable(table); err != nil {
+ return fmt.Errorf("drop table error: %w", err)
+ }
+ }
+
+ distSQL := fmt.Sprintf(DistSQLUnRegisterStorageUnit, dsName)
+
+ _, err = s.db.Exec(distSQL)
+ if err != nil {
+ return fmt.Errorf("unregister database error: %w", err)
+ }
+
+ return nil
+}
+
+func (s *server) dropRule(ruleType, ruleName string) error {
+ // convert rule type
+ ruleType = ruleTypeMap[ruleType]
+ distSQL := fmt.Sprintf(DistSQLDropRule, ruleType, ruleName)
+ _, err := s.db.Exec(distSQL)
+ if err != nil {
+ return fmt.Errorf("drop rule fail, err: %s", err)
+ }
+ return nil
+}
+
+func (s *server) dropTable(tableName string) error {
+ distSQL := fmt.Sprintf(DistSQLDropTable, tableName)
+ _, err := s.db.Exec(distSQL)
+ if err != nil {
+ return fmt.Errorf("drop table fail, err: %s", err)
+ }
+ return nil
+}
+
+func init() {
+ // init rule type map
+ // implement more rule type if needed
+ ruleTypeMap = map[string]string{
+ "sharding": "SHARDING TABLE",
+ }
+}
diff --git
a/shardingsphere-operator/pkg/shardingsphere/shardingsphere_suite_test.go
b/shardingsphere-operator/pkg/shardingsphere/shardingsphere_suite_test.go
new file mode 100644
index 0000000..1926834
--- /dev/null
+++ b/shardingsphere-operator/pkg/shardingsphere/shardingsphere_suite_test.go
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package shardingsphere_test
+
+import (
+ "testing"
+
+ . "github.com/onsi/ginkgo/v2"
+ . "github.com/onsi/gomega"
+)
+
+func TestShardingSphere(t *testing.T) {
+ RegisterFailHandler(Fail)
+ RunSpecs(t, "ShardingSphere Suite")
+}
diff --git a/shardingsphere-operator/pkg/shardingsphere/shardingsphere_test.go
b/shardingsphere-operator/pkg/shardingsphere/shardingsphere_test.go
new file mode 100644
index 0000000..15aa828
--- /dev/null
+++ b/shardingsphere-operator/pkg/shardingsphere/shardingsphere_test.go
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package shardingsphere
+
+import (
+ "database/sql"
+ "fmt"
+ "regexp"
+
+ "bou.ke/monkey"
+ "github.com/DATA-DOG/go-sqlmock"
+ . "github.com/onsi/ginkgo/v2"
+ . "github.com/onsi/gomega"
+)
+
+var _ = Describe("Test ShardingSphere Server", func() {
+ var (
+ db *sql.DB
+ dbmock sqlmock.Sqlmock
+ err error
+ s IServer
+ )
+ BeforeEach(func() {
+ db, dbmock, err = sqlmock.New()
+ Expect(err).ShouldNot(HaveOccurred())
+ Expect(dbmock).ShouldNot(BeNil())
+
+ monkey.Patch(sql.Open, func(driverName, dataSourceName string)
(*sql.DB, error) {
+ return db, nil
+ })
+
+ s, err = NewServer("mysql", "localhost", uint(3307), "user",
"password")
+ Expect(err).ShouldNot(HaveOccurred())
+ })
+
+ AfterEach(func() {
+ monkey.Unpatch(sql.Open)
+ db.Close()
+ })
+
+ Context("Test Create database", func() {
+ It("should create success", func() {
+ dbmock.ExpectExec(regexp.QuoteMeta("CREATE
DATABASE")).WillReturnResult(sqlmock.NewResult(1, 1))
+
+ err = s.CreateDatabase("test_db")
+ Expect(err).ShouldNot(HaveOccurred())
+ })
+ })
+
+ // test register storage unit
+ Context("Test register storage unit", func() {
+ It("should register success", func() {
+ // mock db and return register storage unit success
+ dbmock.ExpectExec(regexp.QuoteMeta("REGISTER STORAGE
UNIT")).WillReturnResult(sqlmock.NewResult(1, 1))
+
+ // create server
+ err = s.RegisterStorageUnit("ds_0", "localhost",
uint(3307), "sharding_db", "user", "password")
+ Expect(err).ShouldNot(HaveOccurred())
+ })
+ })
+
+ // test get rules used by storage units
+ Context("Test get rules used by storage units", func() {
+ // should return a sharding table rule named 't_order'.
+ It("should return a sharding table rule named 't_order'",
func() {
+ // mock db and return sharding table rule
+ dbmock.ExpectQuery(regexp.QuoteMeta("SHOW RULES USED
STORAGE UNIT")).WillReturnRows(sqlmock.NewRows([]string{"type",
"name"}).AddRow("sharding", "t_order"))
+
+ result, err := s.(*server).getRulesUsed("ds_0",
"sharding_db")
+ Expect(err).ShouldNot(HaveOccurred())
+ Expect(result).Should(Equal([]*Rule{{Type: "sharding",
Name: "t_order"}}))
+ })
+ })
+
+ // test drop rule by rule type 'sharding' and rule name 't_order'
+ Context("Test drop rule by rule type 'sharding' and rule name
't_order'", func() {
+ It("should drop success", func() {
+ // mock db and return drop rule success
+ dbmock.ExpectExec("DROP SHARDING TABLE
RULE").WillReturnResult(sqlmock.NewResult(1, 1))
+
+ err := s.(*server).dropRule("sharding", "t_order")
+ Expect(err).ShouldNot(HaveOccurred())
+ })
+ })
+
+ Context("Test unregister storage node", func() {
+ It("should unregister success", func() {
+ dbmock.ExpectQuery(regexp.QuoteMeta("SHOW RULES USED
STORAGE UNIT")).WillReturnRows(sqlmock.NewRows([]string{"type", "name"}))
+ dbmock.ExpectExec(regexp.QuoteMeta("UNREGISTER STORAGE
UNIT")).WillReturnResult(sqlmock.NewResult(1, 1))
+
+ err = s.UnRegisterStorageUnit("ds_0")
+ Expect(err).ShouldNot(HaveOccurred())
+ })
+ })
+})
+
+var _ = Describe("Test ShardingSphere Server Manually", func() {
+ var (
+ driver string
+ host string
+ port uint
+ user string
+ pass string
+ )
+
+ Context("Test create database", func() {
+ It("should create success", func() {
+ if driver == "" || host == "" || port == 0 || user ==
"" || pass == "" {
+ Skip("skip test")
+ }
+ dbName := "test_db"
+ s, err := NewServer(driver, host, port, user, pass)
+ Expect(err).ShouldNot(HaveOccurred())
+ err = s.CreateDatabase(dbName)
+ Expect(err).ShouldNot(HaveOccurred())
+
+ s.(*server).db.Exec(fmt.Sprintf(`DROP DATABASE %s`,
dbName))
+ })
+ })
+})
diff --git a/shardingsphere-operator/test/e2e/storage_node_controller_test.go
b/shardingsphere-operator/test/e2e/storage_node_controller_test.go
index 0d10692..f3862ce 100644
--- a/shardingsphere-operator/test/e2e/storage_node_controller_test.go
+++ b/shardingsphere-operator/test/e2e/storage_node_controller_test.go
@@ -134,6 +134,12 @@ var _ = Describe("StorageNode Controller Suite Test",
func() {
Expect(k8sClient.Get(ctx, client.ObjectKey{Name:
nodeName, Namespace: "default"}, getNode)).Should(Succeed())
// delete storage node
+ Expect(k8sClient.Delete(ctx, getNode)).Should(Succeed())
+ Eventually(func() bool {
+ newSN := &v1alpha1.StorageNode{}
+ err := k8sClient.Get(ctx,
client.ObjectKey{Name: nodeName, Namespace: "default"}, newSN)
+ return err != nil
+ }, 10*time.Second, 1*time.Second).Should(BeTrue())
})
})
})