This is an automated email from the ASF dual-hosted git repository.

asifdxtreme pushed a commit to branch master
in repository 
https://gitbox.apache.org/repos/asf/incubator-servicecomb-service-center.git


The following commit(s) were added to refs/heads/master by this push:
     new dc76a75  SCB-988 Add loadbalancer in sc client (#479)
dc76a75 is described below

commit dc76a7515fa4518b3bf2a75327661e1bb97fa3c4
Author: little-cui <sure_0...@qq.com>
AuthorDate: Sat Nov 3 00:16:48 2018 +0800

    SCB-988 Add loadbalancer in sc client (#479)
---
 pkg/client/sc/apis.go                              | 34 ++++++--------
 pkg/client/sc/client.go                            | 18 ++++++--
 pkg/client/sc/{client.go => client_lb.go}          | 34 ++++++++++++--
 pkg/client/sc/config.go                            |  4 +-
 pkg/{client/sc/client.go => lb/loadbalancer.go}    | 19 ++------
 .../sc/config.go => lb/loadbalancer_test.go}       | 53 +++++++++++++---------
 pkg/{client/sc/client.go => lb/roundrobin.go}      | 38 +++++++++++-----
 pkg/rest/client.go                                 | 10 ++--
 scctl/pkg/cmd/cmd.go                               |  4 +-
 .../pkg/discovery/servicecenter/aggregate.go       | 16 +++----
 10 files changed, 130 insertions(+), 100 deletions(-)

diff --git a/pkg/client/sc/apis.go b/pkg/client/sc/apis.go
index 63e96e7..2947297 100644
--- a/pkg/client/sc/apis.go
+++ b/pkg/client/sc/apis.go
@@ -47,16 +47,8 @@ func (c *SCClient) toError(body []byte) *scerr.Error {
        return message
 }
 
-func (c *SCClient) commonHeaders() http.Header {
-       var headers = make(http.Header)
-       if len(c.Config.Token) > 0 {
-               headers.Set("X-Auth-Token", c.Config.Token)
-       }
-       return headers
-}
-
 func (c *SCClient) GetScVersion() (*version.VersionSet, *scerr.Error) {
-       resp, err := c.URLClient.HttpDo(http.MethodGet, 
c.Config.Addr+apiVersionURL, c.commonHeaders(), nil)
+       resp, err := c.RestDo(http.MethodGet, apiVersionURL, c.CommonHeaders(), 
nil)
        if err != nil {
                return nil, scerr.NewError(scerr.ErrInternal, err.Error())
        }
@@ -82,10 +74,10 @@ func (c *SCClient) GetScVersion() (*version.VersionSet, 
*scerr.Error) {
 }
 
 func (c *SCClient) GetScCache() (*model.Cache, *scerr.Error) {
-       headers := c.commonHeaders()
+       headers := c.CommonHeaders()
        // only default domain has admin permission
        headers.Set("X-Domain-Name", "default")
-       resp, err := c.URLClient.HttpDo(http.MethodGet, 
c.Config.Addr+apiDumpURL, headers, nil)
+       resp, err := c.RestDo(http.MethodGet, apiDumpURL, headers, nil)
        if err != nil {
                return nil, scerr.NewError(scerr.ErrInternal, err.Error())
        }
@@ -112,10 +104,10 @@ func (c *SCClient) GetScCache() (*model.Cache, 
*scerr.Error) {
 
 func (c *SCClient) GetSchemasByServiceId(domainProject, serviceId string) 
([]*pb.Schema, *scerr.Error) {
        domain, project := core.FromDomainProject(domainProject)
-       headers := c.commonHeaders()
+       headers := c.CommonHeaders()
        headers.Set("X-Domain-Name", domain)
-       resp, err := c.URLClient.HttpDo(http.MethodGet,
-               c.Config.Addr+fmt.Sprintf(apiSchemasURL, project, 
serviceId)+"?withSchema=1",
+       resp, err := c.RestDo(http.MethodGet,
+               fmt.Sprintf(apiSchemasURL, project, serviceId)+"?withSchema=1",
                headers, nil)
        if err != nil {
                return nil, scerr.NewError(scerr.ErrInternal, err.Error())
@@ -143,10 +135,10 @@ func (c *SCClient) GetSchemasByServiceId(domainProject, 
serviceId string) ([]*pb
 
 func (c *SCClient) GetSchemaBySchemaId(domainProject, serviceId, schemaId 
string) (*pb.Schema, *scerr.Error) {
        domain, project := core.FromDomainProject(domainProject)
-       headers := c.commonHeaders()
+       headers := c.CommonHeaders()
        headers.Set("X-Domain-Name", domain)
-       resp, err := c.URLClient.HttpDo(http.MethodGet,
-               c.Config.Addr+fmt.Sprintf(apiSchemaURL, project, serviceId, 
schemaId),
+       resp, err := c.RestDo(http.MethodGet,
+               fmt.Sprintf(apiSchemaURL, project, serviceId, schemaId),
                headers, nil)
        if err != nil {
                return nil, scerr.NewError(scerr.ErrInternal, err.Error())
@@ -177,10 +169,10 @@ func (c *SCClient) GetSchemaBySchemaId(domainProject, 
serviceId, schemaId string
 }
 
 func (c *SCClient) GetClusters() (registry.Clusters, *scerr.Error) {
-       headers := c.commonHeaders()
+       headers := c.CommonHeaders()
        // only default domain has admin permission
        headers.Set("X-Domain-Name", "default")
-       resp, err := c.URLClient.HttpDo(http.MethodGet, 
c.Config.Addr+apiClustersURL, headers, nil)
+       resp, err := c.RestDo(http.MethodGet, apiClustersURL, headers, nil)
        if err != nil {
                return nil, scerr.NewError(scerr.ErrInternal, err.Error())
        }
@@ -206,10 +198,10 @@ func (c *SCClient) GetClusters() (registry.Clusters, 
*scerr.Error) {
 }
 
 func (c *SCClient) HealthCheck() *scerr.Error {
-       headers := c.commonHeaders()
+       headers := c.CommonHeaders()
        // only default domain has admin permission
        headers.Set("X-Domain-Name", "default")
-       resp, err := c.URLClient.HttpDo(http.MethodGet, 
c.Config.Addr+apiHealthURL, headers, nil)
+       resp, err := c.RestDo(http.MethodGet, apiHealthURL, headers, nil)
        if err != nil {
                return scerr.NewError(scerr.ErrUnavailableBackend, err.Error())
        }
diff --git a/pkg/client/sc/client.go b/pkg/client/sc/client.go
index 0d95dcd..38ab11b 100644
--- a/pkg/client/sc/client.go
+++ b/pkg/client/sc/client.go
@@ -16,18 +16,26 @@
 package sc
 
 import (
-       "github.com/apache/incubator-servicecomb-service-center/pkg/rest"
+       "net/http"
 )
 
 func NewSCClient(cfg Config) (*SCClient, error) {
-       client, err := rest.GetURLClient(cfg.Merge())
+       client, err := NewLBClient(cfg.Endpoints, cfg.Merge())
        if err != nil {
                return nil, err
        }
-       return &SCClient{Config: cfg, URLClient: client}, nil
+       return &SCClient{LBClient: client, Token: cfg.Token}, nil
 }
 
 type SCClient struct {
-       *rest.URLClient
-       Config Config
+       *LBClient
+       Token string
+}
+
+func (c *SCClient) CommonHeaders() http.Header {
+       var headers = make(http.Header)
+       if len(c.Token) > 0 {
+               headers.Set("X-Auth-Token", c.Token)
+       }
+       return headers
 }
diff --git a/pkg/client/sc/client.go b/pkg/client/sc/client_lb.go
similarity index 53%
copy from pkg/client/sc/client.go
copy to pkg/client/sc/client_lb.go
index 0d95dcd..015307d 100644
--- a/pkg/client/sc/client.go
+++ b/pkg/client/sc/client_lb.go
@@ -16,18 +16,42 @@
 package sc
 
 import (
+       "github.com/apache/incubator-servicecomb-service-center/pkg/lb"
        "github.com/apache/incubator-servicecomb-service-center/pkg/rest"
+       "github.com/apache/incubator-servicecomb-service-center/pkg/util"
+       "net/http"
 )
 
-func NewSCClient(cfg Config) (*SCClient, error) {
-       client, err := rest.GetURLClient(cfg.Merge())
+func NewLBClient(endpoints []string, options rest.URLClientOption) (*LBClient, 
error) {
+       client, err := rest.GetURLClient(options)
        if err != nil {
                return nil, err
        }
-       return &SCClient{Config: cfg, URLClient: client}, nil
+       return &LBClient{
+               Retries:   len(endpoints),
+               LB:        lb.NewRoundRobinLB(endpoints),
+               URLClient: client,
+       }, nil
 }
 
-type SCClient struct {
+type LBClient struct {
        *rest.URLClient
-       Config Config
+       Retries int
+       LB      lb.LoadBalancer
+}
+
+func (c *LBClient) Next() string {
+       return c.LB.Next()
+}
+
+func (c *LBClient) RestDo(method string, api string, headers http.Header, body 
[]byte) (resp *http.Response, err error) {
+       for i := 0; i < c.Retries; i++ {
+               resp, err = c.HttpDo(method, c.Next()+api, headers, body)
+               if err != nil {
+                       util.GetBackoff().Delay(i)
+                       continue
+               }
+               break
+       }
+       return
 }
diff --git a/pkg/client/sc/config.go b/pkg/client/sc/config.go
index 0f8d64f..f78845b 100644
--- a/pkg/client/sc/config.go
+++ b/pkg/client/sc/config.go
@@ -24,14 +24,14 @@ import (
 
 type Config struct {
        rest.URLClientOption
-       Addr string
+       Endpoints []string
        // TODO Expandable header not only token header
        Token          string
        CertKeyPWDPath string
 }
 
 func (cfg *Config) Merge() rest.URLClientOption {
-       ssl := strings.Index(cfg.Addr, "https://") >= 0
+       ssl := strings.Index(cfg.Endpoints[0], "https://") >= 0
        if ssl && len(cfg.CertKeyPWD) == 0 && len(cfg.CertKeyPWDPath) > 0 {
                content, _ := ioutil.ReadFile(cfg.CertKeyPWDPath)
                cfg.CertKeyPWD = string(content)
diff --git a/pkg/client/sc/client.go b/pkg/lb/loadbalancer.go
similarity index 70%
copy from pkg/client/sc/client.go
copy to pkg/lb/loadbalancer.go
index 0d95dcd..c1ee278 100644
--- a/pkg/client/sc/client.go
+++ b/pkg/lb/loadbalancer.go
@@ -13,21 +13,8 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package sc
+package lb
 
-import (
-       "github.com/apache/incubator-servicecomb-service-center/pkg/rest"
-)
-
-func NewSCClient(cfg Config) (*SCClient, error) {
-       client, err := rest.GetURLClient(cfg.Merge())
-       if err != nil {
-               return nil, err
-       }
-       return &SCClient{Config: cfg, URLClient: client}, nil
-}
-
-type SCClient struct {
-       *rest.URLClient
-       Config Config
+type LoadBalancer interface {
+       Next() string
 }
diff --git a/pkg/client/sc/config.go b/pkg/lb/loadbalancer_test.go
similarity index 52%
copy from pkg/client/sc/config.go
copy to pkg/lb/loadbalancer_test.go
index 0f8d64f..c221c6c 100644
--- a/pkg/client/sc/config.go
+++ b/pkg/lb/loadbalancer_test.go
@@ -13,32 +13,41 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package sc
+package lb
 
 import (
-       "github.com/apache/incubator-servicecomb-service-center/pkg/rest"
-       "io/ioutil"
-       "strings"
-       "time"
+       "testing"
 )
 
-type Config struct {
-       rest.URLClientOption
-       Addr string
-       // TODO Expandable header not only token header
-       Token          string
-       CertKeyPWDPath string
-}
-
-func (cfg *Config) Merge() rest.URLClientOption {
-       ssl := strings.Index(cfg.Addr, "https://") >= 0
-       if ssl && len(cfg.CertKeyPWD) == 0 && len(cfg.CertKeyPWDPath) > 0 {
-               content, _ := ioutil.ReadFile(cfg.CertKeyPWDPath)
-               cfg.CertKeyPWD = string(content)
+func TestNewRoundRobinLB(t *testing.T) {
+       lb := NewRoundRobinLB(nil)
+       if lb.Next() != "" {
+               t.Fatalf("TestNewRoundRobinLB failed")
+       }
+       lb = NewRoundRobinLB([]string{"1"})
+       if lb.Next() != "1" {
+               t.Fatalf("TestNewRoundRobinLB failed")
+       }
+       if lb.Next() != "1" {
+               t.Fatalf("TestNewRoundRobinLB failed")
        }
-       cfg.SSLEnabled = ssl
-       if cfg.RequestTimeout == 0 {
-               cfg.RequestTimeout = 10 * time.Second
+       lb = NewRoundRobinLB([]string{"1", "2"})
+       if lb.Next() != "1" {
+               t.Fatalf("TestNewRoundRobinLB failed")
        }
-       return cfg.URLClientOption
+       if lb.Next() != "2" {
+               t.Fatalf("TestNewRoundRobinLB failed")
+       }
+       if lb.Next() != "1" {
+               t.Fatalf("TestNewRoundRobinLB failed")
+       }
+}
+
+func BenchmarkNewRoundLB(b *testing.B) {
+       lb := NewRoundRobinLB([]string{"1", "2", "3"})
+       b.RunParallel(func(pb *testing.PB) {
+               for pb.Next() {
+                       _ = lb.Next()
+               }
+       })
 }
diff --git a/pkg/client/sc/client.go b/pkg/lb/roundrobin.go
similarity index 56%
copy from pkg/client/sc/client.go
copy to pkg/lb/roundrobin.go
index 0d95dcd..f842a79 100644
--- a/pkg/client/sc/client.go
+++ b/pkg/lb/roundrobin.go
@@ -13,21 +13,35 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package sc
+package lb
 
-import (
-       "github.com/apache/incubator-servicecomb-service-center/pkg/rest"
-)
+import "sync/atomic"
 
-func NewSCClient(cfg Config) (*SCClient, error) {
-       client, err := rest.GetURLClient(cfg.Merge())
-       if err != nil {
-               return nil, err
+type RoundRobinLB struct {
+       Endpoints []string
+       index     int32
+}
+
+func (lb *RoundRobinLB) Next() string {
+       l := len(lb.Endpoints)
+       if l == 0 {
+               return ""
        }
-       return &SCClient{Config: cfg, URLClient: client}, nil
+       c := atomic.LoadInt32(&lb.index)
+       if c >= int32(l)-1 {
+               atomic.StoreInt32(&lb.index, 0)
+               return lb.Endpoints[0]
+       } else if atomic.CompareAndSwapInt32(&lb.index, c, c+1) {
+               return lb.Endpoints[c+1]
+       }
+       return lb.Endpoints[atomic.LoadInt32(&lb.index)]
 }
 
-type SCClient struct {
-       *rest.URLClient
-       Config Config
+func NewRoundRobinLB(endpoints []string) *RoundRobinLB {
+       lb := &RoundRobinLB{
+               Endpoints: make([]string, len(endpoints)),
+               index:     -1,
+       }
+       copy(lb.Endpoints, endpoints)
+       return lb
 }
diff --git a/pkg/rest/client.go b/pkg/rest/client.go
index 0518b6e..ac84929 100644
--- a/pkg/rest/client.go
+++ b/pkg/rest/client.go
@@ -61,8 +61,6 @@ type URLClient struct {
 
        TLS *tls.Config
 
-       Request *http.Request
-
        Cfg URLClientOption
 }
 
@@ -95,19 +93,17 @@ func (client *URLClient) HttpDo(method string, rawURL 
string, headers http.Heade
        if err != nil {
                return nil, errors.New(fmt.Sprintf("create request failed: %s", 
err.Error()))
        }
-       client.Request = req
-
        req.Header = headers
 
        resp, err = client.Client.Do(req)
        if err != nil {
-               return nil, errors.New(fmt.Sprintf("invoke request failed: %s", 
err.Error()))
+               return nil, err
        }
 
        if os.Getenv("DEBUG_MODE") == "1" {
                fmt.Println("--- BEGIN ---")
-               fmt.Printf("> %s %s %s\n", client.Request.Method, 
client.Request.URL.RequestURI(), client.Request.Proto)
-               for key, header := range client.Request.Header {
+               fmt.Printf("> %s %s %s\n", req.Method, req.URL.RequestURI(), 
req.Proto)
+               for key, header := range req.Header {
                        for _, value := range header {
                                fmt.Printf("> %s: %s\n", key, value)
                        }
diff --git a/scctl/pkg/cmd/cmd.go b/scctl/pkg/cmd/cmd.go
index f84dd23..0437ae1 100644
--- a/scctl/pkg/cmd/cmd.go
+++ b/scctl/pkg/cmd/cmd.go
@@ -48,8 +48,8 @@ func init() {
                }
        }
 
-       rootCmd.PersistentFlags().StringVar(&ScClientConfig.Addr, "addr",
-               "http://"+util.GetEnvString("HOSTING_SERVER_IP", 
"127.0.0.1")+":30100",
+       rootCmd.PersistentFlags().StringSliceVar(&ScClientConfig.Endpoints, 
"addr",
+               []string{"http://" + util.GetEnvString("HOSTING_SERVER_IP", 
"127.0.0.1") + ":30100"},
                "the http host and port of service center, can be overrode by 
env HOSTING_SERVER_IP.")
 
        rootCmd.PersistentFlags().StringVar(&ScClientConfig.Token, "token", "",
diff --git a/server/plugin/pkg/discovery/servicecenter/aggregate.go 
b/server/plugin/pkg/discovery/servicecenter/aggregate.go
index fb3e818..6c654e5 100644
--- a/server/plugin/pkg/discovery/servicecenter/aggregate.go
+++ b/server/plugin/pkg/discovery/servicecenter/aggregate.go
@@ -45,7 +45,7 @@ func (c *SCClientAggregate) GetScCache() (*model.Cache, 
error) {
        for _, client := range *c {
                cache, err := client.GetScCache()
                if err != nil {
-                       log.Errorf(err, "get service center[%s] cache failed", 
client.Config.Addr)
+                       log.Errorf(err, "get service center cache failed")
                        continue
                }
                caches.Microservices = append(caches.Microservices, 
cache.Microservices...)
@@ -93,27 +93,27 @@ func (c *SCClientAggregate) 
GetSchemaBySchemaId(domainProject, serviceId, schema
 func NewSCClientAggregate() *SCClientAggregate {
        c := &SCClientAggregate{}
        clusters := registry.Configuration().Clusters
-       for name, addr := range clusters {
+       for name, endpoints := range clusters {
                if len(name) == 0 || name == 
registry.Configuration().ClusterName {
                        continue
                }
-               // TODO support endpoints LB
-               client, err := sc.NewSCClient(sc.Config{Addr: addr[0]})
+               client, err := sc.NewSCClient(sc.Config{Endpoints: endpoints})
                if err != nil {
-                       log.Errorf(err, "new service center[%s][%s] client 
failed", name, addr)
+                       log.Errorf(err, "new service center[%s]%v client 
failed", name, endpoints)
                        continue
                }
                client.Timeout = registry.Configuration().RequestTimeOut
-               if strings.Index(addr[0], "https") >= 0 {
+               // TLS
+               if strings.Index(endpoints[0], "https") >= 0 {
                        client.TLS, err = getClientTLS()
                        if err != nil {
-                               log.Errorf(err, "get service center[%s][%s] tls 
config failed", name, addr)
+                               log.Errorf(err, "get service center[%s]%v tls 
config failed", name, endpoints)
                                continue
                        }
                }
 
                *c = append(*c, client)
-               log.Infof("new service center[%s][%s] client", name, addr)
+               log.Infof("new service center[%s]%v client", name, endpoints)
        }
        return c
 }

Reply via email to