This is an automated email from the ASF dual-hosted git repository.

liujiapeng pushed a commit to branch grpc-receiver
in repository https://gitbox.apache.org/repos/asf/skywalking-satellite.git

commit 11bd1e2172977a37b507cc9a4dedd617774b1e7f
Author: Evan <[email protected]>
AuthorDate: Fri Jan 1 23:42:27 2021 +0800

    gRPC server & log receiver
---
 dist/LICENSE                                |   1 +
 go.mod                                      |   1 +
 plugins/receiver/grpclog/receive_service.go |  59 ++++++++++++
 plugins/receiver/grpclog/receiver.go        |  71 ++++++++++++++
 plugins/receiver/grpclog/receiver_test.go   | 140 ++++++++++++++++++++++++++++
 plugins/receiver/log-grpc/README.md         |   1 -
 plugins/server/grpc/server.go               | 106 +++++++++++++++++++++
 7 files changed, 378 insertions(+), 1 deletion(-)

diff --git a/dist/LICENSE b/dist/LICENSE
index af6f460..e783359 100644
--- a/dist/LICENSE
+++ b/dist/LICENSE
@@ -215,6 +215,7 @@ Apache 2.0 licenses
 The following components are provided under the Apache License. See project link for details.
 The text of each license is the standard Apache 2.0 license.
     client_golang (prometheus) v1.9.0: https://github.com/prometheus/client_golang Apache 2.0
+    grpc (google) v1.34.0: https://google.golang.org/grpc Apache 2.0
 
 ========================================================================
 BSD licenses
diff --git a/go.mod b/go.mod
index 95c5280..48d7a96 100644
--- a/go.mod
+++ b/go.mod
@@ -12,6 +12,7 @@ require (
        github.com/sirupsen/logrus v1.7.0
        github.com/spf13/viper v1.7.1
        github.com/urfave/cli/v2 v2.3.0
+       google.golang.org/grpc v1.34.0
        google.golang.org/protobuf v1.25.0
        skywalking/network v1.0.0
 )
diff --git a/plugins/receiver/grpclog/receive_service.go b/plugins/receiver/grpclog/receive_service.go
new file mode 100644
index 0000000..de8ecb9
--- /dev/null
+++ b/plugins/receiver/grpclog/receive_service.go
@@ -0,0 +1,59 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package grpclog
+
+import (
+       "io"
+       "time"
+
+       common "skywalking/network/common/v3"
+       logging "skywalking/network/logging/v3"
+
+       
"github.com/apache/skywalking-satellite/protocol/gen-codes/satellite/protocol"
+)
+
+// eventName is the Name set on every protocol.Event built by LogReportService.Collect.
+const eventName = "grpc-log-event"
+
+// LogReportService is the handler type for the log report gRPC service. It
+// embeds logging.UnimplementedLogReportServiceServer and defines Collect.
+type LogReportService struct {
+       receiveChannel chan *protocol.Event // Collect sends every event it builds into this channel.
+       logging.UnimplementedLogReportServiceServer
+}
+
+// Collect receives log data from a client stream until the client closes it.
+// Each received message is wrapped into a protocol.Event and sent to
+// receiveChannel. On io.EOF an empty common.Commands is sent back and the
+// stream is closed; any other receive error is returned unchanged.
+//
+// If the stream context ends while the send to receiveChannel is blocked, the
+// context error is returned, so the handler does not wait forever on a
+// consumer that has stopped reading.
+//
+// All events of one stream carry the Unix time (seconds) at which Collect was
+// entered as their Timestamp.
+func (s *LogReportService) Collect(stream logging.LogReportService_CollectServer) error {
+       startTime := time.Now()
+       ctx := stream.Context()
+       for {
+               logData, err := stream.Recv()
+               if err == io.EOF {
+                       return stream.SendAndClose(&common.Commands{})
+               }
+               if err != nil {
+                       return err
+               }
+               e := &protocol.Event{
+                       Name:      eventName,
+                       Timestamp: startTime.Unix(),
+                       Meta:      nil,
+                       Type:      protocol.EventType_Logging,
+                       Remote:    true,
+                       Data: &protocol.Event_Log{
+                               Log: logData,
+                       },
+               }
+               select {
+               case s.receiveChannel <- e:
+               case <-ctx.Done():
+                       return ctx.Err()
+               }
+       }
+}
diff --git a/plugins/receiver/grpclog/receiver.go b/plugins/receiver/grpclog/receiver.go
new file mode 100644
index 0000000..22f8c54
--- /dev/null
+++ b/plugins/receiver/grpclog/receiver.go
@@ -0,0 +1,71 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package grpclog
+
+import (
+       "fmt"
+       "reflect"
+
+       "google.golang.org/grpc"
+
+       logging "skywalking/network/logging/v3"
+
+       "github.com/apache/skywalking-satellite/internal/pkg/config"
+       "github.com/apache/skywalking-satellite/internal/pkg/plugin"
+       
"github.com/apache/skywalking-satellite/protocol/gen-codes/satellite/protocol"
+)
+
+// Receiver is the grpclog receiver plugin. RegisterHandler attaches a
+// LogReportService to a gRPC server, and Channel exposes the events that the
+// service produces.
+type Receiver struct {
+       config.CommonFields
+       MaxBufferSpaces int `mapstructure:"max_buffer_spaces"` // The max buffer events.
+
+       server        *grpc.Server         // The gRPC server handed to RegisterHandler.
+       service       *LogReportService    // The handler registered on server.
+       outputChannel chan *protocol.Event // The output channel of grpc log
+}
+
+// Name returns the plugin name that plugin.GetPluginName reports for this receiver.
+func (r *Receiver) Name() string {
+       return plugin.GetPluginName(r)
+}
+
+// Description returns a one-line, human-readable description of the receiver.
+func (r *Receiver) Description() string {
+       return "this is a grpc log receiver"
+}
+
+// DefaultConfig returns the default configuration text of the receiver,
+// which sets max_buffer_spaces to 1.
+func (r *Receiver) DefaultConfig() string {
+       return `
+# The max buffer events to process flow surge.
+max_buffer_spaces: 1
+`
+}
+
+// RegisterHandler attaches the log report service to the given gRPC server.
+//
+// server must be a *grpc.Server; any other value, including nil, causes a
+// panic whose message names the unsupported type. The output channel is
+// created here with a capacity of MaxBufferSpaces, so Channel returns nil
+// until RegisterHandler has been called.
+func (r *Receiver) RegisterHandler(server interface{}) {
+       s, ok := server.(*grpc.Server)
+       if !ok {
+               // reflect.TypeOf(nil) returns a nil Type; calling String on it would
+               // replace the intended message with a nil dereference.
+               typeName := "<nil>"
+               if server != nil {
+                       typeName = reflect.TypeOf(server).String()
+               }
+               panic(fmt.Errorf("registerHandler does not support %s", typeName))
+       }
+       r.server = s
+       r.outputChannel = make(chan *protocol.Event, r.MaxBufferSpaces)
+       r.service = &LogReportService{receiveChannel: r.outputChannel}
+       logging.RegisterLogReportServiceServer(r.server, r.service)
+}
+
+// Channel returns the receive-only view of the output channel. The channel is
+// created in RegisterHandler, so the result is nil before that call.
+func (r *Receiver) Channel() <-chan *protocol.Event {
+       return r.outputChannel
+}
diff --git a/plugins/receiver/grpclog/receiver_test.go b/plugins/receiver/grpclog/receiver_test.go
new file mode 100644
index 0000000..7b6c922
--- /dev/null
+++ b/plugins/receiver/grpclog/receiver_test.go
@@ -0,0 +1,140 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package grpclog
+
+import (
+       "context"
+       "reflect"
+       "strconv"
+       "testing"
+       "time"
+
+       "github.com/google/go-cmp/cmp"
+
+       "google.golang.org/grpc"
+
+       common "skywalking/network/common/v3"
+       logging "skywalking/network/logging/v3"
+
+       "github.com/apache/skywalking-satellite/internal/pkg/plugin"
+       _ "github.com/apache/skywalking-satellite/internal/satellite/test"
+       receiver "github.com/apache/skywalking-satellite/plugins/receiver/api"
+       server "github.com/apache/skywalking-satellite/plugins/server/api"
+       grpcserver "github.com/apache/skywalking-satellite/plugins/server/grpc"
+       
"github.com/apache/skywalking-satellite/protocol/gen-codes/satellite/protocol"
+)
+
+func TestReceiver_RegisterHandler(t *testing.T) {
+       Init()
+       r := initReceiver(make(plugin.Config), t)
+       s := initServer(make(plugin.Config), t)
+       r.RegisterHandler(s.GetServer())
+       _ = s.Start()
+       time.Sleep(time.Second)
+       defer func() {
+               if err := s.Close(); err != nil {
+                       t.Fatalf("cannot close the sever: %v", err)
+               }
+       }()
+       client := initClient(t)
+       for i := 0; i < 10; i++ {
+               data := initData(i)
+               ctx, cancel := context.WithTimeout(context.Background(), 
10*time.Second)
+               collect, err := client.Collect(ctx)
+               if err != nil {
+                       t.Fatalf("cannot open the stream send mode: %v", err)
+               }
+               if err := collect.Send(data); err != nil {
+                       t.Fatalf("cannot send the data to the server: %v", err)
+               }
+               if err := collect.CloseSend(); err != nil {
+                       t.Fatalf("cannot close the stream mode: %v", err)
+               }
+               newData := <-r.Channel()
+               if !cmp.Equal(newData.Data.(*protocol.Event_Log).Log.String(), 
data.String()) {
+                       t.Fatalf("the sent data is not equal to the received 
data\n, "+
+                               "want data %s\n, but got %s\n", data.String(), 
newData.String())
+               }
+               cancel()
+       }
+}
+
+// initData builds a mock logging.LogData whose string fields all end with the
+// decimal form of sequence, so records of different rounds are distinguishable.
+func initData(sequence int) *logging.LogData {
+       seq := strconv.Itoa(sequence)
+       return &logging.LogData{
+               Timestamp:       time.Now().Unix(),
+               Service:         "demo-service" + seq,
+               ServiceInstance: "demo-instance" + seq,
+               Endpoint:        "demo-endpoint" + seq,
+               TraceContext: &logging.TraceContext{
+                       TraceSegmentId: "mock-segmentId" + seq,
+                       TraceId:        "mock-traceId" + seq,
+                       SpanId:         1,
+               },
+               Tags: []*common.KeyStringValuePair{
+                       {
+                               Key:   "mock-key" + seq,
+                               Value: "mock-value" + seq,
+                       },
+               },
+               Body: &logging.LogDataBody{
+                       Type: "mock-type" + seq,
+                       Content: &logging.LogDataBody_Text{
+                               Text: &logging.TextLog{
+                                       Text: "this is a mock text mock log" + seq,
+                               },
+                       },
+               },
+       }
+}
+
+// initClient dials localhost:8000 without transport security and returns a
+// log report client on that connection. The dial blocks until the connection
+// is ready or 10 seconds have passed; any dial error fails the test.
+func initClient(t *testing.T) logging.LogReportServiceClient {
+       t.Helper()
+       // WithBlock without a deadline would hang the whole test run if the
+       // server never becomes reachable.
+       ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+       defer cancel()
+       conn, err := grpc.DialContext(ctx, "localhost:8000", grpc.WithInsecure(), grpc.WithBlock())
+       if err != nil {
+               t.Fatalf("cannot init the grpc client: %v", err)
+       }
+       return logging.NewLogReportServiceClient(conn)
+}
+
+// Init registers the server and receiver plugin categories, then registers
+// the gRPC server plugin and the grpclog Receiver plugin.
+func Init() {
+       plugin.RegisterPluginCategory(reflect.TypeOf((*server.Server)(nil)).Elem())
+       plugin.RegisterPluginCategory(reflect.TypeOf((*receiver.Receiver)(nil)).Elem())
+       plugin.RegisterPlugin(new(grpcserver.Server))
+       plugin.RegisterPlugin(new(Receiver))
+}
+
+// initServer looks up the "grpc-server" plugin with cfg and calls Prepare on
+// it. The test fails if the plugin is not found or Prepare returns an error.
+// cfg is modified: its plugin.NameField entry is overwritten.
+func initServer(cfg plugin.Config, t *testing.T) server.Server {
+       t.Helper()
+       cfg[plugin.NameField] = "grpc-server"
+       q := server.GetServer(cfg)
+       if q == nil {
+               t.Fatal("cannot get a grpc server from the registry")
+       }
+       if err := q.Prepare(); err != nil {
+               t.Fatalf("cannot prepare the grpc server: %v", err)
+       }
+       return q
+}
+
+// initReceiver looks up the "grpclog-receiver" plugin with cfg and fails the
+// test if it is not found. cfg is modified: its plugin.NameField entry is
+// overwritten.
+func initReceiver(cfg plugin.Config, t *testing.T) receiver.Receiver {
+       t.Helper()
+       cfg[plugin.NameField] = "grpclog-receiver"
+       q := receiver.GetReceiver(cfg)
+       if q == nil {
+               t.Fatal("cannot get grpclog-receiver from the registry")
+       }
+       return q
+}
diff --git a/plugins/receiver/log-grpc/README.md b/plugins/receiver/log-grpc/README.md
deleted file mode 100644
index 3f03ea1..0000000
--- a/plugins/receiver/log-grpc/README.md
+++ /dev/null
@@ -1 +0,0 @@
-# Plugin description
\ No newline at end of file
diff --git a/plugins/server/grpc/server.go b/plugins/server/grpc/server.go
new file mode 100644
index 0000000..c953b27
--- /dev/null
+++ b/plugins/server/grpc/server.go
@@ -0,0 +1,106 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package grpc
+
+import (
+       "net"
+
+       "google.golang.org/grpc"
+       "google.golang.org/grpc/credentials"
+
+       "github.com/apache/skywalking-satellite/internal/pkg/config"
+       "github.com/apache/skywalking-satellite/internal/pkg/log"
+       "github.com/apache/skywalking-satellite/internal/pkg/plugin"
+)
+
+// Server is the gRPC server plugin. Its exported fields carry mapstructure
+// tags matching the keys listed in DefaultConfig.
+//
+// NOTE(review): TLSKeytFile looks like a misspelling of TLSKeyFile; the name is
+// kept because it is exported and used by Prepare.
+type Server struct {
+       config.CommonFields
+       Address              string `mapstructure:"address"`                // The address of grpc server.
+       Network              string `mapstructure:"network"`                // The network of grpc.
+       MaxRecvMsgSize       int    `mapstructure:"max_recv_msg_size"`      // The max size of the received log.
+       MaxConcurrentStreams uint32 `mapstructure:"max_concurrent_streams"` // The max concurrent stream channels.
+       TLSCertFile          string `mapstructure:"tls_cert_file"`          // The TLS cert file path.
+       TLSKeytFile          string `mapstructure:"tls_key_file"`           // The TLS key file path.
+       // components
+       server   *grpc.Server // Created by Prepare.
+       listener net.Listener // Opened by Prepare and served by Start.
+}
+
+// Name returns the plugin name that plugin.GetPluginName reports for this server.
+func (s *Server) Name() string {
+       return plugin.GetPluginName(s)
+}
+
+// Description returns a one-line, human-readable description of the server.
+func (s *Server) Description() string {
+       return "this is a grpc server"
+}
+
+// DefaultConfig returns the default configuration text of the server: address
+// :8000, network tcp, a 2097152-byte receive limit, 32 concurrent streams and
+// empty TLS cert and key paths.
+func (s *Server) DefaultConfig() string {
+       return `
+# The address of grpc server. Default value is :8000
+address: :8000
+# The network of grpc. Default value is :tcp
+network: tcp
+# The max size of receiving log. Default value is 2M. The unit is Byte.
+max_recv_msg_size: 2097152
+# The max concurrent stream channels.
+max_concurrent_streams: 32
+# The TLS cert file path.
+tls_cert_file: 
+# The TLS key file path.
+tls_key_file: 
+`
+}
+
+// Prepare creates the gRPC server and opens the listener on Network/Address
+// without starting to serve.
+//
+// When both TLSCertFile and TLSKeytFile are set, server-side TLS credentials
+// are loaded from them; otherwise the server is created without transport
+// credentials. MaxRecvMsgSize and MaxConcurrentStreams are always applied.
+//
+// It returns the underlying error when the TLS files cannot be loaded or the
+// listener cannot be opened; both cases are also logged.
+func (s *Server) Prepare() error {
+       var options []grpc.ServerOption
+       if s.TLSCertFile != "" && s.TLSKeytFile != "" {
+               // A server needs its certificate and private key. NewClientTLSFromFile
+               // builds client credentials and would treat the key path as a server
+               // name override.
+               c, err := credentials.NewServerTLSFromFile(s.TLSCertFile, s.TLSKeytFile)
+               if err != nil {
+                       log.Logger.Errorf("error in checking TLS files: %v", err)
+                       return err
+               }
+               options = append(options, grpc.Creds(c))
+       }
+       options = append(options, grpc.MaxRecvMsgSize(s.MaxRecvMsgSize), grpc.MaxConcurrentStreams(s.MaxConcurrentStreams))
+       s.server = grpc.NewServer(options...)
+       listener, err := net.Listen(s.Network, s.Address)
+       if err != nil {
+               log.Logger.Errorf("grpc server cannot be created: %v", err)
+               return err
+       }
+       s.listener = listener
+       return nil
+}
+
+// Start serves the prepared listener in a new goroutine and returns nil
+// immediately. An error returned by Serve is reported through
+// log.Logger.Fatalf. Prepare must have succeeded first, because Start uses the
+// server and listener it creates.
+func (s *Server) Start() error {
+       go func() {
+               if err := s.server.Serve(s.listener); err != nil {
+                       log.Logger.Fatalf("failed to open a grpc serve: %v", err)
+               }
+       }()
+       return nil
+}
+
+// Close stops the gRPC server with GracefulStop and always returns nil.
+// It is a no-op when the server was never created, for example because
+// Prepare failed or was not called.
+func (s *Server) Close() error {
+       if s.server == nil {
+               return nil
+       }
+       s.server.GracefulStop()
+       return nil
+}
+
+// GetServer returns the underlying *grpc.Server as an interface{} value. The
+// server is created in Prepare, so the result wraps a nil pointer before that.
+func (s *Server) GetServer() interface{} {
+       return s.server
+}

Reply via email to