This is an automated email from the ASF dual-hosted git repository.
miaoliyao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/shardingsphere-on-cloud.git
The following commit(s) were added to refs/heads/main by this push:
new 9a7b6f8 feat(pitr):wrap ShardingSphere Proxy DistSQL (#262)
9a7b6f8 is described below
commit 9a7b6f827eedbb7f923d728753a24084c5332cc6
Author: lltgo <[email protected]>
AuthorDate: Fri Mar 10 18:25:06 2023 +0800
feat(pitr):wrap ShardingSphere Proxy DistSQL (#262)
---
pitr/cli/internal/pkg/local-storage_test.go | 12 +-
pitr/cli/internal/pkg/model/ls_backup.go | 33 ++---
.../ss_backup.go} | 41 +++---
pitr/cli/internal/pkg/shardingsphere-proxy.go | 139 ++++++++++++++++++++-
pitr/cli/internal/pkg/shardingsphere-proxy_test.go | 55 +++++++-
5 files changed, 229 insertions(+), 51 deletions(-)
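For context, this change replaces the empty IShardingSphere interface with IShardingSphereProxy, which wraps the Proxy DistSQL statements LOCK CLUSTER / UNLOCK CLUSTER, EXPORT METADATA, EXPORT STORAGE NODES and IMPORT METADATA. A minimal usage sketch of the new interface follows; it assumes a caller inside this module (the pkg package is internal) and a reachable Proxy, and the host, port and credentials are placeholders, not values from this commit:

    package main

    import (
        "log"

        "github.com/apache/shardingsphere-on-cloud/pitr/cli/internal/pkg"
    )

    func main() {
        // Open a connection to the ShardingSphere Proxy (gsutil.Open under the hood).
        proxy, err := pkg.NewShardingSphereProxy("root", "root", pkg.DefaultDbName, "127.0.0.1", 13308)
        if err != nil {
            log.Fatalf("connect failure: %v", err)
        }

        // Stop writes and lock the CSN before taking a backup.
        if err := proxy.LockForBackup(); err != nil {
            log.Fatalf("lock failure: %v", err)
        }
        defer func() {
            if err := proxy.Unlock(); err != nil {
                log.Printf("unlock failure: %v", err)
            }
        }()

        // Export cluster metadata and the registered storage nodes.
        meta, err := proxy.ExportMetaData()
        if err != nil {
            log.Fatalf("export metadata failure: %v", err)
        }
        nodes, err := proxy.ExportStorageNodes()
        if err != nil {
            log.Fatalf("export storage nodes failure: %v", err)
        }
        log.Printf("cluster info: %+v, storage nodes: %d", meta, len(nodes))
    }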
diff --git a/pitr/cli/internal/pkg/local-storage_test.go b/pitr/cli/internal/pkg/local-storage_test.go
index 40e1d72..2781d71 100644
--- a/pitr/cli/internal/pkg/local-storage_test.go
+++ b/pitr/cli/internal/pkg/local-storage_test.go
@@ -19,11 +19,13 @@ package pkg
import (
"fmt"
- "github.com/apache/shardingsphere-on-cloud/pitr/cli/internal/pkg/model"
- "github.com/google/uuid"
"os"
"time"
+ "github.com/google/uuid"
+
+ "github.com/apache/shardingsphere-on-cloud/pitr/cli/internal/pkg/model"
+
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)
@@ -94,7 +96,7 @@ var _ = Describe("ILocalStorage", func() {
ID: uuid.New().String(),
CSN: uuid.New().String(),
StartTime: time.Now().Unix(),
- Endtime: time.Now().Add(time.Minute).Unix(),
+ EndTime: time.Now().Add(time.Minute).Unix(),
},
DnList: []model.DataNode{
{
@@ -103,7 +105,7 @@ var _ = Describe("ILocalStorage", func() {
Status: "Completed",
BackupID: "SK08DAK1",
StartTime: time.Now().Unix(),
- Endtime: time.Now().Unix(),
+ EndTime: time.Now().Unix(),
},
{
IP: "1.1.1.2",
@@ -111,7 +113,7 @@ var _ = Describe("ILocalStorage", func() {
Status: "Completed",
BackupID: "SK08DAK2",
StartTime: time.Now().Unix(),
- Endtime: time.Now().Unix(),
+ EndTime: time.Now().Unix(),
},
},
SsBackup: &model.SsBackup{
diff --git a/pitr/cli/internal/pkg/model/ls_backup.go b/pitr/cli/internal/pkg/model/ls_backup.go
index 547d688..48514bf 100644
--- a/pitr/cli/internal/pkg/model/ls_backup.go
+++ b/pitr/cli/internal/pkg/model/ls_backup.go
@@ -29,7 +29,7 @@ type (
ID string `json:"id"`
CSN string `json:"csn"`
StartTime int64 `json:"start_time"` // Unix time
- Endtime int64 `json:"end_time"` // Unix time
+ EndTime int64 `json:"end_time"` // Unix time
}
DataNode struct {
@@ -38,23 +38,18 @@ type (
Status string `json:"status"`
BackupID string `json:"backup_id"`
StartTime int64 `json:"start_time"` // Unix time
- Endtime int64 `json:"end_time"` // Unix time
+ EndTime int64 `json:"end_time"` // Unix time
}
)
type (
SsBackup struct {
- Status string `json:"status"`
- ClusterInfo ClusterInfo `json:"cluster_info"`
- StorageNodes []StorageNodes `json:"storage_nodes"`
+ Status string `json:"status"`
+ ClusterInfo ClusterInfo `json:"cluster_info"`
+ StorageNodes []StorageNode `json:"storage_nodes"`
}
- ClusterInfo struct {
- MetaData MetaData `json:"meta_data"`
- SnapshotInfo SnapshotInfo `json:"snapshot_info"`
- }
-
- StorageNodes struct {
+ StorageNode struct {
IP string `json:"ip"`
Port string `json:"port"`
Username string `json:"username"`
@@ -63,19 +58,7 @@ type (
Remark string `json:"remark"`
}
- MetaData struct {
- Databases Databases `json:"databases"`
- Props string `json:"props"`
- Rules string `json:"rules"`
- }
-
- Databases struct {
- ShardingDb string `json:"sharding_db"`
- AnotherDb string `json:"another_db"`
- }
-
- SnapshotInfo struct {
- Csn string `json:"csn"`
- CreateTime string `json:"create_time"`
+ StorageNodes struct {
+ List []StorageNode `json:"storage_nodes"`
}
)
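As a note, the model change above flattens SsBackup.StorageNodes into a []StorageNode slice and keeps StorageNodes only as a JSON wrapper around that slice; the cluster metadata types move to ss_backup.go below. A small sketch of how the reshaped types marshal (the field values are made up, and only fields visible in this diff are set):

    package main

    import (
        "encoding/json"
        "fmt"
        "log"

        "github.com/apache/shardingsphere-on-cloud/pitr/cli/internal/pkg/model"
    )

    func main() {
        backup := model.SsBackup{
            Status: "Running",
            StorageNodes: []model.StorageNode{
                {IP: "1.1.1.1", Port: "3306", Username: "root", Remark: "example node"},
            },
        }
        raw, err := json.Marshal(&backup)
        if err != nil {
            log.Fatalf("marshal failure: %v", err)
        }
        // Prints {"status":"Running","cluster_info":{...},"storage_nodes":[{"ip":"1.1.1.1",...}]}
        fmt.Println(string(raw))
    }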
diff --git a/pitr/cli/internal/pkg/shardingsphere-proxy_test.go b/pitr/cli/internal/pkg/model/ss_backup.go
similarity index 58%
copy from pitr/cli/internal/pkg/shardingsphere-proxy_test.go
copy to pitr/cli/internal/pkg/model/ss_backup.go
index 743a270..b58ad65 100644
--- a/pitr/cli/internal/pkg/shardingsphere-proxy_test.go
+++ b/pitr/cli/internal/pkg/model/ss_backup.go
@@ -15,20 +15,31 @@
* limitations under the License.
*/
-package pkg
+package model
-import (
- . "github.com/onsi/ginkgo/v2"
- . "github.com/onsi/gomega"
-)
+type (
+ SsBackupInfo struct {
+ ClusterInfo ClusterInfo `json:"cluster_info"`
+ }
+
+ ClusterInfo struct {
+ MetaData MetaData `json:"meta_data"`
+ SnapshotInfo SnapshotInfo `json:"snapshot_info"`
+ }
+
+ MetaData struct {
+ Databases Databases `json:"databases"`
+ Props string `json:"props"`
+ Rules string `json:"rules"`
+ }
-var _ = Describe("IShardingSphere", func() {
- Context("NewShardingSphereProxy", func() {
- It("Connecting shardingsphere proxy", func() {
- Skip("Manually exec:dependent environment")
- ss, err := NewShardingSphereProxy("root", "root", DefaultDbName, "127.0.0.1", 13308)
- Expect(err).To(BeNil())
- Expect(ss).NotTo(BeNil())
- })
- })
-})
+ Databases struct {
+ ShardingDb string `json:"sharding_db"`
+ AnotherDb string `json:"another_db"`
+ }
+
+ SnapshotInfo struct {
+ Csn string `json:"csn"`
+ CreateTime string `json:"create_time"`
+ }
+)
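These types mirror the JSON carried in the data column of EXPORT METADATA (see the doc comment in shardingsphere-proxy.go below). A minimal decode sketch; the JSON literal is a hand-written stand-in, not a real Proxy payload:

    package main

    import (
        "encoding/json"
        "fmt"
        "log"

        "github.com/apache/shardingsphere-on-cloud/pitr/cli/internal/pkg/model"
    )

    func main() {
        // Hypothetical payload shaped like model.ClusterInfo.
        raw := []byte(`{"meta_data":{"databases":{"sharding_db":"","another_db":""},"props":"","rules":""},"snapshot_info":{"csn":"100","create_time":"2023-01-01 12:00:00"}}`)

        var info model.ClusterInfo
        if err := json.Unmarshal(raw, &info); err != nil {
            log.Fatalf("unmarshal failure: %v", err)
        }
        fmt.Println(info.SnapshotInfo.Csn, info.SnapshotInfo.CreateTime) // 100 2023-01-01 12:00:00
    }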
diff --git a/pitr/cli/internal/pkg/shardingsphere-proxy.go b/pitr/cli/internal/pkg/shardingsphere-proxy.go
index 215294e..bd0be41 100644
--- a/pitr/cli/internal/pkg/shardingsphere-proxy.go
+++ b/pitr/cli/internal/pkg/shardingsphere-proxy.go
@@ -19,23 +19,34 @@ package pkg
import (
"database/sql"
+ "encoding/json"
"fmt"
+
+ "github.com/apache/shardingsphere-on-cloud/pitr/cli/internal/pkg/model"
+ "github.com/apache/shardingsphere-on-cloud/pitr/cli/internal/pkg/xerr"
"github.com/apache/shardingsphere-on-cloud/pitr/cli/pkg/gsutil"
)
type (
- shardingSphere struct {
+ shardingSphereProxy struct {
db *sql.DB
}
- IShardingSphere interface{}
+ IShardingSphereProxy interface {
+ ExportMetaData() (*model.ClusterInfo, error)
+ ExportStorageNodes() ([]model.StorageNode, error)
+ LockForRestore() error
+ LockForBackup() error
+ Unlock() error
+ ImportMetaData(in *model.ClusterInfo) error
+ }
)
const (
DefaultDbName = "postgres"
)
-func NewShardingSphereProxy(user, password, dbName, host string, port uint16) (IShardingSphere, error) {
+func NewShardingSphereProxy(user, password, dbName, host string, port uint16) (IShardingSphereProxy, error) {
db, err := gsutil.Open(user, password, dbName, host, port)
if err != nil {
return nil, err
@@ -44,5 +55,125 @@ func NewShardingSphereProxy(user, password, dbName, host string, port uint16) (I
efmt := "db ping fail[host=%s,port=%d,user=%s,pwLen=%d,dbName=%s],err=%s"
return nil, fmt.Errorf(efmt, host, port, user, len(password), dbName, err)
}
- return &shardingSphere{db: db}, nil
+ return &shardingSphereProxy{db: db}, nil
+}
+
+/*
+LockForBackup stops writes and locks the CSN at the same time; used in the backup scenario
+*/
+func (ss *shardingSphereProxy) LockForBackup() error {
+ _, err := ss.db.Exec(`LOCK CLUSTER WITH LOCK_STRATEGY(TYPE(NAME="WRITE", PROPERTIES("lock_csn"=true)));`)
+ if err != nil {
+ return xerr.NewCliErr("ss lock for backup failure")
+ }
+ return nil
+}
+
+/*
+LockForRestore stops both reads and writes without locking the CSN; used in the restore scenario
+*/
+func (ss *shardingSphereProxy) LockForRestore() error {
+ _, err := ss.db.Exec(`LOCK CLUSTER WITH LOCK_STRATEGY(TYPE(NAME="READ_WRITE"))`)
+ if err != nil {
+ return xerr.NewCliErr("ss lock for restore failure")
+ }
+ return nil
+}
+
+func (ss *shardingSphereProxy) Unlock() error {
+ _, err := ss.db.Exec("UNLOCK CLUSTER;")
+ if err != nil {
+ return xerr.NewCliErr("ss unlock failure")
+ }
+ return nil
+}
+
+/*
+ExportMetaData exports the ShardingSphere (SS) metadata
+
++-----------------------------+-------------------------+----------------------------------------+
+| id                          | create_time             | data                                   |
++-----------------------------+-------------------------+----------------------------------------+
+| 734bb036-b15d-4af0-be87-237 | 2023-01-01 12:00:00 897 | {"meta_data":{},"snapshot_info":{}}    |
++-----------------------------+-------------------------+----------------------------------------+
+*/
+func (ss *shardingSphereProxy) ExportMetaData() (*model.ClusterInfo, error) {
+ query, err := ss.db.Query(`EXPORT METADATA`)
+ if err != nil {
+ return nil, xerr.NewCliErr(fmt.Sprintf("export meta data failure,err=%s", err))
+ }
+ var (
+ id string
+ createTime string
+ data string
+ )
+ if query.Next() {
+ if err = query.Scan(&id, &createTime, &data); err != nil {
+ return nil, xerr.NewCliErr(fmt.Sprintf("query scan failure,err=%s", err))
+ }
+
+ if err = query.Close(); err != nil {
+ return nil, xerr.NewCliErr(fmt.Sprintf("query close failure,err=%s", err))
+ }
+ }
+
+ out := model.SsBackupInfo{}
+ if err = json.Unmarshal([]byte(data), &out); err != nil {
+ return nil, fmt.Errorf("json unmarshal return err=%s", err)
+ }
+ return &out.ClusterInfo, nil
+}
+
+/*
+ExportStorageNodes exports the storage node data
+
++-----------------------------+-------------------------+----------------------------------------+
+| id                          | create_time             | data                                   |
++-----------------------------+-------------------------+----------------------------------------+
+| 734bb036-b15d-4af0-be87-237 | 2023-01-01 12:00:00 897 | {"storage_nodes":[]}                   |
++-----------------------------+-------------------------+----------------------------------------+
+*/
+func (ss *shardingSphereProxy) ExportStorageNodes() ([]model.StorageNode, error) {
+ query, err := ss.db.Query(`EXPORT STORAGE NODES;`)
+ if err != nil {
+ return nil, xerr.NewCliErr(fmt.Sprintf("export storage nodes failure,err=%s", err))
+ }
+ var (
+ id string
+ createTime string
+ data string
+ )
+ if query.Next() {
+ if err = query.Scan(&id, &createTime, &data); err != nil {
+ return nil, xerr.NewCliErr(fmt.Sprintf("query scan failure,err=%s", err))
+ }
+
+ if err = query.Close(); err != nil {
+ return nil, xerr.NewCliErr(fmt.Sprintf("query close failure,err=%s", err))
+ }
+ }
+
+ out := model.StorageNodes{}
+ if err = json.Unmarshal([]byte(data), &out); err != nil {
+ return nil, fmt.Errorf("json unmarshal return err=%s", err)
+ }
+ return out.List, nil
+}
+
+// ImportMetaData restores metadata from a backup
+func (ss *shardingSphereProxy) ImportMetaData(in *model.ClusterInfo) error {
+ if in == nil {
+ return xerr.NewCliErr("import meta data is nil")
+ }
+ marshal, err := json.Marshal(in)
+ if err != nil {
+ return xerr.NewCliErr(fmt.Sprintf("json marshal,invalid
data[in=%+v]", in))
+ }
+
+ _, err = ss.db.Exec(fmt.Sprintf(`IMPORT METADATA "%s";`, marshal))
+ if err != nil {
+ return xerr.NewCliErr(fmt.Sprintf("import metadata
failure,err=%s", err))
+ }
+
+ return nil
}
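Taken together, the new methods cover both directions of a point-in-time recovery flow. A hedged sketch of how a restore might chain them from inside package pkg, assuming the cluster info was captured by an earlier ExportMetaData call (the restoreMetaData helper is illustrative, not part of this commit):

    // restoreMetaData is an illustrative helper, not part of this commit.
    func restoreMetaData(proxy IShardingSphereProxy, info *model.ClusterInfo) error {
        // Stop reads and writes while the metadata is replaced.
        if err := proxy.LockForRestore(); err != nil {
            return err
        }
        // Best-effort unlock; a real caller would check the error.
        defer proxy.Unlock() //nolint:errcheck

        // Push the previously exported metadata back through DistSQL (IMPORT METADATA).
        return proxy.ImportMetaData(info)
    }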
diff --git a/pitr/cli/internal/pkg/shardingsphere-proxy_test.go b/pitr/cli/internal/pkg/shardingsphere-proxy_test.go
index 743a270..d949161 100644
--- a/pitr/cli/internal/pkg/shardingsphere-proxy_test.go
+++ b/pitr/cli/internal/pkg/shardingsphere-proxy_test.go
@@ -18,17 +18,68 @@
package pkg
import (
+ "fmt"
+ "time"
+
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)
-var _ = Describe("IShardingSphere", func() {
+var _ = Describe("IShardingSphereProxy", func() {
Context("NewShardingSphereProxy", func() {
+ var (
+ host = "local"
+ port = uint16(13308)
+ username = "root"
+ password = "root"
+ dbName = "postgres"
+ )
+
It("Connecting shardingsphere proxy", func() {
Skip("Manually exec:dependent environment")
- ss, err := NewShardingSphereProxy("root", "root",
DefaultDbName, "127.0.0.1", 13308)
+ ss, err := NewShardingSphereProxy(username, password,
dbName, host, port)
+ Expect(err).To(BeNil())
+ Expect(ss).NotTo(BeNil())
+ })
+
+ It("Export meta data", func() {
+ Skip("Manually exec:dependent environment")
+ ss, err := NewShardingSphereProxy(username, password, dbName, host, port)
+ Expect(err).To(BeNil())
+ Expect(ss).NotTo(BeNil())
+
+ fmt.Println(ss.ExportMetaData())
+ })
+
+ It("Export storage node", func() {
+ Skip("Manually exec:dependent environment")
+ ss, err := NewShardingSphereProxy(username, password, dbName, host, port)
Expect(err).To(BeNil())
Expect(ss).NotTo(BeNil())
+
+ fmt.Println(ss.ExportStorageNodes())
+
+ ss, err = NewShardingSphereProxy(username, password, dbName, host, port)
+ Expect(err).To(BeNil())
+ Expect(ss).NotTo(BeNil())
+
+ fmt.Println(ss.ExportStorageNodes())
})
+
+ It("Lock and unlock", func() {
+ Skip("Manually exec:dependent environment")
+ ss, err := NewShardingSphereProxy(username, password, dbName, host, port)
+ Expect(err).To(BeNil())
+ Expect(ss).NotTo(BeNil())
+
+ fmt.Println(ss.LockForRestore())
+ time.Sleep(time.Second * 5)
+ fmt.Println(ss.Unlock())
+
+ fmt.Println(ss.LockForBackup())
+ time.Sleep(time.Second * 5)
+ fmt.Println(ss.Unlock())
+ })
+
})
})