This is an automated email from the ASF dual-hosted git repository. liujiapeng pushed a commit to branch grpc-receiver in repository https://gitbox.apache.org/repos/asf/skywalking-satellite.git
commit 11bd1e2172977a37b507cc9a4dedd617774b1e7f Author: Evan <[email protected]> AuthorDate: Fri Jan 1 23:42:27 2021 +0800 gRPC server & log receiver --- dist/LICENSE | 1 + go.mod | 1 + plugins/receiver/grpclog/receive_service.go | 59 ++++++++++++ plugins/receiver/grpclog/receiver.go | 71 ++++++++++++++ plugins/receiver/grpclog/receiver_test.go | 140 ++++++++++++++++++++++++++++ plugins/receiver/log-grpc/README.md | 1 - plugins/server/grpc/server.go | 106 +++++++++++++++++++++ 7 files changed, 378 insertions(+), 1 deletion(-) diff --git a/dist/LICENSE b/dist/LICENSE index af6f460..e783359 100644 --- a/dist/LICENSE +++ b/dist/LICENSE @@ -215,6 +215,7 @@ Apache 2.0 licenses The following components are provided under the Apache License. See project link for details. The text of each license is the standard Apache 2.0 license. client_golang (prometheus) v1.9.0: https://github.com/prometheus/client_golang Apache 2.0 + grpc (google) v1.34.0: https://google.golang.org/grpc Apache 2.0 ======================================================================== BSD licenses diff --git a/go.mod b/go.mod index 95c5280..48d7a96 100644 --- a/go.mod +++ b/go.mod @@ -12,6 +12,7 @@ require ( github.com/sirupsen/logrus v1.7.0 github.com/spf13/viper v1.7.1 github.com/urfave/cli/v2 v2.3.0 + google.golang.org/grpc v1.34.0 google.golang.org/protobuf v1.25.0 skywalking/network v1.0.0 ) diff --git a/plugins/receiver/grpclog/receive_service.go b/plugins/receiver/grpclog/receive_service.go new file mode 100644 index 0000000..de8ecb9 --- /dev/null +++ b/plugins/receiver/grpclog/receive_service.go @@ -0,0 +1,59 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package grpclog + +import ( + "io" + "time" + + common "skywalking/network/common/v3" + logging "skywalking/network/logging/v3" + + "github.com/apache/skywalking-satellite/protocol/gen-codes/satellite/protocol" +) + +const eventName = "grpc-log-event" + +type LogReportService struct { + receiveChannel chan *protocol.Event + logging.UnimplementedLogReportServiceServer +} + +func (s *LogReportService) Collect(stream logging.LogReportService_CollectServer) error { + startTime := time.Now() + for { + logData, err := stream.Recv() + if err == io.EOF { + return stream.SendAndClose(&common.Commands{}) + } + if err != nil { + return err + } + e := &protocol.Event{ + Name: eventName, + Timestamp: startTime.Unix(), + Meta: nil, + Type: protocol.EventType_Logging, + Remote: true, + Data: &protocol.Event_Log{ + Log: logData, + }, + } + s.receiveChannel <- e + } +} diff --git a/plugins/receiver/grpclog/receiver.go b/plugins/receiver/grpclog/receiver.go new file mode 100644 index 0000000..22f8c54 --- /dev/null +++ b/plugins/receiver/grpclog/receiver.go @@ -0,0 +1,71 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package grpclog + +import ( + "fmt" + "reflect" + + "google.golang.org/grpc" + + logging "skywalking/network/logging/v3" + + "github.com/apache/skywalking-satellite/internal/pkg/config" + "github.com/apache/skywalking-satellite/internal/pkg/plugin" + "github.com/apache/skywalking-satellite/protocol/gen-codes/satellite/protocol" +) + +type Receiver struct { + config.CommonFields + MaxBufferSpaces int `mapstructure:"max_buffer_spaces"` // The max buffer events. + + server *grpc.Server + service *LogReportService + outputChannel chan *protocol.Event // The output channel of grpc log + +} + +func (r *Receiver) Name() string { + return plugin.GetPluginName(r) +} + +func (r *Receiver) Description() string { + return "this is a grpc log receiver" +} + +func (r *Receiver) DefaultConfig() string { + return ` +# The max buffer events to process flow surge. +max_buffer_spaces: 1 +` +} + +func (r *Receiver) RegisterHandler(server interface{}) { + s, ok := server.(*grpc.Server) + if !ok { + panic(fmt.Errorf("registerHandler does not support %s", reflect.TypeOf(server).String())) + } + r.server = s + r.outputChannel = make(chan *protocol.Event, r.MaxBufferSpaces) + r.service = &LogReportService{receiveChannel: r.outputChannel} + logging.RegisterLogReportServiceServer(r.server, r.service) +} + +func (r *Receiver) Channel() <-chan *protocol.Event { + return r.outputChannel +} diff --git a/plugins/receiver/grpclog/receiver_test.go b/plugins/receiver/grpclog/receiver_test.go new file mode 100644 index 0000000..7b6c922 --- /dev/null +++ b/plugins/receiver/grpclog/receiver_test.go @@ -0,0 +1,140 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package grpclog + +import ( + "context" + "reflect" + "strconv" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + + "google.golang.org/grpc" + + common "skywalking/network/common/v3" + logging "skywalking/network/logging/v3" + + "github.com/apache/skywalking-satellite/internal/pkg/plugin" + _ "github.com/apache/skywalking-satellite/internal/satellite/test" + receiver "github.com/apache/skywalking-satellite/plugins/receiver/api" + server "github.com/apache/skywalking-satellite/plugins/server/api" + grpcserver "github.com/apache/skywalking-satellite/plugins/server/grpc" + "github.com/apache/skywalking-satellite/protocol/gen-codes/satellite/protocol" +) + +func TestReceiver_RegisterHandler(t *testing.T) { + Init() + r := initReceiver(make(plugin.Config), t) + s := initServer(make(plugin.Config), t) + r.RegisterHandler(s.GetServer()) + _ = s.Start() + time.Sleep(time.Second) + defer func() { + if err := s.Close(); err != nil { + t.Fatalf("cannot close the sever: %v", err) + } + }() + client := initClient(t) + for i := 0; i < 10; i++ { + data := initData(i) + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + collect, err := client.Collect(ctx) + if err != nil { + t.Fatalf("cannot open the stream send mode: %v", err) + } + if err := collect.Send(data); err != nil { + t.Fatalf("cannot send the data to the server: %v", err) + } + if err := collect.CloseSend(); err != nil { + t.Fatalf("cannot close the stream mode: %v", err) + } + newData := <-r.Channel() + if !cmp.Equal(newData.Data.(*protocol.Event_Log).Log.String(), data.String()) { + t.Fatalf("the sent data is not equal to the received data\n, "+ + "want data %s\n, but got %s\n", data.String(), newData.String()) + } + cancel() + } +} + +func initData(sequence int) *logging.LogData { + seq := strconv.Itoa(sequence) + return &logging.LogData{ + Timestamp: time.Now().Unix(), + Service: "demo-service" + seq, + ServiceInstance: "demo-instance" + seq, + Endpoint: "demo-endpoint" + seq, + TraceContext: &logging.TraceContext{ + TraceSegmentId: "mock-segmentId" + seq, + TraceId: "mock-traceId" + seq, + SpanId: 1, + }, + Tags: []*common.KeyStringValuePair{ + { + Key: "mock-key" + seq, + Value: "mock-value" + seq, + }, + }, + Body: &logging.LogDataBody{ + Type: "mock-type" + seq, + Content: &logging.LogDataBody_Text{ + Text: &logging.TextLog{ + Text: "this is a mock text mock log" + seq, + }, + }, + }, + } +} + +func initClient(t *testing.T) logging.LogReportServiceClient { + conn, err := grpc.Dial("localhost:8000", grpc.WithInsecure(), grpc.WithBlock()) + if err != nil { + t.Fatalf("cannot init the grpc client: %v", err) + } + return logging.NewLogReportServiceClient(conn) +} + +func Init() { + plugin.RegisterPluginCategory(reflect.TypeOf((*server.Server)(nil)).Elem()) + plugin.RegisterPluginCategory(reflect.TypeOf((*receiver.Receiver)(nil)).Elem()) + plugin.RegisterPlugin(new(grpcserver.Server)) + plugin.RegisterPlugin(new(Receiver)) +} + +func initServer(cfg plugin.Config, t *testing.T) server.Server { + cfg[plugin.NameField] = "grpc-server" + q := server.GetServer(cfg) + if q == nil { + t.Fatalf("cannot get a grpc server from the registry") + } + if err := q.Prepare(); err != nil { + t.Fatalf("cannot perpare the grpc server: %v", err) + } + return q +} + +func initReceiver(cfg plugin.Config, t *testing.T) receiver.Receiver { + cfg[plugin.NameField] = "grpclog-receiver" + q := receiver.GetReceiver(cfg) + if q == nil { + t.Fatalf("cannot get grpclog-receiver from the registry") + } + return q +} diff --git a/plugins/receiver/log-grpc/README.md b/plugins/receiver/log-grpc/README.md deleted file mode 100644 index 3f03ea1..0000000 --- a/plugins/receiver/log-grpc/README.md +++ /dev/null @@ -1 +0,0 @@ -# Plugin description \ No newline at end of file diff --git a/plugins/server/grpc/server.go b/plugins/server/grpc/server.go new file mode 100644 index 0000000..c953b27 --- /dev/null +++ b/plugins/server/grpc/server.go @@ -0,0 +1,106 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package grpc + +import ( + "net" + + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" + + "github.com/apache/skywalking-satellite/internal/pkg/config" + "github.com/apache/skywalking-satellite/internal/pkg/log" + "github.com/apache/skywalking-satellite/internal/pkg/plugin" +) + +type Server struct { + config.CommonFields + Address string `mapstructure:"address"` // The address of grpc server. + Network string `mapstructure:"network"` // The network of grpc. + MaxRecvMsgSize int `mapstructure:"max_recv_msg_size"` // The max size of the received log. + MaxConcurrentStreams uint32 `mapstructure:"max_concurrent_streams"` // The max concurrent stream channels. + TLSCertFile string `mapstructure:"tls_cert_file"` // The TLS cert file path. + TLSKeytFile string `mapstructure:"tls_key_file"` // The TLS key file path. + // components + server *grpc.Server + listener net.Listener +} + +func (s *Server) Name() string { + return plugin.GetPluginName(s) +} + +func (s *Server) Description() string { + return "this is a grpc server" +} + +func (s *Server) DefaultConfig() string { + return ` +# The address of grpc server. Default value is :8000 +address: :8000 +# The network of grpc. Default value is :tcp +network: tcp +# The max size of receiving log. Default value is 2M. The unit is Byte. +max_recv_msg_size: 2097152 +# The max concurrent stream channels. +max_concurrent_streams: 32 +# The TLS cert file path. +tls_cert_file: +# The TLS key file path. +tls_key_file: +` +} + +func (s *Server) Prepare() error { + var options []grpc.ServerOption + if s.TLSCertFile != "" && s.TLSKeytFile != "" { + if c, err := credentials.NewClientTLSFromFile(s.TLSCertFile, s.TLSKeytFile); err == nil { + options = append(options, grpc.Creds(c)) + } else { + log.Logger.Errorf("error in checking TLS files: %v", err) + return err + } + } + options = append(options, grpc.MaxRecvMsgSize(s.MaxRecvMsgSize), grpc.MaxConcurrentStreams(s.MaxConcurrentStreams)) + s.server = grpc.NewServer(options...) + listener, err := net.Listen(s.Network, s.Address) + if err != nil { + log.Logger.Errorf("grpc server cannot be created: %v", err) + return err + } + s.listener = listener + return nil +} + +func (s *Server) Start() error { + go func() { + if err := s.server.Serve(s.listener); err != nil { + log.Logger.Fatalf("failed to open a grpc serve: %v", err) + } + }() + return nil +} + +func (s *Server) Close() error { + s.server.GracefulStop() + return nil +} + +func (s *Server) GetServer() interface{} { + return s.server +}
