This is an automated email from the ASF dual-hosted git repository.

miaoliyao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/shardingsphere-on-cloud.git


The following commit(s) were added to refs/heads/main by this push:
     new 157509f  chore(pitr): pitr backup support display backup progress
     new 72d7b5d  Merge pull request #321 from Xu-Wentao/pitr
157509f is described below

commit 157509f6e878d184f21d6884b725db966715b28e
Author: xuwentao <[email protected]>
AuthorDate: Sun Apr 23 20:17:04 2023 +0800

    chore(pitr): pitr backup support display backup progress
---
 pitr/agent/Makefile                      |   2 +-
 pitr/cli/Makefile                        |   2 +-
 pitr/cli/internal/cmd/backup.go          | 147 ++++++++++++++++++++-----------
 pitr/cli/internal/cmd/backup_test.go     | 129 ++++++++++++++++++++-------
 pitr/cli/internal/pkg/model/as_backup.go |   6 ++
 pitr/cli/pkg/prettyoutput/progress.go    |  35 ++++++++
 6 files changed, 233 insertions(+), 88 deletions(-)

diff --git a/pitr/agent/Makefile b/pitr/agent/Makefile
index 7fd3b96..d1c0a32 100644
--- a/pitr/agent/Makefile
+++ b/pitr/agent/Makefile
@@ -8,7 +8,7 @@ openssl-local:
        openssl req -new -SHA256 -newkey rsa:2048 -nodes -keyout tls.key -out 
tls.csr -subj "/C=CN/ST=beijing/L=beijing/O=/OU=/" && \
        openssl x509 -req -sha256 -days 365 -in tls.csr -signkey tls.key -out 
tls.crt
 test:
-       go test ./... -coverprofile cover.out
+       go test -gcflags=-l -v ./... -coverprofile cover.out
 
 build:
        GOOS=$(GOOS) go build -o pitr-agent
diff --git a/pitr/cli/Makefile b/pitr/cli/Makefile
index d6b27df..b933165 100644
--- a/pitr/cli/Makefile
+++ b/pitr/cli/Makefile
@@ -5,7 +5,7 @@ GOOS := $(shell go env GOOS)
 build:
        GOOS=$(GOOS) go build -o gs_pitr main.go
 test:
-       go test -v ./... -cover -coverprofile cover.out
+       go test -gcflags=-l -v ./... -cover -coverprofile cover.out
 cover:
        go tool cover -html cover.out
 lint:
diff --git a/pitr/cli/internal/cmd/backup.go b/pitr/cli/internal/cmd/backup.go
index 25956fb..ced03a6 100644
--- a/pitr/cli/internal/cmd/backup.go
+++ b/pitr/cli/internal/cmd/backup.go
@@ -20,14 +20,15 @@ package cmd
 import (
        "fmt"
        "os"
-       "sync"
        "time"
 
        "github.com/apache/shardingsphere-on-cloud/pitr/cli/internal/pkg"
        "github.com/apache/shardingsphere-on-cloud/pitr/cli/internal/pkg/model"
        "github.com/apache/shardingsphere-on-cloud/pitr/cli/internal/pkg/xerr"
        "github.com/apache/shardingsphere-on-cloud/pitr/cli/pkg/logging"
+       "github.com/apache/shardingsphere-on-cloud/pitr/cli/pkg/prettyoutput"
        "github.com/google/uuid"
+       "github.com/jedib0t/go-pretty/v6/progress"
        "github.com/jedib0t/go-pretty/v6/table"
        "github.com/spf13/cobra"
        "github.com/spf13/pflag"
@@ -232,14 +233,10 @@ func execBackup(lsBackup *model.LsBackup) error {
        logging.Info("Starting send backup command to agent server...")
 
        for _, node := range sNodes {
-               node := node
-               agentHost := node.IP
-               if agentHost == "127.0.0.1" {
-                       agentHost = Host
-               }
-               as := pkg.NewAgentServer(fmt.Sprintf("%s:%d", agentHost, 
AgentPort))
+               sn := node
+               as := pkg.NewAgentServer(fmt.Sprintf("%s:%d", 
convertLocalhost(sn.IP), AgentPort))
                g.Go(func() error {
-                       return _execBackup(as, node, dnCh)
+                       return _execBackup(as, sn, dnCh)
                })
        }
 
@@ -269,19 +266,16 @@ func checkAgentServerStatus(lsBackup *model.LsBackup) bool {
        available := true
 
        for _, node := range lsBackup.SsBackup.StorageNodes {
-               n := node
-               agentHost := n.IP
-               if agentHost == "127.0.0.1" {
-                       agentHost = Host
-               }
-               as := pkg.NewAgentServer(fmt.Sprintf("%s:%d", agentHost, 
AgentPort))
+               sn := node
+               as := pkg.NewAgentServer(fmt.Sprintf("%s:%d", 
convertLocalhost(sn.IP), AgentPort))
                if err := as.CheckStatus(); err != nil {
-                       statusList = append(statusList, 
&model.AgentServerStatus{IP: n.IP, Status: "Unavailable"})
+                       statusList = append(statusList, 
&model.AgentServerStatus{IP: sn.IP, Status: "Unavailable"})
                        available = false
                } else {
-                       statusList = append(statusList, 
&model.AgentServerStatus{IP: n.IP, Status: "Available"})
+                       statusList = append(statusList, 
&model.AgentServerStatus{IP: sn.IP, Status: "Available"})
                }
        }
+
        t := table.NewWriter()
        t.SetOutputMirror(os.Stdout)
        t.SetTitle("Agent Server Status")
@@ -328,64 +322,100 @@ func _execBackup(as pkg.IAgentServer, node *model.StorageNode, dnCh chan *model.
 
 func checkBackupStatus(lsBackup *model.LsBackup) model.BackupStatus {
        var (
-               wg                sync.WaitGroup
                dataNodeMap       = make(map[string]*model.DataNode)
+               dnCh              = make(chan *model.DataNode, 
len(lsBackup.DnList))
                backupFinalStatus = model.SsBackupStatusCompleted
-               statusCh          = make(chan *model.DataNode, 
len(lsBackup.DnList))
+               totalNum          = len(lsBackup.SsBackup.StorageNodes)
+               dnResult          = make([]*model.DataNode, 0)
        )
 
-       // DataNode.IP -> DataNode
        for _, dn := range lsBackup.DnList {
                dataNodeMap[dn.IP] = dn
        }
 
-       for _, sn := range lsBackup.SsBackup.StorageNodes {
-               wg.Add(1)
-               go func(wg *sync.WaitGroup, sn *model.StorageNode) {
-                       defer wg.Done()
-                       agentHost := sn.IP
-                       if agentHost == "127.0.0.1" {
-                               agentHost = Host
-                       }
-                       as := pkg.NewAgentServer(fmt.Sprintf("%s:%d", 
agentHost, AgentPort))
-                       dn := dataNodeMap[sn.IP]
-
-                       // check backup status
-                       status := checkStatus(as, sn, dn.BackupID, 
model.BackupStatus(""), defaultShowDetailRetryTimes)
+       pw := prettyoutput.NewPW(totalNum)
+       go pw.Render()
+       for idx := 0; idx < totalNum; idx++ {
+               sn := lsBackup.SsBackup.StorageNodes[idx]
+               as := pkg.NewAgentServer(fmt.Sprintf("%s:%d", 
convertLocalhost(sn.IP), AgentPort))
+               dn := dataNodeMap[sn.IP]
+               go checkStatus(as, sn, dn, dnCh, pw)
+       }
 
-                       // update DataNode status
-                       dn.Status = status
-                       dn.EndTime = time.Now().Unix()
-                       statusCh <- dn
-               }(&wg, sn)
+       // wait for all data node backup finished
+       time.Sleep(time.Millisecond * 100)
+       for pw.IsRenderInProgress() {
+               if pw.LengthActive() == 0 {
+                       pw.Stop()
+               }
+               time.Sleep(time.Millisecond * 100)
        }
 
-       wg.Wait()
-       close(statusCh)
+       close(dnCh)
 
-       for dn := range statusCh {
-               logging.Info(fmt.Sprintf("data node backup final status: 
[IP:%s, backupID:%s] ==> %s", dn.IP, dn.BackupID, dn.Status))
+       for dn := range dnCh {
+               dnResult = append(dnResult, dn)
                if dn.Status != model.SsBackupStatusCompleted {
                        backupFinalStatus = model.SsBackupStatusFailed
                }
        }
 
+       // print backup result formatted
+       t := table.NewWriter()
+       t.SetOutputMirror(os.Stdout)
+       t.SetTitle("Backup Task Result: %s", backupFinalStatus)
+       t.AppendHeader(table.Row{"#", "Data Node IP", "Data Node Port", 
"Result"})
+
+       for i, dn := range dnResult {
+               t.AppendRow([]interface{}{i + 1, dn.IP, dn.Port, dn.Status})
+               t.AppendSeparator()
+       }
+
+       t.Render()
+
+       lsBackup.DnList = dnResult
        lsBackup.SsBackup.Status = backupFinalStatus
        lsBackup.Info.EndTime = time.Now().Unix()
        return backupFinalStatus
 }
 
-func checkStatus(as pkg.IAgentServer, sn *model.StorageNode, backupID string, 
status model.BackupStatus, retryTimes uint8) model.BackupStatus {
-       if retryTimes+1 == 0 {
-               return status
-       }
-       if status == model.SsBackupStatusCompleted || status == 
model.SsBackupStatusFailed {
-               return status
-       }
+func checkStatus(as pkg.IAgentServer, sn *model.StorageNode, dn 
*model.DataNode, dnCh chan *model.DataNode, pw progress.Writer) {
+       var (
+               // mark check status is done, time ticker should break.
+               done = make(chan bool)
+               // time ticker, try to doCheck request every 2 seconds.
+               ticker = time.Tick(time.Second * 2)
+               // progress bar.
+               tracker = progress.Tracker{Message: fmt.Sprintf("Checking 
backup status  # %s:%d", sn.IP, AgentPort), Total: 0, Units: 
progress.UnitsDefault}
+       )
 
-       // todo: how often to check backup status
-       time.Sleep(time.Second * 2)
+       pw.AppendTracker(&tracker)
+
+       for !tracker.IsDone() {
+               select {
+               case <-done:
+                       return
+               case <-ticker:
+                       status, err := doCheck(as, sn, dn.BackupID, 
defaultShowDetailRetryTimes)
+                       if err != nil {
+                               tracker.MarkAsErrored()
+                               dn.Status = status
+                               dn.EndTime = time.Now().Unix()
+                               dnCh <- dn
+                               done <- true
+                       }
+                       if status == model.SsBackupStatusCompleted || status == 
model.SsBackupStatusFailed {
+                               tracker.MarkAsDone()
+                               dn.Status = status
+                               dn.EndTime = time.Now().Unix()
+                               dnCh <- dn
+                               done <- true
+                       }
+               }
+       }
+}
 
+func doCheck(as pkg.IAgentServer, sn *model.StorageNode, backupID string, 
retries int) (status model.BackupStatus, err error) {
        in := &model.ShowDetailIn{
                DBPort:       sn.Port,
                DBName:       sn.Database,
@@ -397,8 +427,19 @@ func checkStatus(as pkg.IAgentServer, sn *model.StorageNode, backupID string, st
        }
        backupInfo, err := as.ShowDetail(in)
        if err != nil {
-               logging.Error(fmt.Sprintf("get storage node [IP:%s] backup 
detail from agent server failed, will retry %d times.\n%s", sn.IP, retryTimes, 
err.Error()))
-               return checkStatus(as, sn, backupID, 
model.SsBackupStatusCheckError, retryTimes-1)
+               if retries == 0 {
+                       return model.SsBackupStatusCheckError, err
+               }
+               time.Sleep(time.Second * 1)
+               return doCheck(as, sn, backupID, retries-1)
+       }
+
+       return backupInfo.Status, nil
+}
+
+func convertLocalhost(ip string) string {
+       if ip == "127.0.0.1" {
+               return Host
        }
-       return checkStatus(as, sn, backupID, backupInfo.Status, retryTimes)
+       return ip
 }
diff --git a/pitr/cli/internal/cmd/backup_test.go b/pitr/cli/internal/cmd/backup_test.go
index 1f56485..ada9434 100644
--- a/pitr/cli/internal/cmd/backup_test.go
+++ b/pitr/cli/internal/cmd/backup_test.go
@@ -36,7 +36,7 @@ import (
 var ctrl *gomock.Controller
 
 var _ = Describe("Backup", func() {
-       Context("check status", func() {
+       Context("do check", func() {
                var (
                        as *mock_pkg.MockIAgentServer
                        sn = &model.StorageNode{
@@ -53,23 +53,31 @@ var _ = Describe("Backup", func() {
 
                It("agent server return err", func() {
                        as.EXPECT().ShowDetail(&model.ShowDetailIn{Instance: 
defaultInstance}).Return(nil, errors.New("timeout"))
-                       Expect(checkStatus(as, sn, "", "", 
0)).To(Equal(model.SsBackupStatusCheckError))
+                       status, err := doCheck(as, sn, "", 0)
+                       Expect(err).To(HaveOccurred())
+                       Expect(status).To(Equal(model.SsBackupStatusCheckError))
                })
 
                It("mock agent server and return failed status", func() {
                        as.EXPECT().ShowDetail(&model.ShowDetailIn{Instance: 
defaultInstance}).Return(&model.BackupInfo{Status: model.SsBackupStatusFailed}, 
nil)
-                       Expect(checkStatus(as, sn, "", "", 
0)).To(Equal(model.SsBackupStatusFailed))
+                       status, err := doCheck(as, sn, "", 0)
+                       Expect(err).ToNot(HaveOccurred())
+                       Expect(status).To(Equal(model.SsBackupStatusFailed))
                })
 
                It("mock agent server and return completed status", func() {
                        as.EXPECT().ShowDetail(&model.ShowDetailIn{Instance: 
defaultInstance}).Return(&model.BackupInfo{Status: 
model.SsBackupStatusCompleted}, nil)
-                       Expect(checkStatus(as, sn, "", "", 
0)).To(Equal(model.SsBackupStatusCompleted))
+                       status, err := doCheck(as, sn, "", 0)
+                       Expect(err).ToNot(HaveOccurred())
+                       Expect(status).To(Equal(model.SsBackupStatusCompleted))
                })
 
-               It("mock agent server and return timeout error first time and 
then retry 1 time return completed status", func() {
-                       as.EXPECT().ShowDetail(&model.ShowDetailIn{Instance: 
defaultInstance}).Times(1).Return(nil, errors.New("timeout"))
+               It("mock agent server and return check err first time and then 
success", func() {
+                       as.EXPECT().ShowDetail(&model.ShowDetailIn{Instance: 
defaultInstance}).Return(nil, errors.New("timeout"))
                        as.EXPECT().ShowDetail(&model.ShowDetailIn{Instance: 
defaultInstance}).Return(&model.BackupInfo{Status: 
model.SsBackupStatusCompleted}, nil)
-                       Expect(checkStatus(as, sn, "", "", 
1)).To(Equal(model.SsBackupStatusCompleted))
+                       status, err := doCheck(as, sn, "", 1)
+                       Expect(err).ToNot(HaveOccurred())
+                       Expect(status).To(Equal(model.SsBackupStatusCompleted))
                })
        })
 
@@ -78,17 +86,15 @@ var _ = Describe("Backup", func() {
                        proxy *mock_pkg.MockIShardingSphereProxy
                        ls    *mock_pkg.MockILocalStorage
                )
-
                BeforeEach(func() {
                        ctrl = gomock.NewController(GinkgoT())
                        proxy = mock_pkg.NewMockIShardingSphereProxy(ctrl)
                        ls = mock_pkg.NewMockILocalStorage(ctrl)
-               })
 
+               })
                AfterEach(func() {
                        ctrl.Finish()
                })
-
                It("export data", func() {
                        // mock proxy export metadata
                        
proxy.EXPECT().ExportMetaData().Return(&model.ClusterInfo{}, nil)
@@ -104,16 +110,11 @@ var _ = Describe("Backup", func() {
                        Expect(bk.Info.CSN).To(Equal(""))
                })
        })
-       Context("exec backup", func() {
 
-               var as *mock_pkg.MockIAgentServer
-               bak := &model.LsBackup{
-                       DnList: nil,
-                       SsBackup: &model.SsBackup{
-                               Status:       "Running",
-                               StorageNodes: []*model.StorageNode{},
-                       },
-               }
+       Context("exec backup", func() {
+               var (
+                       as *mock_pkg.MockIAgentServer
+               )
                BeforeEach(func() {
                        ctrl = gomock.NewController(GinkgoT())
                        as = mock_pkg.NewMockIAgentServer(ctrl)
@@ -121,6 +122,14 @@ var _ = Describe("Backup", func() {
                AfterEach(func() {
                        ctrl.Finish()
                })
+               bak := &model.LsBackup{
+                       DnList: nil,
+                       SsBackup: &model.SsBackup{
+                               Status:       "Running",
+                               StorageNodes: []*model.StorageNode{},
+                       },
+               }
+
                It("exec backup empty storage nodes", func() {
                        Expect(execBackup(bak)).To(BeNil())
                })
@@ -155,22 +164,76 @@ var _ = Describe("Backup", func() {
                })
        })
 
-       Context("exec backup", func() {
-               It("exec backup", func() {
-                       var (
-                               as       *mock_pkg.MockIAgentServer
-                               node     = &model.StorageNode{}
-                               failSnCh = make(chan *model.StorageNode, 10)
-                               dnCh     = make(chan *model.DataNode, 10)
-                       )
+       Context("check backup status", func() {
+               var (
+                       as       *mock_pkg.MockIAgentServer
+                       lsbackup *model.LsBackup
+               )
+               BeforeEach(func() {
+                       lsbackup = &model.LsBackup{
+                               DnList: []*model.DataNode{
+                                       {
+                                               IP:   "127.0.0.1",
+                                               Port: 3306,
+                                       },
+                                       {
+                                               IP:   "127.0.0.2",
+                                               Port: 3307,
+                                       },
+                               },
+                               SsBackup: &model.SsBackup{
+                                       Status: "Running",
+                                       StorageNodes: []*model.StorageNode{
+                                               {
+                                                       IP:   "127.0.0.1",
+                                                       Port: 3306,
+                                               },
+                                               {
+                                                       IP:   "127.0.0.2",
+                                                       Port: 3307,
+                                               },
+                                       },
+                               },
+                               Info: &model.BackupMetaInfo{},
+                       }
+
+                       ctrl = gomock.NewController(GinkgoT())
                        as = mock_pkg.NewMockIAgentServer(ctrl)
 
-                       defer close(dnCh)
-                       defer ctrl.Finish()
-                       as.EXPECT().Backup(gomock.Any()).Return("backup-id", 
nil)
-                       Expect(_execBackup(as, node, dnCh)).To(BeNil())
-                       Expect(len(failSnCh)).To(Equal(0))
-                       Expect(len(dnCh)).To(Equal(1))
+                       monkey.Patch(pkg.NewAgentServer, func(_ string) 
pkg.IAgentServer {
+                               return as
+                       })
+               })
+               AfterEach(func() {
+                       ctrl.Finish()
+                       monkey.UnpatchAll()
+               })
+
+               It("check error", func() {
+                       as.EXPECT().ShowDetail(gomock.Any()).Return(nil, 
errors.New("timeout")).AnyTimes()
+                       
Expect(checkBackupStatus(lsbackup)).To(Equal(model.SsBackupStatusFailed))
+               })
+
+               It("check error 2", func() {
+                       as.EXPECT().ShowDetail(gomock.Any()).Return(nil, 
errors.New("timeout")).Times(1)
+                       
as.EXPECT().ShowDetail(gomock.Any()).Return(&model.BackupInfo{Status: 
model.SsBackupStatusFailed}, nil).AnyTimes()
+                       
Expect(checkBackupStatus(lsbackup)).To(Equal(model.SsBackupStatusFailed))
+               })
+
+               It("check error 3", func() {
+                       as.EXPECT().ShowDetail(gomock.Any()).Return(nil, 
errors.New("timeout")).Times(2)
+                       
as.EXPECT().ShowDetail(gomock.Any()).Return(&model.BackupInfo{Status: 
model.SsBackupStatusCompleted}, nil).AnyTimes()
+                       
Expect(checkBackupStatus(lsbackup)).To(Equal(model.SsBackupStatusCompleted))
+               })
+
+               It("check failed", func() {
+                       
as.EXPECT().ShowDetail(gomock.Any()).Return(&model.BackupInfo{Status: 
model.SsBackupStatusFailed}, nil).AnyTimes()
+                       
Expect(checkBackupStatus(lsbackup)).To(Equal(model.SsBackupStatusFailed))
+               })
+
+               It("check success", func() {
+                       
as.EXPECT().ShowDetail(gomock.Any()).Return(&model.BackupInfo{Status: 
model.SsBackupStatusCompleted}, nil).AnyTimes()
+                       
Expect(checkBackupStatus(lsbackup)).To(Equal(model.SsBackupStatusCompleted))
                })
        })
 })
diff --git a/pitr/cli/internal/pkg/model/as_backup.go b/pitr/cli/internal/pkg/model/as_backup.go
index bd18148..ee3078f 100644
--- a/pitr/cli/internal/pkg/model/as_backup.go
+++ b/pitr/cli/internal/pkg/model/as_backup.go
@@ -45,3 +45,9 @@ type AgentServerStatus struct {
        IP     string `json:"ip"`
        Status string `json:"status"`
 }
+
+type BackupResult struct {
+       IP     string       `json:"ip"`
+       Port   uint16       `json:"port"`
+       Status BackupStatus `json:"status"`
+}
diff --git a/pitr/cli/pkg/prettyoutput/progress.go b/pitr/cli/pkg/prettyoutput/progress.go
new file mode 100644
index 0000000..e70552a
--- /dev/null
+++ b/pitr/cli/pkg/prettyoutput/progress.go
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package prettyoutput
+
+import (
+       "github.com/jedib0t/go-pretty/v6/progress"
+)
+
+func NewPW(totalNum int) progress.Writer {
+       pw := progress.NewWriter()
+       pw.SetTrackerLength(25)
+       pw.SetAutoStop(true)
+       pw.SetNumTrackersExpected(totalNum)
+       pw.SetSortBy(progress.SortByPercentDsc)
+       style := progress.StyleDefault
+       style.Options.PercentIndeterminate = "running"
+       pw.SetStyle(style)
+       pw.SetTrackerPosition(progress.PositionRight)
+       return pw
+}

Reply via email to