This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-satellite.git


The following commit(s) were added to refs/heads/main by this push:
     new a4b8928  Support load-balance with static server list (#68)
a4b8928 is described below

commit a4b892807dd5686c87b47f47accf895096c55e01
Author: mrproliu <[email protected]>
AuthorDate: Wed Sep 22 23:47:14 2021 +0800

    Support load-balance with static server list (#68)
---
 docs/en/setup/plugins/client_grpc-client.md     |   2 +-
 plugins/client/grpc/client.go                   |  15 ++-
 plugins/client/grpc/client_config.go            |   3 +
 plugins/client/grpc/resolvers/resolvers.go      |  57 +++++++++
 plugins/client/grpc/resolvers/static_clients.go |  77 ++++++++++++
 plugins/client/grpc/static_clients_test.go      | 154 ++++++++++++++++++++++++
 6 files changed, 304 insertions(+), 4 deletions(-)

diff --git a/docs/en/setup/plugins/client_grpc-client.md b/docs/en/setup/plugins/client_grpc-client.md
index 27ada86..5f2d9ea 100755
--- a/docs/en/setup/plugins/client_grpc-client.md
+++ b/docs/en/setup/plugins/client_grpc-client.md
@@ -3,7 +3,7 @@
 The gRPC client is a sharing plugin to keep connection with the gRPC server and delivery the data to it.
 ## DefaultConfig
 ```yaml
-# The gRPC server address (default localhost:11800). 
+# The gRPC server address (default localhost:11800), multiple addresses are split by ",".
 server_addr: localhost:11800
 
 # The TLS switch (default false).
diff --git a/plugins/client/grpc/client.go b/plugins/client/grpc/client.go
index b8598c9..71d1b47 100644
--- a/plugins/client/grpc/client.go
+++ b/plugins/client/grpc/client.go
@@ -28,14 +28,16 @@ import (
        "github.com/apache/skywalking-satellite/internal/pkg/config"
        "github.com/apache/skywalking-satellite/internal/pkg/log"
        "github.com/apache/skywalking-satellite/plugins/client/api"
+       "github.com/apache/skywalking-satellite/plugins/client/grpc/resolvers"
 )
 
 const Name = "grpc-client"
 
 type Client struct {
        config.CommonFields
+       // server finder config
+       ServerFinderConfig resolvers.ServerFinderConfig `mapstructure:",squash"`
        // config
-       ServerAddr         string `mapstructure:"server_addr"`          // The gRPC server address
        EnableTLS          bool   `mapstructure:"enable_TLS"`           // Enable TLS connect to server
        ClientPemPath      string `mapstructure:"client_pem_path"`      // The file path of client.pem. The config only works when opening the TLS switch.
        ClientKeyPath      string `mapstructure:"client_key_path"`      // The file path of client.key. The config only works when opening the TLS switch.
@@ -62,7 +64,7 @@ func (c *Client) Description() string {
 
 func (c *Client) DefaultConfig() string {
        return `
-# The gRPC server address (default localhost:11800). 
+# The gRPC server address (default localhost:11800), multiple addresses are split by ",".
 server_addr: localhost:11800
 
 # The TLS switch (default false).
@@ -100,8 +102,15 @@ func (c *Client) Prepare() error {
                "client_name": Name,
        })})
 
+       // server address resolver
+       resolvers.RegisterAllGrpcResolvers()
+
        // connect to server
-       client, err := grpc.Dial(c.ServerAddr, *cfg...)
+       target, err := resolvers.BuildTarget(&c.ServerFinderConfig)
+       if err != nil {
+               return fmt.Errorf("cannot build grpc target: %v", err)
+       }
+       client, err := grpc.Dial(target, *cfg...)
        if err != nil {
                return fmt.Errorf("cannot connect to grpc server: %v", err)
        }
diff --git a/plugins/client/grpc/client_config.go b/plugins/client/grpc/client_config.go
index 5f7d49b..2b767fe 100644
--- a/plugins/client/grpc/client_config.go
+++ b/plugins/client/grpc/client_config.go
@@ -73,6 +73,9 @@ func (c *Client) loadConfig() (*[]grpc.DialOption, error) {
                return err
        })
 
+       // using round-robin load balancer
+       options = append(options, grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"round_robin"}`))
+
        return &options, nil
 }
 
diff --git a/plugins/client/grpc/resolvers/resolvers.go b/plugins/client/grpc/resolvers/resolvers.go
new file mode 100644
index 0000000..9974d31
--- /dev/null
+++ b/plugins/client/grpc/resolvers/resolvers.go
@@ -0,0 +1,57 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package resolvers
+
+import (
+       "fmt"
+
+       "google.golang.org/grpc/resolver"
+)
+
+// all customized resolvers
+var rs = []GrpcResolver{
+       &staticServerResolver{},
+}
+
+type ServerFinderConfig struct {
+       ServerAddr string `mapstructure:"server_addr"` // The gRPC server address
+}
+
+type GrpcResolver interface {
+       resolver.Builder
+
+       // IsSupport client config
+       IsSupport(c *ServerFinderConfig) bool
+       // BuildTarget address by client config
+       BuildTarget(c *ServerFinderConfig) (string, error)
+}
+
+func RegisterAllGrpcResolvers() {
+       for _, r := range rs {
+               resolver.Register(r)
+       }
+}
+
+func BuildTarget(client *ServerFinderConfig) (string, error) {
+       for _, r := range rs {
+               if r.IsSupport(client) {
+                       return r.BuildTarget(client)
+               }
+       }
+       return "", fmt.Errorf("could not build grpc target")
+}
diff --git a/plugins/client/grpc/resolvers/static_clients.go b/plugins/client/grpc/resolvers/static_clients.go
new file mode 100644
index 0000000..16987fb
--- /dev/null
+++ b/plugins/client/grpc/resolvers/static_clients.go
@@ -0,0 +1,77 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package resolvers
+
+import (
+       "fmt"
+       "strings"
+
+       "github.com/apache/skywalking-satellite/internal/pkg/log"
+
+       "google.golang.org/grpc/resolver"
+)
+
+var staticServerSchema = "static"
+
+type staticServerResolver struct {
+}
+
+func (s *staticServerResolver) IsSupport(c *ServerFinderConfig) bool {
+       return c.ServerAddr != ""
+}
+
+func (s *staticServerResolver) BuildTarget(c *ServerFinderConfig) (string, error) {
+       // build target using uri endpoint
+       return fmt.Sprintf("%s:///%s", staticServerSchema, c.ServerAddr), nil
+}
+
+func (*staticServerResolver) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
+       r := &staticResolver{
+               target: target,
+               cc:     cc,
+       }
+       r.analyzeClients()
+       return r, nil
+}
+
+func (*staticServerResolver) Scheme() string {
+       return staticServerSchema
+}
+
+type staticResolver struct {
+       target resolver.Target
+       cc     resolver.ClientConn
+}
+
+func (r *staticResolver) ResolveNow(o resolver.ResolveNowOptions) {
+       r.analyzeClients()
+}
+
+func (*staticResolver) Close() {
+}
+
+func (r *staticResolver) analyzeClients() {
+       addresses := strings.Split(r.target.Endpoint, ",")
+       addrs := make([]resolver.Address, len(addresses))
+       for i, s := range addresses {
+               addrs[i] = resolver.Address{Addr: s}
+       }
+       if err := r.cc.UpdateState(resolver.State{Addresses: addrs}); err != nil {
+               log.Logger.Warnf("error update static grpc client list: %v", err)
+       }
+}
diff --git a/plugins/client/grpc/static_clients_test.go b/plugins/client/grpc/static_clients_test.go
new file mode 100644
index 0000000..ba5a6d6
--- /dev/null
+++ b/plugins/client/grpc/static_clients_test.go
@@ -0,0 +1,154 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package grpc
+
+import (
+       "context"
+       "crypto/rand"
+       "fmt"
+       "math/big"
+       "reflect"
+       "testing"
+       "time"
+
+       "github.com/apache/skywalking-satellite/internal/pkg/log"
+
+       common "skywalking.apache.org/repo/goapi/collect/common/v3"
+       agent "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
+
+       "google.golang.org/grpc"
+
+       client "github.com/apache/skywalking-satellite/plugins/client/api"
+
+       "github.com/apache/skywalking-satellite/internal/pkg/plugin"
+       receiver "github.com/apache/skywalking-satellite/plugins/receiver/api"
+       server "github.com/apache/skywalking-satellite/plugins/server/api"
+       grpc_server "github.com/apache/skywalking-satellite/plugins/server/grpc"
+)
+
+type JVMReportService struct {
+       receiveCount int
+       agent.UnimplementedJVMMetricReportServiceServer
+}
+
+func (j *JVMReportService) Collect(_ context.Context, jvm *agent.JVMMetricCollection) (*common.Commands, error) {
+       j.receiveCount++
+       return &common.Commands{}, nil
+}
+
+func TestStaticServer(t *testing.T) {
+       serverCount := 2
+       Init()
+
+       // init all servers
+       servers, ports := initServers(serverCount, t)
+       receivers := make([]*JVMReportService, serverCount)
+       for inx, s := range servers {
+               reportService := &JVMReportService{receiveCount: 0}
+               receivers[inx] = reportService
+               agent.RegisterJVMMetricReportServiceServer(s.GetServer().(*grpc.Server), reportService)
+
+               if err := s.Start(); err != nil {
+                       t.Errorf("start client error: %v", err)
+               }
+       }
+       defer func() {
+               for _, s := range servers {
+                       s.Close()
+               }
+       }()
+
+       // init client
+       c := initClient(ports, t)
+
+       // wait all channel being connected (connect by async)
+       time.Sleep(time.Second * 1)
+
+       // send request
+       jvmClient := agent.NewJVMMetricReportServiceClient(c.GetConnectedClient().(*grpc.ClientConn))
+       for inx := 0; inx < serverCount; inx++ {
+               if _, err := jvmClient.Collect(context.Background(), &agent.JVMMetricCollection{}); err != nil {
+                       t.Errorf("send request error: %v", err)
+               }
+       }
+
+       // check all receiver must have received data
+       for inx, receiver := range receivers {
+               if receiver.receiveCount <= 0 {
+                       t.Errorf("check result failed, client index: %d", inx)
+               }
+       }
+}
+
+func Init() {
+       log.Init(new(log.LoggerConfig))
+       plugin.RegisterPluginCategory(reflect.TypeOf((*server.Server)(nil)).Elem())
+       plugin.RegisterPluginCategory(reflect.TypeOf((*client.Client)(nil)).Elem())
+       plugin.RegisterPluginCategory(reflect.TypeOf((*receiver.Receiver)(nil)).Elem())
+       plugin.RegisterPlugin(new(grpc_server.Server))
+       plugin.RegisterPlugin(new(Client))
+}
+
+func initServers(serverCount int, t *testing.T) (servers []server.Server, ports []int) {
+       for inx := 0; inx < serverCount; inx++ {
+               cfg := make(plugin.Config)
+               cfg[plugin.NameField] = grpc_server.Name
+               port := randomGrpcPort()
+               cfg["address"] = fmt.Sprintf(":%d", port)
+               q := server.GetServer(cfg)
+               if err := q.Prepare(); err != nil {
+                       t.Fatalf("cannot perpare the grpc server: %v", err)
+               }
+
+               servers = append(servers, q)
+               ports = append(ports, port)
+       }
+
+       return servers, ports
+}
+
+func randomGrpcPort() int {
+       b := new(big.Int).SetInt64(int64(65535 - 1000))
+       i, err := rand.Int(rand.Reader, b)
+       if err != nil {
+               fmt.Printf("Can't generate random value: %v, %v", i, err)
+               return -1
+       }
+       return int(i.Int64() + 1000)
+}
+
+func initClient(ports []int, t *testing.T) client.Client {
+       cfg := make(plugin.Config)
+       cfg[plugin.NameField] = Name
+       serverList := ""
+       for inx := range ports {
+               if inx > 0 {
+                       serverList += ","
+               }
+               serverList += fmt.Sprintf("%s%d", "0.0.0.0:", ports[inx])
+       }
+       cfg["server_addr"] = serverList
+       q := client.GetClient(cfg)
+       if err := q.Prepare(); err != nil {
+               t.Errorf("prepare client error: %v", err)
+       }
+       if err := q.Start(); err != nil {
+               t.Errorf("start client error: %v", err)
+       }
+       return q
+}

Reply via email to