This is an automated email from the ASF dual-hosted git repository. asifdxtreme pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-service-center.git
The following commit(s) were added to refs/heads/master by this push: new dc76a75 SCB-988 Add loadbalancer in sc client (#479) dc76a75 is described below commit dc76a7515fa4518b3bf2a75327661e1bb97fa3c4 Author: little-cui <sure_0...@qq.com> AuthorDate: Sat Nov 3 00:16:48 2018 +0800 SCB-988 Add loadbalancer in sc client (#479) --- pkg/client/sc/apis.go | 34 ++++++-------- pkg/client/sc/client.go | 18 ++++++-- pkg/client/sc/{client.go => client_lb.go} | 34 ++++++++++++-- pkg/client/sc/config.go | 4 +- pkg/{client/sc/client.go => lb/loadbalancer.go} | 19 ++------ .../sc/config.go => lb/loadbalancer_test.go} | 53 +++++++++++++--------- pkg/{client/sc/client.go => lb/roundrobin.go} | 38 +++++++++++----- pkg/rest/client.go | 10 ++-- scctl/pkg/cmd/cmd.go | 4 +- .../pkg/discovery/servicecenter/aggregate.go | 16 +++---- 10 files changed, 130 insertions(+), 100 deletions(-) diff --git a/pkg/client/sc/apis.go b/pkg/client/sc/apis.go index 63e96e7..2947297 100644 --- a/pkg/client/sc/apis.go +++ b/pkg/client/sc/apis.go @@ -47,16 +47,8 @@ func (c *SCClient) toError(body []byte) *scerr.Error { return message } -func (c *SCClient) commonHeaders() http.Header { - var headers = make(http.Header) - if len(c.Config.Token) > 0 { - headers.Set("X-Auth-Token", c.Config.Token) - } - return headers -} - func (c *SCClient) GetScVersion() (*version.VersionSet, *scerr.Error) { - resp, err := c.URLClient.HttpDo(http.MethodGet, c.Config.Addr+apiVersionURL, c.commonHeaders(), nil) + resp, err := c.RestDo(http.MethodGet, apiVersionURL, c.CommonHeaders(), nil) if err != nil { return nil, scerr.NewError(scerr.ErrInternal, err.Error()) } @@ -82,10 +74,10 @@ func (c *SCClient) GetScVersion() (*version.VersionSet, *scerr.Error) { } func (c *SCClient) GetScCache() (*model.Cache, *scerr.Error) { - headers := c.commonHeaders() + headers := c.CommonHeaders() // only default domain has admin permission headers.Set("X-Domain-Name", "default") - resp, err := c.URLClient.HttpDo(http.MethodGet, c.Config.Addr+apiDumpURL, headers, nil) + resp, err := c.RestDo(http.MethodGet, apiDumpURL, headers, nil) if err != nil { return nil, scerr.NewError(scerr.ErrInternal, err.Error()) } @@ -112,10 +104,10 @@ func (c *SCClient) GetScCache() (*model.Cache, *scerr.Error) { func (c *SCClient) GetSchemasByServiceId(domainProject, serviceId string) ([]*pb.Schema, *scerr.Error) { domain, project := core.FromDomainProject(domainProject) - headers := c.commonHeaders() + headers := c.CommonHeaders() headers.Set("X-Domain-Name", domain) - resp, err := c.URLClient.HttpDo(http.MethodGet, - c.Config.Addr+fmt.Sprintf(apiSchemasURL, project, serviceId)+"?withSchema=1", + resp, err := c.RestDo(http.MethodGet, + fmt.Sprintf(apiSchemasURL, project, serviceId)+"?withSchema=1", headers, nil) if err != nil { return nil, scerr.NewError(scerr.ErrInternal, err.Error()) @@ -143,10 +135,10 @@ func (c *SCClient) GetSchemasByServiceId(domainProject, serviceId string) ([]*pb func (c *SCClient) GetSchemaBySchemaId(domainProject, serviceId, schemaId string) (*pb.Schema, *scerr.Error) { domain, project := core.FromDomainProject(domainProject) - headers := c.commonHeaders() + headers := c.CommonHeaders() headers.Set("X-Domain-Name", domain) - resp, err := c.URLClient.HttpDo(http.MethodGet, - c.Config.Addr+fmt.Sprintf(apiSchemaURL, project, serviceId, schemaId), + resp, err := c.RestDo(http.MethodGet, + fmt.Sprintf(apiSchemaURL, project, serviceId, schemaId), headers, nil) if err != nil { return nil, scerr.NewError(scerr.ErrInternal, err.Error()) @@ -177,10 +169,10 @@ func (c *SCClient) GetSchemaBySchemaId(domainProject, serviceId, schemaId string } func (c *SCClient) GetClusters() (registry.Clusters, *scerr.Error) { - headers := c.commonHeaders() + headers := c.CommonHeaders() // only default domain has admin permission headers.Set("X-Domain-Name", "default") - resp, err := c.URLClient.HttpDo(http.MethodGet, c.Config.Addr+apiClustersURL, headers, nil) + resp, err := c.RestDo(http.MethodGet, apiClustersURL, headers, nil) if err != nil { return nil, scerr.NewError(scerr.ErrInternal, err.Error()) } @@ -206,10 +198,10 @@ func (c *SCClient) GetClusters() (registry.Clusters, *scerr.Error) { } func (c *SCClient) HealthCheck() *scerr.Error { - headers := c.commonHeaders() + headers := c.CommonHeaders() // only default domain has admin permission headers.Set("X-Domain-Name", "default") - resp, err := c.URLClient.HttpDo(http.MethodGet, c.Config.Addr+apiHealthURL, headers, nil) + resp, err := c.RestDo(http.MethodGet, apiHealthURL, headers, nil) if err != nil { return scerr.NewError(scerr.ErrUnavailableBackend, err.Error()) } diff --git a/pkg/client/sc/client.go b/pkg/client/sc/client.go index 0d95dcd..38ab11b 100644 --- a/pkg/client/sc/client.go +++ b/pkg/client/sc/client.go @@ -16,18 +16,26 @@ package sc import ( - "github.com/apache/incubator-servicecomb-service-center/pkg/rest" + "net/http" ) func NewSCClient(cfg Config) (*SCClient, error) { - client, err := rest.GetURLClient(cfg.Merge()) + client, err := NewLBClient(cfg.Endpoints, cfg.Merge()) if err != nil { return nil, err } - return &SCClient{Config: cfg, URLClient: client}, nil + return &SCClient{LBClient: client, Token: cfg.Token}, nil } type SCClient struct { - *rest.URLClient - Config Config + *LBClient + Token string +} + +func (c *SCClient) CommonHeaders() http.Header { + var headers = make(http.Header) + if len(c.Token) > 0 { + headers.Set("X-Auth-Token", c.Token) + } + return headers } diff --git a/pkg/client/sc/client.go b/pkg/client/sc/client_lb.go similarity index 53% copy from pkg/client/sc/client.go copy to pkg/client/sc/client_lb.go index 0d95dcd..015307d 100644 --- a/pkg/client/sc/client.go +++ b/pkg/client/sc/client_lb.go @@ -16,18 +16,42 @@ package sc import ( + "github.com/apache/incubator-servicecomb-service-center/pkg/lb" "github.com/apache/incubator-servicecomb-service-center/pkg/rest" + "github.com/apache/incubator-servicecomb-service-center/pkg/util" + "net/http" ) -func NewSCClient(cfg Config) (*SCClient, error) { - client, err := rest.GetURLClient(cfg.Merge()) +func NewLBClient(endpoints []string, options rest.URLClientOption) (*LBClient, error) { + client, err := rest.GetURLClient(options) if err != nil { return nil, err } - return &SCClient{Config: cfg, URLClient: client}, nil + return &LBClient{ + Retries: len(endpoints), + LB: lb.NewRoundRobinLB(endpoints), + URLClient: client, + }, nil } -type SCClient struct { +type LBClient struct { *rest.URLClient - Config Config + Retries int + LB lb.LoadBalancer +} + +func (c *LBClient) Next() string { + return c.LB.Next() +} + +func (c *LBClient) RestDo(method string, api string, headers http.Header, body []byte) (resp *http.Response, err error) { + for i := 0; i < c.Retries; i++ { + resp, err = c.HttpDo(method, c.Next()+api, headers, body) + if err != nil { + util.GetBackoff().Delay(i) + continue + } + break + } + return } diff --git a/pkg/client/sc/config.go b/pkg/client/sc/config.go index 0f8d64f..f78845b 100644 --- a/pkg/client/sc/config.go +++ b/pkg/client/sc/config.go @@ -24,14 +24,14 @@ import ( type Config struct { rest.URLClientOption - Addr string + Endpoints []string // TODO Expandable header not only token header Token string CertKeyPWDPath string } func (cfg *Config) Merge() rest.URLClientOption { - ssl := strings.Index(cfg.Addr, "https://") >= 0 + ssl := strings.Index(cfg.Endpoints[0], "https://") >= 0 if ssl && len(cfg.CertKeyPWD) == 0 && len(cfg.CertKeyPWDPath) > 0 { content, _ := ioutil.ReadFile(cfg.CertKeyPWDPath) cfg.CertKeyPWD = string(content) diff --git a/pkg/client/sc/client.go b/pkg/lb/loadbalancer.go similarity index 70% copy from pkg/client/sc/client.go copy to pkg/lb/loadbalancer.go index 0d95dcd..c1ee278 100644 --- a/pkg/client/sc/client.go +++ b/pkg/lb/loadbalancer.go @@ -13,21 +13,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -package sc +package lb -import ( - "github.com/apache/incubator-servicecomb-service-center/pkg/rest" -) - -func NewSCClient(cfg Config) (*SCClient, error) { - client, err := rest.GetURLClient(cfg.Merge()) - if err != nil { - return nil, err - } - return &SCClient{Config: cfg, URLClient: client}, nil -} - -type SCClient struct { - *rest.URLClient - Config Config +type LoadBalancer interface { + Next() string } diff --git a/pkg/client/sc/config.go b/pkg/lb/loadbalancer_test.go similarity index 52% copy from pkg/client/sc/config.go copy to pkg/lb/loadbalancer_test.go index 0f8d64f..c221c6c 100644 --- a/pkg/client/sc/config.go +++ b/pkg/lb/loadbalancer_test.go @@ -13,32 +13,41 @@ // See the License for the specific language governing permissions and // limitations under the License. -package sc +package lb import ( - "github.com/apache/incubator-servicecomb-service-center/pkg/rest" - "io/ioutil" - "strings" - "time" + "testing" ) -type Config struct { - rest.URLClientOption - Addr string - // TODO Expandable header not only token header - Token string - CertKeyPWDPath string -} - -func (cfg *Config) Merge() rest.URLClientOption { - ssl := strings.Index(cfg.Addr, "https://") >= 0 - if ssl && len(cfg.CertKeyPWD) == 0 && len(cfg.CertKeyPWDPath) > 0 { - content, _ := ioutil.ReadFile(cfg.CertKeyPWDPath) - cfg.CertKeyPWD = string(content) +func TestNewRoundRobinLB(t *testing.T) { + lb := NewRoundRobinLB(nil) + if lb.Next() != "" { + t.Fatalf("TestNewRoundRobinLB failed") + } + lb = NewRoundRobinLB([]string{"1"}) + if lb.Next() != "1" { + t.Fatalf("TestNewRoundRobinLB failed") + } + if lb.Next() != "1" { + t.Fatalf("TestNewRoundRobinLB failed") } - cfg.SSLEnabled = ssl - if cfg.RequestTimeout == 0 { - cfg.RequestTimeout = 10 * time.Second + lb = NewRoundRobinLB([]string{"1", "2"}) + if lb.Next() != "1" { + t.Fatalf("TestNewRoundRobinLB failed") } - return cfg.URLClientOption + if lb.Next() != "2" { + t.Fatalf("TestNewRoundRobinLB failed") + } + if lb.Next() != "1" { + t.Fatalf("TestNewRoundRobinLB failed") + } +} + +func BenchmarkNewRoundLB(b *testing.B) { + lb := NewRoundRobinLB([]string{"1", "2", "3"}) + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + _ = lb.Next() + } + }) } diff --git a/pkg/client/sc/client.go b/pkg/lb/roundrobin.go similarity index 56% copy from pkg/client/sc/client.go copy to pkg/lb/roundrobin.go index 0d95dcd..f842a79 100644 --- a/pkg/client/sc/client.go +++ b/pkg/lb/roundrobin.go @@ -13,21 +13,35 @@ // See the License for the specific language governing permissions and // limitations under the License. -package sc +package lb -import ( - "github.com/apache/incubator-servicecomb-service-center/pkg/rest" -) +import "sync/atomic" -func NewSCClient(cfg Config) (*SCClient, error) { - client, err := rest.GetURLClient(cfg.Merge()) - if err != nil { - return nil, err +type RoundRobinLB struct { + Endpoints []string + index int32 +} + +func (lb *RoundRobinLB) Next() string { + l := len(lb.Endpoints) + if l == 0 { + return "" } - return &SCClient{Config: cfg, URLClient: client}, nil + c := atomic.LoadInt32(&lb.index) + if c >= int32(l)-1 { + atomic.StoreInt32(&lb.index, 0) + return lb.Endpoints[0] + } else if atomic.CompareAndSwapInt32(&lb.index, c, c+1) { + return lb.Endpoints[c+1] + } + return lb.Endpoints[atomic.LoadInt32(&lb.index)] } -type SCClient struct { - *rest.URLClient - Config Config +func NewRoundRobinLB(endpoints []string) *RoundRobinLB { + lb := &RoundRobinLB{ + Endpoints: make([]string, len(endpoints)), + index: -1, + } + copy(lb.Endpoints, endpoints) + return lb } diff --git a/pkg/rest/client.go b/pkg/rest/client.go index 0518b6e..ac84929 100644 --- a/pkg/rest/client.go +++ b/pkg/rest/client.go @@ -61,8 +61,6 @@ type URLClient struct { TLS *tls.Config - Request *http.Request - Cfg URLClientOption } @@ -95,19 +93,17 @@ func (client *URLClient) HttpDo(method string, rawURL string, headers http.Heade if err != nil { return nil, errors.New(fmt.Sprintf("create request failed: %s", err.Error())) } - client.Request = req - req.Header = headers resp, err = client.Client.Do(req) if err != nil { - return nil, errors.New(fmt.Sprintf("invoke request failed: %s", err.Error())) + return nil, err } if os.Getenv("DEBUG_MODE") == "1" { fmt.Println("--- BEGIN ---") - fmt.Printf("> %s %s %s\n", client.Request.Method, client.Request.URL.RequestURI(), client.Request.Proto) - for key, header := range client.Request.Header { + fmt.Printf("> %s %s %s\n", req.Method, req.URL.RequestURI(), req.Proto) + for key, header := range req.Header { for _, value := range header { fmt.Printf("> %s: %s\n", key, value) } diff --git a/scctl/pkg/cmd/cmd.go b/scctl/pkg/cmd/cmd.go index f84dd23..0437ae1 100644 --- a/scctl/pkg/cmd/cmd.go +++ b/scctl/pkg/cmd/cmd.go @@ -48,8 +48,8 @@ func init() { } } - rootCmd.PersistentFlags().StringVar(&ScClientConfig.Addr, "addr", - "http://"+util.GetEnvString("HOSTING_SERVER_IP", "127.0.0.1")+":30100", + rootCmd.PersistentFlags().StringSliceVar(&ScClientConfig.Endpoints, "addr", + []string{"http://" + util.GetEnvString("HOSTING_SERVER_IP", "127.0.0.1") + ":30100"}, "the http host and port of service center, can be overrode by env HOSTING_SERVER_IP.") rootCmd.PersistentFlags().StringVar(&ScClientConfig.Token, "token", "", diff --git a/server/plugin/pkg/discovery/servicecenter/aggregate.go b/server/plugin/pkg/discovery/servicecenter/aggregate.go index fb3e818..6c654e5 100644 --- a/server/plugin/pkg/discovery/servicecenter/aggregate.go +++ b/server/plugin/pkg/discovery/servicecenter/aggregate.go @@ -45,7 +45,7 @@ func (c *SCClientAggregate) GetScCache() (*model.Cache, error) { for _, client := range *c { cache, err := client.GetScCache() if err != nil { - log.Errorf(err, "get service center[%s] cache failed", client.Config.Addr) + log.Errorf(err, "get service center cache failed") continue } caches.Microservices = append(caches.Microservices, cache.Microservices...) @@ -93,27 +93,27 @@ func (c *SCClientAggregate) GetSchemaBySchemaId(domainProject, serviceId, schema func NewSCClientAggregate() *SCClientAggregate { c := &SCClientAggregate{} clusters := registry.Configuration().Clusters - for name, addr := range clusters { + for name, endpoints := range clusters { if len(name) == 0 || name == registry.Configuration().ClusterName { continue } - // TODO support endpoints LB - client, err := sc.NewSCClient(sc.Config{Addr: addr[0]}) + client, err := sc.NewSCClient(sc.Config{Endpoints: endpoints}) if err != nil { - log.Errorf(err, "new service center[%s][%s] client failed", name, addr) + log.Errorf(err, "new service center[%s]%v client failed", name, endpoints) continue } client.Timeout = registry.Configuration().RequestTimeOut - if strings.Index(addr[0], "https") >= 0 { + // TLS + if strings.Index(endpoints[0], "https") >= 0 { client.TLS, err = getClientTLS() if err != nil { - log.Errorf(err, "get service center[%s][%s] tls config failed", name, addr) + log.Errorf(err, "get service center[%s]%v tls config failed", name, endpoints) continue } } *c = append(*c, client) - log.Infof("new service center[%s][%s] client", name, addr) + log.Infof("new service center[%s]%v client", name, endpoints) } return c }