This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-satellite.git


The following commit(s) were added to refs/heads/main by this push:
     new c89b750  Support the native meter batch protocol (#89)
c89b750 is described below

commit c89b7500453559c1002c494304ee53cbfe2d8eea
Author: mrproliu <[email protected]>
AuthorDate: Tue Nov 30 15:51:48 2021 +0800

    Support the native meter batch protocol (#89)
---
 CHANGES.md                                         |  2 +
 go.mod                                             |  2 +-
 go.sum                                             |  4 +-
 plugins/forwarder/grpc/nativemeter/forwarder.go    | 66 ++++++++++++++++------
 plugins/receiver/grpc/nativemeter/meter_service.go | 24 ++++++++
 5 files changed, 78 insertions(+), 20 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index e5a8b4a..53f6787 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -7,6 +7,8 @@ Release Notes.
 #### Features
 * Support partition queue.
 * Using byte array to transmit the ALS streaming, Native tracing segment and log, reducing en/decoding cpu usage.
+* Support using the new ALS protocol to transmit the Envoy accesslog.
+* Support transmitting the Native Meter Batch protocol.
 
 #### Bug Fixes
 
diff --git a/go.mod b/go.mod
index 92baf98..7e48650 100644
--- a/go.mod
+++ b/go.mod
@@ -23,5 +23,5 @@ require (
        google.golang.org/protobuf v1.27.1
        gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b
        gotest.tools v2.2.0+incompatible
-       skywalking.apache.org/repo/goapi v0.0.0-20211129152714-f5760201da68
+       skywalking.apache.org/repo/goapi v0.0.0-20211130045256-aee6db90e633
 )
diff --git a/go.sum b/go.sum
index 28f5755..fdb77ac 100644
--- a/go.sum
+++ b/go.sum
@@ -1330,6 +1330,6 @@ sigs.k8s.io/structured-merge-diff/v4 v4.0.1/go.mod h1:bJZC9H9iH24zzfZ/41RGcq60oK
 sigs.k8s.io/yaml v1.1.0/go.mod h1:UJmg0vDUVViEyp3mgSv9WPwZCDxu4rQW1olrI1uml+o=
 sigs.k8s.io/yaml v1.2.0 h1:kr/MCeFWJWTwyaHoR9c8EjH9OumOmoF9YGiZd7lFm/Q=
 sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc=
-skywalking.apache.org/repo/goapi v0.0.0-20211129152714-f5760201da68 h1:DE5enrtUAh/PViRIJCYFLMBjmvPLJVmXEyHJZjBtK90=
-skywalking.apache.org/repo/goapi v0.0.0-20211129152714-f5760201da68/go.mod h1:4KrWd+Oi4lkB+PtxZgIlf+3T6EECPru4fOWNMEHjxRk=
+skywalking.apache.org/repo/goapi v0.0.0-20211130045256-aee6db90e633 h1:sneWCKi5BckD4crMDSU4IjFymZjYGsRdHE1O/B9wugA=
+skywalking.apache.org/repo/goapi v0.0.0-20211130045256-aee6db90e633/go.mod h1:4KrWd+Oi4lkB+PtxZgIlf+3T6EECPru4fOWNMEHjxRk=
 sourcegraph.com/sourcegraph/appdash v0.0.0-20190731080439-ebfcffb1b5c0/go.mod h1:hI742Nqp5OhwiqlzhgfbWU4mW4yO10fP+LoT9WOswdU=
diff --git a/plugins/forwarder/grpc/nativemeter/forwarder.go b/plugins/forwarder/grpc/nativemeter/forwarder.go
index f3c172d..5f1f04b 100644
--- a/plugins/forwarder/grpc/nativemeter/forwarder.go
+++ b/plugins/forwarder/grpc/nativemeter/forwarder.go
@@ -31,6 +31,7 @@ import (
        "github.com/apache/skywalking-satellite/internal/pkg/config"
        "github.com/apache/skywalking-satellite/internal/pkg/log"
        "github.com/apache/skywalking-satellite/internal/satellite/event"
+       server_grpc "github.com/apache/skywalking-satellite/plugins/server/grpc"
 )
 
 const (
@@ -70,7 +71,7 @@ func (f *Forwarder) Prepare(connection interface{}) error {
 }
 
 func (f *Forwarder) Forward(batch event.BatchEvents) error {
-       streamMap := make(map[string]v3.MeterReportService_CollectClient)
+       streamMap := make(map[string]grpc.ClientStream)
        defer func() {
                for _, stream := range streamMap {
                        err := closeStream(stream)
@@ -80,34 +81,65 @@ func (f *Forwarder) Forward(batch event.BatchEvents) error {
                }
        }()
        for _, e := range batch {
-               data, ok := e.GetData().(*v1.SniffData_Meter)
-               if !ok {
-                       continue
+               if data, ok := e.GetData().(*v1.SniffData_Meter); ok {
+                       if err := f.handleMeter(data, streamMap); err != nil {
+                               return err
+                       }
                }
-               streamName := fmt.Sprintf("%s_%s", data.Meter.Service, data.Meter.ServiceInstance)
-               stream := streamMap[streamName]
-               if stream == nil {
-                       curStream, err := f.meterClient.Collect(context.Background())
-                       if err != nil {
-                               log.Logger.Errorf("open grpc stream error %v", err)
+               if data, ok := e.GetData().(*v1.SniffData_MeterCollection); ok {
+                       if err := f.handleMeterCollection(data, streamMap); err != nil {
                                return err
                        }
-                       streamMap[streamName] = curStream
-                       stream = curStream
                }
+       }
+       return nil
+}
 
-               err := stream.Send(data.Meter)
+func (f *Forwarder) handleMeterCollection(data *v1.SniffData_MeterCollection, streamMap map[string]grpc.ClientStream) error {
+       streamName := "batch-stream"
+       stream := streamMap[streamName]
+       if stream == nil {
+               curStream, err := f.meterClient.CollectBatch(context.Background())
                if err != nil {
-                       log.Logger.Errorf("%s send meter data error: %v", f.Name(), err)
+                       log.Logger.Errorf("open grpc stream error %v", err)
                        return err
                }
+               streamMap[streamName] = curStream
+               stream = curStream
+       }
+
+       if err := stream.SendMsg(data.MeterCollection); err != nil {
+               log.Logger.Errorf("%s send meter data error: %v", f.Name(), err)
+               return err
        }
        return nil
 }
 
-func closeStream(stream v3.MeterReportService_CollectClient) error {
-       _, err := stream.CloseAndRecv()
-       if err != nil && err != io.EOF {
+func (f *Forwarder) handleMeter(data *v1.SniffData_Meter, streamMap map[string]grpc.ClientStream) error {
+       streamName := fmt.Sprintf("%s_%s", data.Meter.Service, data.Meter.ServiceInstance)
+       stream := streamMap[streamName]
+       if stream == nil {
+               curStream, err := f.meterClient.Collect(context.Background())
+               if err != nil {
+                       log.Logger.Errorf("open grpc stream error %v", err)
+                       return err
+               }
+               streamMap[streamName] = curStream
+               stream = curStream
+       }
+
+       if err := stream.SendMsg(data.Meter); err != nil {
+               log.Logger.Errorf("%s send meter data error: %v", f.Name(), err)
+               return err
+       }
+       return nil
+}
+
+func closeStream(stream grpc.ClientStream) error {
+       if err := stream.CloseSend(); err != nil && err != io.EOF {
+               return err
+       }
+       if err := stream.RecvMsg(server_grpc.NewOriginalData(nil)); err != nil {
                return err
        }
        return nil
diff --git a/plugins/receiver/grpc/nativemeter/meter_service.go b/plugins/receiver/grpc/nativemeter/meter_service.go
index 6db8b91..fd3c2f5 100644
--- a/plugins/receiver/grpc/nativemeter/meter_service.go
+++ b/plugins/receiver/grpc/nativemeter/meter_service.go
@@ -66,3 +66,27 @@ func (m *MeterService) Collect(stream meter.MeterReportService_CollectServer) er
                m.receiveChannel <- d
        }
 }
+
+func (m *MeterService) CollectBatch(batch meter.MeterReportService_CollectBatchServer) error {
+       for {
+               item, err := batch.Recv()
+               if err == io.EOF {
+                       return nil
+               }
+               if err != nil {
+                       return err
+               }
+
+               d := &v1.SniffData{
+                       Name:      eventName,
+                       Timestamp: time.Now().UnixNano() / 1e6,
+                       Meta:      nil,
+                       Type:      v1.SniffType_MeterType,
+                       Remote:    true,
+                       Data: &v1.SniffData_MeterCollection{
+                               MeterCollection: item,
+                       },
+               }
+               m.receiveChannel <- d
+       }
+}

Reply via email to