This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-satellite.git
The following commit(s) were added to refs/heads/main by this push:
new a4b8928 Support load-balance with static server list (#68)
a4b8928 is described below
commit a4b892807dd5686c87b47f47accf895096c55e01
Author: mrproliu <[email protected]>
AuthorDate: Wed Sep 22 23:47:14 2021 +0800
Support load-balance with static server list (#68)
---
docs/en/setup/plugins/client_grpc-client.md | 2 +-
plugins/client/grpc/client.go | 15 ++-
plugins/client/grpc/client_config.go | 3 +
plugins/client/grpc/resolvers/resolvers.go | 57 +++++++++
plugins/client/grpc/resolvers/static_clients.go | 77 ++++++++++++
plugins/client/grpc/static_clients_test.go | 154 ++++++++++++++++++++++++
6 files changed, 304 insertions(+), 4 deletions(-)
diff --git a/docs/en/setup/plugins/client_grpc-client.md b/docs/en/setup/plugins/client_grpc-client.md
index 27ada86..5f2d9ea 100755
--- a/docs/en/setup/plugins/client_grpc-client.md
+++ b/docs/en/setup/plugins/client_grpc-client.md
@@ -3,7 +3,7 @@
The gRPC client is a sharing plugin that keeps a connection to the gRPC server
and delivers the data to it.
## DefaultConfig
```yaml
-# The gRPC server address (default localhost:11800).
+# The gRPC server address (default localhost:11800), multiple addresses are split by ",".
server_addr: localhost:11800
# The TLS switch (default false).
diff --git a/plugins/client/grpc/client.go b/plugins/client/grpc/client.go
index b8598c9..71d1b47 100644
--- a/plugins/client/grpc/client.go
+++ b/plugins/client/grpc/client.go
@@ -28,14 +28,16 @@ import (
"github.com/apache/skywalking-satellite/internal/pkg/config"
"github.com/apache/skywalking-satellite/internal/pkg/log"
"github.com/apache/skywalking-satellite/plugins/client/api"
+ "github.com/apache/skywalking-satellite/plugins/client/grpc/resolvers"
)
const Name = "grpc-client"
type Client struct {
config.CommonFields
+ // server finder config
+ ServerFinderConfig resolvers.ServerFinderConfig `mapstructure:",squash"`
// config
- ServerAddr string `mapstructure:"server_addr"` // The
gRPC server address
EnableTLS bool `mapstructure:"enable_TLS"` //
Enable TLS connect to server
ClientPemPath string `mapstructure:"client_pem_path"` // The
file path of client.pem. The config only works when opening the TLS switch.
ClientKeyPath string `mapstructure:"client_key_path"` // The
file path of client.key. The config only works when opening the TLS switch.
@@ -62,7 +64,7 @@ func (c *Client) Description() string {
func (c *Client) DefaultConfig() string {
return `
-# The gRPC server address (default localhost:11800).
+# The gRPC server address (default localhost:11800), multiple addresses are
split by ",".
server_addr: localhost:11800
# The TLS switch (default false).
@@ -100,8 +102,15 @@ func (c *Client) Prepare() error {
"client_name": Name,
})})
+ // server address resolver
+ resolvers.RegisterAllGrpcResolvers()
+
// connect to server
- client, err := grpc.Dial(c.ServerAddr, *cfg...)
+ target, err := resolvers.BuildTarget(&c.ServerFinderConfig)
+ if err != nil {
+ return fmt.Errorf("cannot build grpc target: %v", err)
+ }
+ client, err := grpc.Dial(target, *cfg...)
if err != nil {
return fmt.Errorf("cannot connect to grpc server: %v", err)
}
diff --git a/plugins/client/grpc/client_config.go b/plugins/client/grpc/client_config.go
index 5f7d49b..2b767fe 100644
--- a/plugins/client/grpc/client_config.go
+++ b/plugins/client/grpc/client_config.go
@@ -73,6 +73,9 @@ func (c *Client) loadConfig() (*[]grpc.DialOption, error) {
return err
})
+ // using round-robin load balancer
+ options = append(options,
grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"round_robin"}`))
+
return &options, nil
}
diff --git a/plugins/client/grpc/resolvers/resolvers.go b/plugins/client/grpc/resolvers/resolvers.go
new file mode 100644
index 0000000..9974d31
--- /dev/null
+++ b/plugins/client/grpc/resolvers/resolvers.go
@@ -0,0 +1,57 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package resolvers
+
+import (
+ "fmt"
+
+ "google.golang.org/grpc/resolver"
+)
+
+// all customized resolvers
+var rs = []GrpcResolver{
+ &staticServerResolver{},
+}
+
+type ServerFinderConfig struct {
+ ServerAddr string `mapstructure:"server_addr"` // The gRPC server
address
+}
+
+type GrpcResolver interface {
+ resolver.Builder
+
+ // IsSupport reports whether this resolver can handle the given client config
+ IsSupport(c *ServerFinderConfig) bool
+ // BuildTarget builds the gRPC dial target address from the client config
+ BuildTarget(c *ServerFinderConfig) (string, error)
+}
+
+func RegisterAllGrpcResolvers() {
+ for _, r := range rs {
+ resolver.Register(r)
+ }
+}
+
+func BuildTarget(client *ServerFinderConfig) (string, error) {
+ for _, r := range rs {
+ if r.IsSupport(client) {
+ return r.BuildTarget(client)
+ }
+ }
+ return "", fmt.Errorf("could not build grpc target")
+}
diff --git a/plugins/client/grpc/resolvers/static_clients.go b/plugins/client/grpc/resolvers/static_clients.go
new file mode 100644
index 0000000..16987fb
--- /dev/null
+++ b/plugins/client/grpc/resolvers/static_clients.go
@@ -0,0 +1,77 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package resolvers
+
+import (
+ "fmt"
+ "strings"
+
+ "github.com/apache/skywalking-satellite/internal/pkg/log"
+
+ "google.golang.org/grpc/resolver"
+)
+
+var staticServerSchema = "static"
+
+type staticServerResolver struct {
+}
+
+func (s *staticServerResolver) IsSupport(c *ServerFinderConfig) bool {
+ return c.ServerAddr != ""
+}
+
+func (s *staticServerResolver) BuildTarget(c *ServerFinderConfig) (string,
error) {
+ // build target using uri endpoint
+ return fmt.Sprintf("%s:///%s", staticServerSchema, c.ServerAddr), nil
+}
+
+func (*staticServerResolver) Build(target resolver.Target, cc
resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
+ r := &staticResolver{
+ target: target,
+ cc: cc,
+ }
+ r.analyzeClients()
+ return r, nil
+}
+
+func (*staticServerResolver) Scheme() string {
+ return staticServerSchema
+}
+
+type staticResolver struct {
+ target resolver.Target
+ cc resolver.ClientConn
+}
+
+func (r *staticResolver) ResolveNow(o resolver.ResolveNowOptions) {
+ r.analyzeClients()
+}
+
+func (*staticResolver) Close() {
+}
+
+func (r *staticResolver) analyzeClients() {
+ addresses := strings.Split(r.target.Endpoint, ",")
+ addrs := make([]resolver.Address, len(addresses))
+ for i, s := range addresses {
+ addrs[i] = resolver.Address{Addr: s}
+ }
+ if err := r.cc.UpdateState(resolver.State{Addresses: addrs}); err !=
nil {
+ log.Logger.Warnf("error update static grpc client list: %v",
err)
+ }
+}
diff --git a/plugins/client/grpc/static_clients_test.go b/plugins/client/grpc/static_clients_test.go
new file mode 100644
index 0000000..ba5a6d6
--- /dev/null
+++ b/plugins/client/grpc/static_clients_test.go
@@ -0,0 +1,154 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package grpc
+
+import (
+ "context"
+ "crypto/rand"
+ "fmt"
+ "math/big"
+ "reflect"
+ "testing"
+ "time"
+
+ "github.com/apache/skywalking-satellite/internal/pkg/log"
+
+ common "skywalking.apache.org/repo/goapi/collect/common/v3"
+ agent "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
+
+ "google.golang.org/grpc"
+
+ client "github.com/apache/skywalking-satellite/plugins/client/api"
+
+ "github.com/apache/skywalking-satellite/internal/pkg/plugin"
+ receiver "github.com/apache/skywalking-satellite/plugins/receiver/api"
+ server "github.com/apache/skywalking-satellite/plugins/server/api"
+ grpc_server "github.com/apache/skywalking-satellite/plugins/server/grpc"
+)
+
+type JVMReportService struct {
+ receiveCount int
+ agent.UnimplementedJVMMetricReportServiceServer
+}
+
+func (j *JVMReportService) Collect(_ context.Context, jvm
*agent.JVMMetricCollection) (*common.Commands, error) {
+ j.receiveCount++
+ return &common.Commands{}, nil
+}
+
+func TestStaticServer(t *testing.T) {
+ serverCount := 2
+ Init()
+
+ // init all servers
+ servers, ports := initServers(serverCount, t)
+ receivers := make([]*JVMReportService, serverCount)
+ for inx, s := range servers {
+ reportService := &JVMReportService{receiveCount: 0}
+ receivers[inx] = reportService
+
agent.RegisterJVMMetricReportServiceServer(s.GetServer().(*grpc.Server),
reportService)
+
+ if err := s.Start(); err != nil {
+ t.Errorf("start client error: %v", err)
+ }
+ }
+ defer func() {
+ for _, s := range servers {
+ s.Close()
+ }
+ }()
+
+ // init client
+ c := initClient(ports, t)
+
+ // wait for all channels to be connected (connections are established asynchronously)
+ time.Sleep(time.Second * 1)
+
+ // send request
+ jvmClient :=
agent.NewJVMMetricReportServiceClient(c.GetConnectedClient().(*grpc.ClientConn))
+ for inx := 0; inx < serverCount; inx++ {
+ if _, err := jvmClient.Collect(context.Background(),
&agent.JVMMetricCollection{}); err != nil {
+ t.Errorf("send request error: %v", err)
+ }
+ }
+
+ // check that every receiver has received data
+ for inx, receiver := range receivers {
+ if receiver.receiveCount <= 0 {
+ t.Errorf("check result failed, client index: %d", inx)
+ }
+ }
+}
+
+func Init() {
+ log.Init(new(log.LoggerConfig))
+
plugin.RegisterPluginCategory(reflect.TypeOf((*server.Server)(nil)).Elem())
+
plugin.RegisterPluginCategory(reflect.TypeOf((*client.Client)(nil)).Elem())
+
plugin.RegisterPluginCategory(reflect.TypeOf((*receiver.Receiver)(nil)).Elem())
+ plugin.RegisterPlugin(new(grpc_server.Server))
+ plugin.RegisterPlugin(new(Client))
+}
+
+func initServers(serverCount int, t *testing.T) (servers []server.Server,
ports []int) {
+ for inx := 0; inx < serverCount; inx++ {
+ cfg := make(plugin.Config)
+ cfg[plugin.NameField] = grpc_server.Name
+ port := randomGrpcPort()
+ cfg["address"] = fmt.Sprintf(":%d", port)
+ q := server.GetServer(cfg)
+ if err := q.Prepare(); err != nil {
+ t.Fatalf("cannot perpare the grpc server: %v", err)
+ }
+
+ servers = append(servers, q)
+ ports = append(ports, port)
+ }
+
+ return servers, ports
+}
+
+func randomGrpcPort() int {
+ b := new(big.Int).SetInt64(int64(65535 - 1000))
+ i, err := rand.Int(rand.Reader, b)
+ if err != nil {
+ fmt.Printf("Can't generate random value: %v, %v", i, err)
+ return -1
+ }
+ return int(i.Int64() + 1000)
+}
+
+func initClient(ports []int, t *testing.T) client.Client {
+ cfg := make(plugin.Config)
+ cfg[plugin.NameField] = Name
+ serverList := ""
+ for inx := range ports {
+ if inx > 0 {
+ serverList += ","
+ }
+ serverList += fmt.Sprintf("%s%d", "0.0.0.0:", ports[inx])
+ }
+ cfg["server_addr"] = serverList
+ q := client.GetClient(cfg)
+ if err := q.Prepare(); err != nil {
+ t.Errorf("prepare client error: %v", err)
+ }
+ if err := q.Start(); err != nil {
+ t.Errorf("start client error: %v", err)
+ }
+ return q
+}