This is an automated email from the ASF dual-hosted git repository.

miaoliyao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/shardingsphere-on-cloud.git


The following commit(s) were added to refs/heads/main by this push:
     new 9a7b6f8  feat(pitr):wrap ShardingSphere Proxy DistSQL (#262)
9a7b6f8 is described below

commit 9a7b6f827eedbb7f923d728753a24084c5332cc6
Author: lltgo <[email protected]>
AuthorDate: Fri Mar 10 18:25:06 2023 +0800

    feat(pitr):wrap ShardingSphere Proxy DistSQL (#262)
---
 pitr/cli/internal/pkg/local-storage_test.go        |  12 +-
 pitr/cli/internal/pkg/model/ls_backup.go           |  33 ++---
 .../ss_backup.go}                                  |  41 +++---
 pitr/cli/internal/pkg/shardingsphere-proxy.go      | 139 ++++++++++++++++++++-
 pitr/cli/internal/pkg/shardingsphere-proxy_test.go |  55 +++++++-
 5 files changed, 229 insertions(+), 51 deletions(-)

diff --git a/pitr/cli/internal/pkg/local-storage_test.go b/pitr/cli/internal/pkg/local-storage_test.go
index 40e1d72..2781d71 100644
--- a/pitr/cli/internal/pkg/local-storage_test.go
+++ b/pitr/cli/internal/pkg/local-storage_test.go
@@ -19,11 +19,13 @@ package pkg
 
 import (
        "fmt"
-       "github.com/apache/shardingsphere-on-cloud/pitr/cli/internal/pkg/model"
-       "github.com/google/uuid"
        "os"
        "time"
 
+       "github.com/google/uuid"
+
+       "github.com/apache/shardingsphere-on-cloud/pitr/cli/internal/pkg/model"
+
        . "github.com/onsi/ginkgo/v2"
        . "github.com/onsi/gomega"
 )
@@ -94,7 +96,7 @@ var _ = Describe("ILocalStorage", func() {
                                        ID:        uuid.New().String(),
                                        CSN:       uuid.New().String(),
                                        StartTime: time.Now().Unix(),
-                                       Endtime:   time.Now().Add(time.Minute).Unix(),
+                                       EndTime:   time.Now().Add(time.Minute).Unix(),
                                },
                                DnList: []model.DataNode{
                                        {
@@ -103,7 +105,7 @@ var _ = Describe("ILocalStorage", func() {
                                                Status:    "Completed",
                                                BackupID:  "SK08DAK1",
                                                StartTime: time.Now().Unix(),
-                                               Endtime:   time.Now().Unix(),
+                                               EndTime:   time.Now().Unix(),
                                        },
                                        {
                                                IP:        "1.1.1.2",
@@ -111,7 +113,7 @@ var _ = Describe("ILocalStorage", func() {
                                                Status:    "Completed",
                                                BackupID:  "SK08DAK2",
                                                StartTime: time.Now().Unix(),
-                                               Endtime:   time.Now().Unix(),
+                                               EndTime:   time.Now().Unix(),
                                        },
                                },
                                SsBackup: &model.SsBackup{
diff --git a/pitr/cli/internal/pkg/model/ls_backup.go b/pitr/cli/internal/pkg/model/ls_backup.go
index 547d688..48514bf 100644
--- a/pitr/cli/internal/pkg/model/ls_backup.go
+++ b/pitr/cli/internal/pkg/model/ls_backup.go
@@ -29,7 +29,7 @@ type (
                ID        string `json:"id"`
                CSN       string `json:"csn"`
                StartTime int64  `json:"start_time"` // Unix time
-               Endtime   int64  `json:"end_time"`   // Unix time
+               EndTime   int64  `json:"end_time"`   // Unix time
        }
 
        DataNode struct {
@@ -38,23 +38,18 @@ type (
                Status    string `json:"status"`
                BackupID  string `json:"backup_id"`
                StartTime int64  `json:"start_time"` // Unix time
-               Endtime   int64  `json:"end_time"`   // Unix time
+               EndTime   int64  `json:"end_time"`   // Unix time
        }
 )
 
 type (
        SsBackup struct {
-               Status       string         `json:"status"`
-               ClusterInfo  ClusterInfo    `json:"cluster_info"`
-               StorageNodes []StorageNodes `json:"storage_nodes"`
+               Status       string        `json:"status"`
+               ClusterInfo  ClusterInfo   `json:"cluster_info"`
+               StorageNodes []StorageNode `json:"storage_nodes"`
        }
 
-       ClusterInfo struct {
-               MetaData     MetaData     `json:"meta_data"`
-               SnapshotInfo SnapshotInfo `json:"snapshot_info"`
-       }
-
-       StorageNodes struct {
+       StorageNode struct {
                IP       string `json:"ip"`
                Port     string `json:"port"`
                Username string `json:"username"`
@@ -63,19 +58,7 @@ type (
                Remark   string `json:"remark"`
        }
 
-       MetaData struct {
-               Databases Databases `json:"databases"`
-               Props     string    `json:"props"`
-               Rules     string    `json:"rules"`
-       }
-
-       Databases struct {
-               ShardingDb string `json:"sharding_db"`
-               AnotherDb  string `json:"another_db"`
-       }
-
-       SnapshotInfo struct {
-               Csn        string `json:"csn"`
-               CreateTime string `json:"create_time"`
+       StorageNodes struct {
+               List []StorageNode `json:"storage_nodes"`
        }
 )
diff --git a/pitr/cli/internal/pkg/shardingsphere-proxy_test.go b/pitr/cli/internal/pkg/model/ss_backup.go
similarity index 58%
copy from pitr/cli/internal/pkg/shardingsphere-proxy_test.go
copy to pitr/cli/internal/pkg/model/ss_backup.go
index 743a270..b58ad65 100644
--- a/pitr/cli/internal/pkg/shardingsphere-proxy_test.go
+++ b/pitr/cli/internal/pkg/model/ss_backup.go
@@ -15,20 +15,31 @@
 * limitations under the License.
  */
 
-package pkg
+package model
 
-import (
-       . "github.com/onsi/ginkgo/v2"
-       . "github.com/onsi/gomega"
-)
+type (
+       SsBackupInfo struct {
+               ClusterInfo ClusterInfo `json:"cluster_info"`
+       }
+
+       ClusterInfo struct {
+               MetaData     MetaData     `json:"meta_data"`
+               SnapshotInfo SnapshotInfo `json:"snapshot_info"`
+       }
+
+       MetaData struct {
+               Databases Databases `json:"databases"`
+               Props     string    `json:"props"`
+               Rules     string    `json:"rules"`
+       }
 
-var _ = Describe("IShardingSphere", func() {
-       Context("NewShardingSphereProxy", func() {
-               It("Connecting shardingsphere proxy", func() {
-                       Skip("Manually exec:dependent environment")
-                       ss, err := NewShardingSphereProxy("root", "root", DefaultDbName, "127.0.0.1", 13308)
-                       Expect(err).To(BeNil())
-                       Expect(ss).NotTo(BeNil())
-               })
-       })
-})
+       Databases struct {
+               ShardingDb string `json:"sharding_db"`
+               AnotherDb  string `json:"another_db"`
+       }
+
+       SnapshotInfo struct {
+               Csn        string `json:"csn"`
+               CreateTime string `json:"create_time"`
+       }
+)
diff --git a/pitr/cli/internal/pkg/shardingsphere-proxy.go b/pitr/cli/internal/pkg/shardingsphere-proxy.go
index 215294e..bd0be41 100644
--- a/pitr/cli/internal/pkg/shardingsphere-proxy.go
+++ b/pitr/cli/internal/pkg/shardingsphere-proxy.go
@@ -19,23 +19,34 @@ package pkg
 
 import (
        "database/sql"
+       "encoding/json"
        "fmt"
+
+       "github.com/apache/shardingsphere-on-cloud/pitr/cli/internal/pkg/model"
+       "github.com/apache/shardingsphere-on-cloud/pitr/cli/internal/pkg/xerr"
        "github.com/apache/shardingsphere-on-cloud/pitr/cli/pkg/gsutil"
 )
 
 type (
-       shardingSphere struct {
+       shardingSphereProxy struct {
                db *sql.DB
        }
 
-       IShardingSphere interface{}
+       IShardingSphereProxy interface {
+               ExportMetaData() (*model.ClusterInfo, error)
+               ExportStorageNodes() ([]model.StorageNode, error)
+               LockForRestore() error
+               LockForBackup() error
+               Unlock() error
+               ImportMetaData(in *model.ClusterInfo) error
+       }
 )
 
 const (
        DefaultDbName = "postgres"
 )
 
-func NewShardingSphereProxy(user, password, dbName, host string, port uint16) (IShardingSphere, error) {
+func NewShardingSphereProxy(user, password, dbName, host string, port uint16) (IShardingSphereProxy, error) {
        db, err := gsutil.Open(user, password, dbName, host, port)
        if err != nil {
                return nil, err
@@ -44,5 +55,125 @@ func NewShardingSphereProxy(user, password, dbName, host string, port uint16) (I
                 efmt := "db ping fail[host=%s,port=%d,user=%s,pwLen=%d,dbName=%s],err=%s"
                 return nil, fmt.Errorf(efmt, host, port, user, len(password), dbName, err)
        }
-       return &shardingSphere{db: db}, nil
+       return &shardingSphereProxy{db: db}, nil
+}
+
+/*
+LockForBackup 停写,同时锁 CSN,备份场景使用
+*/
+func (ss *shardingSphereProxy) LockForBackup() error {
+       _, err := ss.db.Exec(`LOCK CLUSTER WITH LOCK_STRATEGY(TYPE(NAME="WRITE", PROPERTIES("lock_csn"=true)));`)
+       if err != nil {
+               return xerr.NewCliErr("ss lock for backup failure")
+       }
+       return nil
+}
+
+/*
+LockForRestore 停读写,不需要锁 CSN,恢复场景使用
+*/
+func (ss *shardingSphereProxy) LockForRestore() error {
+       _, err := ss.db.Exec(`LOCK CLUSTER WITH LOCK_STRATEGY(TYPE(NAME="READ_WRITE"))`)
+       if err != nil {
+               return xerr.NewCliErr("ss lock for restore failure")
+       }
+       return nil
+}
+
+func (ss *shardingSphereProxy) Unlock() error {
+       _, err := ss.db.Exec("UNLOCK CLUSTER;")
+       if err != nil {
+               return xerr.NewCliErr("ss unlock failure")
+       }
+       return nil
+}
+
+/*
+ExportMetaData 导出 SS 元数据
+
++-----------------------------+-------------------------+----------------------------------------+
+| id                          | create_time             | data                                   |
++-------------------------------------------------------+----------------------------------------+
+| 734bb036-b15d-4af0-be87-237 | 2023-01-01 12:00:00 897 | {"meta_data":{},"snapshot_info":{}}    |
++-------------------------------------------------------+----------------------------------------+
+*/
+func (ss *shardingSphereProxy) ExportMetaData() (*model.ClusterInfo, error) {
+       query, err := ss.db.Query(`EXPORT METADATA`)
+       if err != nil {
+               return nil, xerr.NewCliErr(fmt.Sprintf("export meta data failure,err=%s", err))
+       }
+       var (
+               id         string
+               createTime string
+               data       string
+       )
+       if query.Next() {
+               if err = query.Scan(&id, &createTime, &data); err != nil {
+                       return nil, xerr.NewCliErr(fmt.Sprintf("query scan failure,err=%s", err))
+               }
+
+               if err = query.Close(); err != nil {
+                       return nil, xerr.NewCliErr(fmt.Sprintf("query close failure,err=%s", err))
+               }
+       }
+
+       out := model.SsBackupInfo{}
+       if err = json.Unmarshal([]byte(data), &out); err != nil {
+               return nil, fmt.Errorf("json unmarshal return err=%s", err)
+       }
+       return nil, nil
+}
+
+/*
+ExportStorageNodes 导出存储节点数据
+
++-----------------------------+-------------------------+----------------------------------------+
+| id                          | create_time             | data                                   |
++-------------------------------------------------------+----------------------------------------+
+| 734bb036-b15d-4af0-be87-237 | 2023-01-01 12:00:00 897 | {"storage_nodes":[]}                   |
++-------------------------------------------------------+----------------------------------------+
+*/
+func (ss *shardingSphereProxy) ExportStorageNodes() ([]model.StorageNode, error) {
+       query, err := ss.db.Query(`EXPORT STORAGE NODES;`)
+       if err != nil {
+               return nil, xerr.NewCliErr(fmt.Sprintf("export storage nodes failure,err=%s", err))
+       }
+       var (
+               id         string
+               createTime string
+               data       string
+       )
+       if query.Next() {
+               if err = query.Scan(&id, &createTime, &data); err != nil {
+                       return nil, xerr.NewCliErr(fmt.Sprintf("query scan failure,err=%s", err))
+               }
+
+               if err = query.Close(); err != nil {
+                       return nil, xerr.NewCliErr(fmt.Sprintf("query close failure,err=%s", err))
+               }
+       }
+
+       out := model.StorageNodes{}
+       if err = json.Unmarshal([]byte(data), &out); err != nil {
+               return nil, fmt.Errorf("json unmarshal return err=%s", err)
+       }
+       return out.List, nil
+}
+
+// ImportMetaData 备份数据恢复
+func (ss *shardingSphereProxy) ImportMetaData(in *model.ClusterInfo) error {
+       if in == nil {
+               return xerr.NewCliErr("import meta data is nil")
+       }
+       marshal, err := json.Marshal(in)
+       if err != nil {
+               return xerr.NewCliErr(fmt.Sprintf("json marshal,invalid data[in=%+v]", in))
+       }
+
+       _, err = ss.db.Exec(fmt.Sprintf(`IMPORT METADATA "%s";`, marshal))
+       if err != nil {
+               return xerr.NewCliErr(fmt.Sprintf("import metadata failure,err=%s", err))
+       }
+
+       return nil
 }
diff --git a/pitr/cli/internal/pkg/shardingsphere-proxy_test.go b/pitr/cli/internal/pkg/shardingsphere-proxy_test.go
index 743a270..d949161 100644
--- a/pitr/cli/internal/pkg/shardingsphere-proxy_test.go
+++ b/pitr/cli/internal/pkg/shardingsphere-proxy_test.go
@@ -18,17 +18,68 @@
 package pkg
 
 import (
+       "fmt"
+       "time"
+
        . "github.com/onsi/ginkgo/v2"
        . "github.com/onsi/gomega"
 )
 
-var _ = Describe("IShardingSphere", func() {
+var _ = Describe("IShardingSphereProxy", func() {
        Context("NewShardingSphereProxy", func() {
+               var (
+                       host     = "local"
+                       port     = uint16(13308)
+                       username = "root"
+                       password = "root"
+                       dbName   = "postgres"
+               )
+
                It("Connecting shardingsphere proxy", func() {
                        Skip("Manually exec:dependent environment")
-                       ss, err := NewShardingSphereProxy("root", "root", DefaultDbName, "127.0.0.1", 13308)
+                       ss, err := NewShardingSphereProxy(username, password, dbName, host, port)
+                       Expect(err).To(BeNil())
+                       Expect(ss).NotTo(BeNil())
+               })
+
+               It("Export meta data", func() {
+                       Skip("Manually exec:dependent environment")
+                       ss, err := NewShardingSphereProxy(username, password, dbName, host, port)
+                       Expect(err).To(BeNil())
+                       Expect(ss).NotTo(BeNil())
+
+                       fmt.Println(ss.ExportMetaData())
+               })
+
+               It("Export storage node", func() {
+                       Skip("Manually exec:dependent environment")
+                       ss, err := NewShardingSphereProxy(username, password, dbName, host, port)
                        Expect(err).To(BeNil())
                        Expect(ss).NotTo(BeNil())
+
+                       fmt.Println(ss.ExportStorageNodes())
+
+                       ss, err = NewShardingSphereProxy(username, password, dbName, host, port)
+                       Expect(err).To(BeNil())
+                       Expect(ss).NotTo(BeNil())
+
+                       fmt.Println(ss.ExportStorageNodes())
                })
+
+               It("Lock and unlock", func() {
+                       Skip("Manually exec:dependent environment")
+                       ss, err := NewShardingSphereProxy(username, password, dbName, host, port)
+                       Expect(err).To(BeNil())
+                       Expect(ss).NotTo(BeNil())
+
+                       fmt.Println(ss.LockForRestore())
+                       time.Sleep(time.Second * 5)
+                       fmt.Println(ss.Unlock())
+
+                       fmt.Println(ss.LockForBackup())
+                       time.Sleep(time.Second * 5)
+                       fmt.Println(ss.Unlock())
+               })
+
        })
 })

Reply via email to