This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-satellite.git


The following commit(s) were added to refs/heads/main by this push:
     new b9d4f27  Using byte array to transmit the ALS message (#84)
b9d4f27 is described below

commit b9d4f275333fd48f7aabea2824cd7904f7b3ea78
Author: mrproliu <[email protected]>
AuthorDate: Mon Nov 22 08:07:43 2021 +0800

    Using byte array to transmit the ALS message (#84)
---
 CHANGES.md                                        |  1 +
 LICENSE                                           |  1 +
 go.mod                                            |  2 +-
 go.sum                                            |  4 +-
 internal/satellite/module/processor/processor.go  |  2 +-
 internal/satellite/telemetry/gauge.go             | 52 +++++++++++++---
 plugins/forwarder/grpc/envoyalsv2/forwarder.go    | 56 ++++++++++++++++--
 plugins/forwarder/grpc/envoyalsv3/forwarder.go    | 46 +++++----------
 plugins/queue/api/queue.go                        |  2 +-
 plugins/queue/memory/queue.go                     |  2 +-
 plugins/queue/mmap/queue.go                       |  4 +-
 plugins/queue/mmap/queue_test.go                  |  2 +-
 plugins/receiver/grpc/envoyalsv2/als_service.go   | 69 +++++++++++++++++++---
 plugins/receiver/grpc/envoyalsv2/receiver.go      |  1 +
 plugins/receiver/grpc/envoyalsv2/receiver_test.go |  6 +-
 plugins/receiver/grpc/envoyalsv3/als_service.go   | 72 ++++++++++++++++-------
 plugins/receiver/grpc/envoyalsv3/receiver_test.go |  6 +-
 plugins/server/grpc/server_codec.go               | 70 ++++++++++++++++++++++
 plugins/server/grpc/server_telementry.go          | 37 ++++++++++++
 19 files changed, 352 insertions(+), 83 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 61781b9..2374a83 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -6,6 +6,7 @@ Release Notes.
 ------------------
 #### Features
 * Support partition queue.
+* Using byte array to transmit the ALS streaming, reducing en/decoding cpu usage.
 
 #### Bug Fixes
 
diff --git a/LICENSE b/LICENSE
index 722153b..7063050 100644
--- a/LICENSE
+++ b/LICENSE
@@ -216,6 +216,7 @@ The following components are provided under the Apache License. See project link
 The text of each license is the standard Apache 2.0 license.
 
 prometheus/common v0.15.0: https://github.com/prometheus/common Apache-2.0
+grpc v1.40.0: https://github.com/grpc/grpc-go Apache-2.0
 
 ========================================================================
 BSD licenses
diff --git a/go.mod b/go.mod
index d46bd89..fce1276 100644
--- a/go.mod
+++ b/go.mod
@@ -23,5 +23,5 @@ require (
        google.golang.org/protobuf v1.27.1
        gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b
        gotest.tools v2.2.0+incompatible
-       skywalking.apache.org/repo/goapi v0.0.0-20211014145040-b215a7f7b270
+       skywalking.apache.org/repo/goapi v0.0.0-20211119053430-01f9654d8896
 )
diff --git a/go.sum b/go.sum
index 4a15fcc..ee7219f 100644
--- a/go.sum
+++ b/go.sum
@@ -1330,6 +1330,6 @@ sigs.k8s.io/structured-merge-diff/v4 v4.0.1/go.mod h1:bJZC9H9iH24zzfZ/41RGcq60oK
 sigs.k8s.io/yaml v1.1.0/go.mod h1:UJmg0vDUVViEyp3mgSv9WPwZCDxu4rQW1olrI1uml+o=
 sigs.k8s.io/yaml v1.2.0 h1:kr/MCeFWJWTwyaHoR9c8EjH9OumOmoF9YGiZd7lFm/Q=
 sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc=
-skywalking.apache.org/repo/goapi v0.0.0-20211014145040-b215a7f7b270 
h1:W6bwwu3ejWM/qupBF820tbxrk8s41daqJ1wSd4W3epE=
-skywalking.apache.org/repo/goapi v0.0.0-20211014145040-b215a7f7b270/go.mod 
h1:4KrWd+Oi4lkB+PtxZgIlf+3T6EECPru4fOWNMEHjxRk=
+skywalking.apache.org/repo/goapi v0.0.0-20211119053430-01f9654d8896 
h1:Om3XZ0eizMIiQI56K9ukO2T4+xgv3g4tRyFQic1gkBk=
+skywalking.apache.org/repo/goapi v0.0.0-20211119053430-01f9654d8896/go.mod 
h1:4KrWd+Oi4lkB+PtxZgIlf+3T6EECPru4fOWNMEHjxRk=
 sourcegraph.com/sourcegraph/appdash v0.0.0-20190731080439-ebfcffb1b5c0/go.mod 
h1:hI742Nqp5OhwiqlzhgfbWU4mW4yO10fP+LoT9WOswdU=
diff --git a/internal/satellite/module/processor/processor.go b/internal/satellite/module/processor/processor.go
index 48cc4f7..af983dd 100644
--- a/internal/satellite/module/processor/processor.go
+++ b/internal/satellite/module/processor/processor.go
@@ -70,7 +70,7 @@ func (p *Processor) processPerPartition(ctx context.Context, partition int, wg *
                        // receive the input event from the output channel of 
the gatherer
                        case e := <-p.gatherer.OutputDataChannel(partition):
                                c := &event.OutputEventContext{
-                                       Offset:  e.Offset,
+                                       Offset:  &e.Offset,
                                        Context: make(map[string]*v1.SniffData),
                                }
                                c.Put(e.Event)
diff --git a/internal/satellite/telemetry/gauge.go b/internal/satellite/telemetry/gauge.go
index d5af811..d8fae63 100644
--- a/internal/satellite/telemetry/gauge.go
+++ b/internal/satellite/telemetry/gauge.go
@@ -25,10 +25,20 @@ type Gauge struct {
        gauge prometheus.GaugeFunc
 }
 
+type DynamicGauge struct {
+       Collector
+       name  string
+       gauge *prometheus.GaugeVec
+}
+
 func NewGauge(name, help string, getter func() float64, labels ...string) 
*Gauge {
        lock.Lock()
        defer lock.Unlock()
-       rebuildName, rebuildLabels := rebuildGaugeNameAndLabels(name, labels...)
+       rebuildName := rebuildGaugeName(name, labels...)
+       constLabels := make(map[string]string)
+       for inx := 0; inx < len(labels); inx += 2 {
+               constLabels[labels[inx]] = labels[inx+1]
+       }
        collector, ok := collectorContainer[rebuildName]
        if !ok {
                gauge := &Gauge{
@@ -36,7 +46,7 @@ func NewGauge(name, help string, getter func() float64, labels ...string) *Gauge
                        gauge: prometheus.NewGaugeFunc(prometheus.GaugeOpts{
                                Name:        name,
                                Help:        help,
-                               ConstLabels: rebuildLabels,
+                               ConstLabels: constLabels,
                        }, getter),
                }
                Register(WithMeta(rebuildName, gauge.gauge))
@@ -46,13 +56,39 @@ func NewGauge(name, help string, getter func() float64, labels ...string) *Gauge
        return collector.(*Gauge)
 }
 
-func rebuildGaugeNameAndLabels(name string, labels ...string) (gaugeName 
string, labelsMap map[string]string) {
+func NewDynamicGauge(name, help string, labels ...string) *DynamicGauge {
+       lock.Lock()
+       defer lock.Unlock()
+       rebuildName := rebuildGaugeName(name, labels...)
+       collector, ok := collectorContainer[rebuildName]
+       if !ok {
+               gauge := &DynamicGauge{
+                       name: name,
+                       gauge: prometheus.NewGaugeVec(prometheus.GaugeOpts{
+                               Name: name,
+                               Help: help,
+                       }, labels),
+               }
+               Register(WithMeta(rebuildName, gauge.gauge))
+               collectorContainer[rebuildName] = gauge
+               collector = gauge
+       }
+       return collector.(*DynamicGauge)
+}
+
+func (i *DynamicGauge) Inc(labels ...string) {
+       i.gauge.WithLabelValues(labels...).Inc()
+}
+
+func (i *DynamicGauge) Dec(labels ...string) {
+       i.gauge.WithLabelValues(labels...).Dec()
+}
+
+func rebuildGaugeName(name string, labels ...string) string {
        resultName := name
-       resultLabels := make(map[string]string)
-       for inx := 0; inx < len(labels); inx += 2 {
-               resultName += "_" + labels[inx] + "_" + labels[inx+1]
-               resultLabels[labels[inx]] = labels[inx+1]
+       for inx := 0; inx < len(labels); inx++ {
+               resultName += "_" + labels[inx]
        }
 
-       return resultName, resultLabels
+       return resultName
 }
diff --git a/plugins/forwarder/grpc/envoyalsv2/forwarder.go b/plugins/forwarder/grpc/envoyalsv2/forwarder.go
index a5636e5..082ebe1 100644
--- a/plugins/forwarder/grpc/envoyalsv2/forwarder.go
+++ b/plugins/forwarder/grpc/envoyalsv2/forwarder.go
@@ -31,6 +31,8 @@ import (
        "github.com/apache/skywalking-satellite/internal/pkg/config"
        "github.com/apache/skywalking-satellite/internal/pkg/log"
        "github.com/apache/skywalking-satellite/internal/satellite/event"
+       "github.com/apache/skywalking-satellite/internal/satellite/telemetry"
+       server_grpc "github.com/apache/skywalking-satellite/plugins/server/grpc"
 )
 
 const (
@@ -41,6 +43,26 @@ const (
 type Forwarder struct {
        config.CommonFields
        alsClient v2.AccessLogServiceClient
+
+       eventReadySendCount        *telemetry.Counter
+       eventSendFinishedCount     *telemetry.Counter
+       streamingReadySendCount    *telemetry.Counter
+       streamingSendFinishedCount *telemetry.Counter
+
+       forwardConnectTime *telemetry.Timer
+       forwardSendTime    *telemetry.Timer
+       forwardCloseTime   *telemetry.Timer
+}
+
+func (f *Forwarder) init() {
+       f.eventReadySendCount = telemetry.NewCounter("als_event_ready_send", 
"Total count of the ALS event ready send.")
+       f.eventSendFinishedCount = 
telemetry.NewCounter("als_event_send_finished", "Total count of the ALS event 
send finished.", "target")
+       f.streamingReadySendCount = 
telemetry.NewCounter("als_streaming_ready_send", "Total count of the ALS 
streaming ready send.")
+       f.streamingSendFinishedCount = 
telemetry.NewCounter("als_streaming_send_finished", "Total count of the ALS 
streaming send finished.", "target")
+
+       f.forwardConnectTime = telemetry.NewTimer("als_forward_connect_time", 
"Total time of the open ALS streaming.")
+       f.forwardSendTime = telemetry.NewTimer("als_forward_send_time", "Total 
time of the ALS send message.")
+       f.forwardCloseTime = telemetry.NewTimer("als_forward_close_time", 
"Total time of the ALS streaming close.")
 }
 
 func (f *Forwarder) Name() string {
@@ -66,29 +88,51 @@ func (f *Forwarder) Prepare(connection interface{}) error {
                        f.Name(), reflect.TypeOf(connection).String())
        }
        f.alsClient = v2.NewAccessLogServiceClient(client)
+       f.init()
        return nil
 }
 
 func (f *Forwarder) Forward(batch event.BatchEvents) error {
+       f.eventReadySendCount.Add(float64(len(batch)))
        for _, e := range batch {
-               data, ok := e.GetData().(*v1.SniffData_EnvoyALSV2List)
-               if !ok {
-                       continue
-               }
+               data, _ := e.GetData().(*v1.SniffData_EnvoyALSV2List)
+               
f.streamingReadySendCount.Add(float64(len(data.EnvoyALSV2List.Messages)))
+       }
+
+       for _, e := range batch {
+               // open stream
+               timeRecord := f.forwardConnectTime.Start()
                stream, err := 
f.alsClient.StreamAccessLogs(context.Background())
+               timeRecord.Stop()
                if err != nil {
                        log.Logger.Errorf("open grpc stream error %v", err)
                        return err
                }
-               for _, message := range data.EnvoyALSV2List.Messages {
-                       err := stream.Send(message)
+               peer := 
server_grpc.GetPeerHostFromStreamContext(stream.Context())
+               timeRecord = f.forwardSendTime.Start()
+
+               data := e.GetEnvoyALSV2List()
+               if data == nil {
+                       continue
+               }
+
+               // send message
+               for _, message := range data.Messages {
+                       err := 
stream.SendMsg(server_grpc.NewOriginalData(message))
                        if err != nil {
                                log.Logger.Errorf("%s send envoy ALS v2 data 
error: %v", f.Name(), err)
                                f.closeStream(stream)
                                return err
                        }
                }
+
+               f.eventSendFinishedCount.Inc(peer)
+               timeRecord.Stop()
+
+               // close stream
+               timeRecord = f.forwardCloseTime.Start()
                f.closeStream(stream)
+               timeRecord.Stop()
        }
        return nil
 }
diff --git a/plugins/forwarder/grpc/envoyalsv3/forwarder.go b/plugins/forwarder/grpc/envoyalsv3/forwarder.go
index 53245fb..1ffacc6 100644
--- a/plugins/forwarder/grpc/envoyalsv3/forwarder.go
+++ b/plugins/forwarder/grpc/envoyalsv3/forwarder.go
@@ -23,16 +23,16 @@ import (
        "io"
        "reflect"
 
-       "github.com/apache/skywalking-satellite/internal/satellite/telemetry"
+       "google.golang.org/grpc"
 
        v3 "skywalking.apache.org/repo/goapi/proto/envoy/service/accesslog/v3"
        v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
 
-       "google.golang.org/grpc"
-
        "github.com/apache/skywalking-satellite/internal/pkg/config"
        "github.com/apache/skywalking-satellite/internal/pkg/log"
        "github.com/apache/skywalking-satellite/internal/satellite/event"
+       "github.com/apache/skywalking-satellite/internal/satellite/telemetry"
+       server_grpc "github.com/apache/skywalking-satellite/plugins/server/grpc"
 )
 
 const (
@@ -46,8 +46,6 @@ type Forwarder struct {
 
        eventReadySendCount        *telemetry.Counter
        eventSendFinishedCount     *telemetry.Counter
-       messageReadySendCount      *telemetry.Counter
-       messageSendFinishedCount   *telemetry.Counter
        streamingReadySendCount    *telemetry.Counter
        streamingSendFinishedCount *telemetry.Counter
 
@@ -58,11 +56,9 @@ type Forwarder struct {
 
 func (f *Forwarder) init() {
        f.eventReadySendCount = telemetry.NewCounter("als_event_ready_send", 
"Total count of the ALS event ready send.")
-       f.eventSendFinishedCount = 
telemetry.NewCounter("als_event_send_finished", "Total count of the ALS event 
send finished.")
-       f.messageReadySendCount = 
telemetry.NewCounter("als_message_ready_send", "Total count of the ALS message 
ready send.")
-       f.messageSendFinishedCount = 
telemetry.NewCounter("als_message_send_finished", "Total count of the ALS event 
sned finished.")
+       f.eventSendFinishedCount = 
telemetry.NewCounter("als_event_send_finished", "Total count of the ALS event 
send finished.", "target")
        f.streamingReadySendCount = 
telemetry.NewCounter("als_streaming_ready_send", "Total count of the ALS 
streaming ready send.")
-       f.streamingSendFinishedCount = 
telemetry.NewCounter("als_streaming_send_finished", "Total count of the ALS 
streaming send finished.")
+       f.streamingSendFinishedCount = 
telemetry.NewCounter("als_streaming_send_finished", "Total count of the ALS 
streaming send finished.", "target")
 
        f.forwardConnectTime = telemetry.NewTimer("als_forward_connect_time", 
"Total time of the open ALS streaming.")
        f.forwardSendTime = telemetry.NewTimer("als_forward_send_time", "Total 
time of the ALS send message.")
@@ -101,20 +97,9 @@ func (f *Forwarder) Forward(batch event.BatchEvents) error {
        for _, e := range batch {
                data, _ := e.GetData().(*v1.SniffData_EnvoyALSV3List)
                
f.streamingReadySendCount.Add(float64(len(data.EnvoyALSV3List.Messages)))
-               for _, message := range data.EnvoyALSV3List.Messages {
-                       if message.GetHttpLogs() != nil {
-                               
f.messageReadySendCount.Add(float64(len(message.GetHttpLogs().LogEntry)))
-                       } else if message.GetTcpLogs() != nil {
-                               
f.messageReadySendCount.Add(float64(len(message.GetTcpLogs().LogEntry)))
-                       }
-               }
        }
 
        for _, e := range batch {
-               data, ok := e.GetData().(*v1.SniffData_EnvoyALSV3List)
-               if !ok {
-                       continue
-               }
                // open stream
                timeRecord := f.forwardConnectTime.Start()
                stream, err := 
f.alsClient.StreamAccessLogs(context.Background())
@@ -123,30 +108,31 @@ func (f *Forwarder) Forward(batch event.BatchEvents) error {
                        log.Logger.Errorf("open grpc stream error %v", err)
                        return err
                }
+               peer := 
server_grpc.GetPeerHostFromStreamContext(stream.Context())
+               timeRecord = f.forwardSendTime.Start()
+
+               data := e.GetEnvoyALSV3List()
+               if data == nil {
+                       continue
+               }
 
                // send message
-               timeRecord = f.forwardSendTime.Start()
-               for _, message := range data.EnvoyALSV3List.Messages {
-                       err := stream.Send(message)
+               for _, message := range data.Messages {
+                       err := 
stream.SendMsg(server_grpc.NewOriginalData(message))
                        if err != nil {
                                log.Logger.Errorf("%s send envoy ALS v3 data 
error: %v", f.Name(), err)
                                f.closeStream(stream)
                                return err
                        }
-                       if message.GetHttpLogs() != nil {
-                               
f.messageSendFinishedCount.Add(float64(len(message.GetHttpLogs().LogEntry)))
-                       } else if message.GetTcpLogs() != nil {
-                               
f.messageSendFinishedCount.Add(float64(len(message.GetTcpLogs().LogEntry)))
-                       }
                }
+
+               f.eventSendFinishedCount.Inc(peer)
                timeRecord.Stop()
 
                // close stream
                timeRecord = f.forwardCloseTime.Start()
                f.closeStream(stream)
                timeRecord.Stop()
-               f.eventSendFinishedCount.Inc()
-               
f.streamingSendFinishedCount.Add(float64(len(data.EnvoyALSV3List.Messages)))
        }
        return nil
 }
diff --git a/plugins/queue/api/queue.go b/plugins/queue/api/queue.go
index a761423..f0f2def 100644
--- a/plugins/queue/api/queue.go
+++ b/plugins/queue/api/queue.go
@@ -58,7 +58,7 @@ type Queue interface {
 // SequenceEvent is a wrapper to pass the event and the offset.
 type SequenceEvent struct {
        Event  *v1.SniffData
-       Offset *event.Offset
+       Offset event.Offset
 }
 
 // GetQueue an initialized filter plugin.
diff --git a/plugins/queue/memory/queue.go b/plugins/queue/memory/queue.go
index 629df83..e99615b 100644
--- a/plugins/queue/memory/queue.go
+++ b/plugins/queue/memory/queue.go
@@ -33,7 +33,7 @@ const (
        ShowName = "Memory Queue"
 )
 
-var DefaultOffset = &event.Offset{
+var DefaultOffset = event.Offset{
        Position: "",
 }
 
diff --git a/plugins/queue/mmap/queue.go b/plugins/queue/mmap/queue.go
index 09ce199..e210c71 100644
--- a/plugins/queue/mmap/queue.go
+++ b/plugins/queue/mmap/queue.go
@@ -283,8 +283,8 @@ func (q *Queue) IsFull() bool {
 }
 
 // encode the meta to the offset
-func (q *Queue) encodeOffset(id, offset int64) *event.Offset {
-       return &event.Offset{Position: strconv.FormatInt(id, 10) + "-" + 
strconv.FormatInt(offset, 10)}
+func (q *Queue) encodeOffset(id, offset int64) event.Offset {
+       return event.Offset{Position: strconv.FormatInt(id, 10) + "-" + 
strconv.FormatInt(offset, 10)}
 }
 
 // decode the offset to the meta of the mmap queue.
diff --git a/plugins/queue/mmap/queue_test.go b/plugins/queue/mmap/queue_test.go
index 59c33d2..ff1d2ee 100644
--- a/plugins/queue/mmap/queue_test.go
+++ b/plugins/queue/mmap/queue_test.go
@@ -222,7 +222,7 @@ func TestQueue_ReadHistory(t *testing.T) {
                        if err != nil {
                                t.Errorf("error in fetching data from queue: 
%v", err)
                        } else if cmp.Equal(events[index].String(), 
sequenceEvent.Event.String()) {
-                               q.Ack(sequenceEvent.Offset)
+                               q.Ack(&sequenceEvent.Offset)
                        } else {
                                t.Errorf("history data and fetching data is not 
equal\n,history:%+v\n. dequeue data:%+v\n", events[index], sequenceEvent.Event)
                        }
diff --git a/plugins/receiver/grpc/envoyalsv2/als_service.go b/plugins/receiver/grpc/envoyalsv2/als_service.go
index 9df2a81..3f923df 100644
--- a/plugins/receiver/grpc/envoyalsv2/als_service.go
+++ b/plugins/receiver/grpc/envoyalsv2/als_service.go
@@ -19,42 +19,84 @@ package envoyalsv2
 
 import (
        "context"
+       "fmt"
        "io"
        "time"
 
+       "github.com/apache/skywalking-satellite/internal/pkg/log"
+
        
"github.com/apache/skywalking-satellite/internal/satellite/module/buffer"
+       "github.com/apache/skywalking-satellite/internal/satellite/telemetry"
+       "github.com/apache/skywalking-satellite/plugins/server/grpc"
+
+       "google.golang.org/protobuf/proto"
 
        v2 "skywalking.apache.org/repo/goapi/proto/envoy/service/accesslog/v2"
        v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
 )
 
-const eventName = "grpc-envoy-als-v2-event"
+const (
+       eventName  = "grpc-envoy-als-v2-event"
+       alsVersion = "v2"
+)
 
 type AlsService struct {
        receiveChannel chan *v1.SniffData
        limiterConfig  buffer.LimiterConfig
        v2.UnimplementedAccessLogServiceServer
+
+       streamingCount        *telemetry.Counter
+       streamingFailedCount  *telemetry.Counter
+       streamingToEventCount *telemetry.Counter
+       activeStreamingCount  *telemetry.DynamicGauge
+}
+
+func (m *AlsService) init() {
+       m.streamingCount = telemetry.NewCounter("als_streaming_receive_count",
+               "Total count of the receive stream in the ALS Receiver.", 
"version", "peer_host")
+       m.streamingFailedCount = 
telemetry.NewCounter("als_streaming_receive_failed_count",
+               "Total count of the failed receive message in the ALS 
Receiver.", "version", "peer_host")
+       m.streamingToEventCount = 
telemetry.NewCounter("als_streaming_to_event_count",
+               "Total count of the packaged the ALS streaming in the ALS 
Receiver.", "version", "peer_host")
+       m.activeStreamingCount = 
telemetry.NewDynamicGauge("als_streaming_active_count",
+               "Total active stream count in ALS Receiver", "version", 
"peer_host")
 }
 
 func (m *AlsService) StreamAccessLogs(stream 
v2.AccessLogService_StreamAccessLogsServer) error {
-       messages := make(chan *v2.StreamAccessLogsMessage, 
m.limiterConfig.LimitCount*2)
+       messages := make(chan []byte, m.limiterConfig.LimitCount*2)
        limiter := buffer.NewLimiter(m.limiterConfig, func() int {
                return len(messages)
        })
+       peer := grpc.GetPeerHostFromStreamContext(stream.Context())
+       m.activeStreamingCount.Inc(alsVersion, peer)
+       defer m.activeStreamingCount.Dec(alsVersion, peer)
 
        var identity *v2.StreamAccessLogsMessage_Identifier
 
        defer limiter.Stop()
        limiter.Start(context.Background(), func() {
+               if identity == nil {
+                       return
+               }
                count := len(messages)
                if count == 0 {
                        return
                }
-               logsMessages := make([]*v2.StreamAccessLogsMessage, 0)
+               logsMessages := make([][]byte, 0)
                for i := 0; i < count; i++ {
                        logsMessages = append(logsMessages, <-messages)
                }
-               logsMessages[0].Identifier = identity
+
+               // process first message identity
+               firstMessage := logsMessages[0]
+               firstAls := new(v2.StreamAccessLogsMessage)
+               if err := proto.Unmarshal(firstMessage, firstAls); err != nil {
+                       log.Logger.Warnf("could not unmarshal als message: %v", 
err)
+                       return
+               }
+               firstAls.Identifier = identity
+               marshal, _ := proto.Marshal(firstAls)
+               logsMessages[0] = marshal
 
                d := &v1.SniffData{
                        Name:      eventName,
@@ -68,20 +110,33 @@ func (m *AlsService) StreamAccessLogs(stream v2.AccessLogService_StreamAccessLog
                                },
                        },
                }
+               m.streamingToEventCount.Inc(alsVersion, peer)
                m.receiveChannel <- d
        })
 
        var err1 error
        for {
-               item, err := stream.Recv()
+               data := grpc.NewOriginalData(nil)
+               err := stream.RecvMsg(data)
                if err != nil {
                        err1 = err
+                       m.streamingFailedCount.Inc(alsVersion, peer)
                        break
                }
-               if item.Identifier != nil {
+               if identity == nil {
+                       item := new(v2.StreamAccessLogsMessage)
+                       err = proto.Unmarshal(data.Content, item)
+                       if err != nil {
+                               return fmt.Errorf("could not umarshal first 
message, %v", err)
+                       }
+                       if item.Identifier == nil {
+                               return fmt.Errorf("could not found identity in 
message")
+                       }
                        identity = item.Identifier
                }
-               messages <- item
+
+               m.streamingCount.Inc(alsVersion, peer)
+               messages <- data.Content
                limiter.Check()
        }
 
diff --git a/plugins/receiver/grpc/envoyalsv2/receiver.go b/plugins/receiver/grpc/envoyalsv2/receiver.go
index c3bb019..4326968 100644
--- a/plugins/receiver/grpc/envoyalsv2/receiver.go
+++ b/plugins/receiver/grpc/envoyalsv2/receiver.go
@@ -67,6 +67,7 @@ limit_count: 500
 func (r *Receiver) RegisterHandler(server interface{}) {
        r.CommonGRPCReceiverFields = *grpc.InitCommonGRPCReceiverFields(server)
        r.service = &AlsService{receiveChannel: r.OutputChannel, limiterConfig: 
r.LimitConfig}
+       r.service.init()
        v2.RegisterAccessLogServiceServer(r.Server, r.service)
 }
 
diff --git a/plugins/receiver/grpc/envoyalsv2/receiver_test.go b/plugins/receiver/grpc/envoyalsv2/receiver_test.go
index 89beb6a..c862fd4 100644
--- a/plugins/receiver/grpc/envoyalsv2/receiver_test.go
+++ b/plugins/receiver/grpc/envoyalsv2/receiver_test.go
@@ -22,6 +22,8 @@ import (
        "strconv"
        "testing"
 
+       "google.golang.org/protobuf/proto"
+
        v2 "skywalking.apache.org/repo/goapi/proto/envoy/service/accesslog/v2"
        v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
 
@@ -50,7 +52,9 @@ func TestReceiver_RegisterHandler(t *testing.T) {
                }
                return data.String()
        }, func(data *v1.SniffData) string {
-               return data.GetEnvoyALSV2List().Messages[0].String()
+               m := new(v2.StreamAccessLogsMessage)
+               _ = proto.Unmarshal(data.GetEnvoyALSV2List().Messages[0], m)
+               return m.String()
        }, t)
 }
 
diff --git a/plugins/receiver/grpc/envoyalsv3/als_service.go b/plugins/receiver/grpc/envoyalsv3/als_service.go
index ef061c1..bfc34f8 100644
--- a/plugins/receiver/grpc/envoyalsv3/als_service.go
+++ b/plugins/receiver/grpc/envoyalsv3/als_service.go
@@ -19,59 +19,84 @@ package envoyalsv3
 
 import (
        "context"
+       "fmt"
        "io"
        "time"
 
+       "github.com/apache/skywalking-satellite/internal/pkg/log"
+
+       "google.golang.org/protobuf/proto"
+
        
"github.com/apache/skywalking-satellite/internal/satellite/module/buffer"
        "github.com/apache/skywalking-satellite/internal/satellite/telemetry"
+       "github.com/apache/skywalking-satellite/plugins/server/grpc"
 
        v3 "skywalking.apache.org/repo/goapi/proto/envoy/service/accesslog/v3"
        v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
 )
 
-const eventName = "grpc-envoy-als-v3-event"
+const (
+       eventName  = "grpc-envoy-als-v3-event"
+       alsVersion = "v3"
+)
 
 type AlsService struct {
        receiveChannel chan *v1.SniffData
        limiterConfig  buffer.LimiterConfig
        v3.UnimplementedAccessLogServiceServer
 
-       messageCount          *telemetry.Counter
        streamingCount        *telemetry.Counter
        streamingFailedCount  *telemetry.Counter
        streamingToEventCount *telemetry.Counter
+       activeStreamingCount  *telemetry.DynamicGauge
 }
 
 func (m *AlsService) init() {
-       m.messageCount = telemetry.NewCounter("als_message_receive_count",
-               "Total count of the receive message in the ALS Receiver.")
        m.streamingCount = telemetry.NewCounter("als_streaming_receive_count",
-               "Total count of the receive stream in the ALS Receiver.")
+               "Total count of the receive stream in the ALS Receiver.", 
"version", "peer_host")
        m.streamingFailedCount = 
telemetry.NewCounter("als_streaming_receive_failed_count",
-               "Total count of the failed receive message in the ALS 
Receiver.")
+               "Total count of the failed receive message in the ALS 
Receiver.", "version", "peer_host")
        m.streamingToEventCount = 
telemetry.NewCounter("als_streaming_to_event_count",
-               "Total count of the packaged the ALS streaming in the ALS 
Receiver.")
+               "Total count of the packaged the ALS streaming in the ALS 
Receiver.", "version", "peer_host")
+       m.activeStreamingCount = 
telemetry.NewDynamicGauge("als_streaming_active_count",
+               "Total active stream count in ALS Receiver", "version", 
"peer_host")
 }
 
 func (m *AlsService) StreamAccessLogs(stream 
v3.AccessLogService_StreamAccessLogsServer) error {
-       messages := make(chan *v3.StreamAccessLogsMessage, 
m.limiterConfig.LimitCount*2)
+       messages := make(chan []byte, m.limiterConfig.LimitCount*2)
        limiter := buffer.NewLimiter(m.limiterConfig, func() int {
                return len(messages)
        })
+       peer := grpc.GetPeerHostFromStreamContext(stream.Context())
+       m.activeStreamingCount.Inc(alsVersion, peer)
+       defer m.activeStreamingCount.Dec(alsVersion, peer)
 
        var identity *v3.StreamAccessLogsMessage_Identifier
 
        defer limiter.Stop()
        limiter.Start(context.Background(), func() {
+               if identity == nil {
+                       return
+               }
                count := len(messages)
                if count == 0 {
                        return
                }
-               logsMessages := make([]*v3.StreamAccessLogsMessage, 0)
+               logsMessages := make([][]byte, 0)
                for i := 0; i < count; i++ {
                        logsMessages = append(logsMessages, <-messages)
                }
-               logsMessages[0].Identifier = identity
+
+               // process first message identity
+               firstMessage := logsMessages[0]
+               firstAls := new(v3.StreamAccessLogsMessage)
+               if err := proto.Unmarshal(firstMessage, firstAls); err != nil {
+                       log.Logger.Warnf("could not unmarshal als message: %v", 
err)
+                       return
+               }
+               firstAls.Identifier = identity
+               marshal, _ := proto.Marshal(firstAls)
+               logsMessages[0] = marshal
 
                d := &v1.SniffData{
                        Name:      eventName,
@@ -85,28 +110,33 @@ func (m *AlsService) StreamAccessLogs(stream 
v3.AccessLogService_StreamAccessLog
                                },
                        },
                }
-               m.streamingToEventCount.Inc()
+               m.streamingToEventCount.Inc(alsVersion, peer)
                m.receiveChannel <- d
        })
 
        var err1 error
        for {
-               item, err := stream.Recv()
+               data := grpc.NewOriginalData(nil)
+               err := stream.RecvMsg(data)
                if err != nil {
-                       m.streamingFailedCount.Inc()
+                       m.streamingFailedCount.Inc(alsVersion, peer)
                        err1 = err
                        break
                }
-               if item.Identifier != nil {
+               if identity == nil {
+                       item := new(v3.StreamAccessLogsMessage)
+                       err = proto.Unmarshal(data.Content, item)
+                       if err != nil {
+                               return fmt.Errorf("could not umarshal first 
message, %v", err)
+                       }
+                       if item.Identifier == nil {
+                               return fmt.Errorf("could not found identity in 
message")
+                       }
                        identity = item.Identifier
                }
-               m.streamingCount.Inc()
-               if item.GetHttpLogs() != nil {
-                       
m.messageCount.Add(float64(len(item.GetHttpLogs().LogEntry)))
-               } else if item.GetTcpLogs() != nil {
-                       
m.messageCount.Add(float64(len(item.GetTcpLogs().LogEntry)))
-               }
-               messages <- item
+
+               m.streamingCount.Inc(alsVersion, peer)
+               messages <- data.Content
                limiter.Check()
        }
 
diff --git a/plugins/receiver/grpc/envoyalsv3/receiver_test.go 
b/plugins/receiver/grpc/envoyalsv3/receiver_test.go
index 12c8ace..978dba8 100644
--- a/plugins/receiver/grpc/envoyalsv3/receiver_test.go
+++ b/plugins/receiver/grpc/envoyalsv3/receiver_test.go
@@ -22,6 +22,8 @@ import (
        "strconv"
        "testing"
 
+       "google.golang.org/protobuf/proto"
+
        "google.golang.org/grpc"
 
        v3 "skywalking.apache.org/repo/goapi/proto/envoy/service/accesslog/v3"
@@ -50,7 +52,9 @@ func TestReceiver_RegisterHandler(t *testing.T) {
                }
                return data.String()
        }, func(data *v1.SniffData) string {
-               return data.GetEnvoyALSV3List().Messages[0].String()
+               m := new(v3.StreamAccessLogsMessage)
+               _ = proto.Unmarshal(data.GetEnvoyALSV3List().Messages[0], m)
+               return m.String()
        }, t)
 }
 
diff --git a/plugins/server/grpc/server_codec.go 
b/plugins/server/grpc/server_codec.go
new file mode 100644
index 0000000..3602496
--- /dev/null
+++ b/plugins/server/grpc/server_codec.go
@@ -0,0 +1,70 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package grpc
+
+import (
+       "fmt"
+
+       "google.golang.org/grpc/encoding"
+       "google.golang.org/protobuf/proto"
+)
+
+func init() {
+       encoding.RegisterCodec(codec{})
+}
+
+// OriginalData keeps the binary Content
+type OriginalData struct {
+       Content []byte
+}
+
+func NewOriginalData(data []byte) *OriginalData {
+       return &OriginalData{Content: data}
+}
+
+// codec overrides the original "proto" codec and supports using 
OriginalData to skip data en/decoding.
+type codec struct{}
+
+func (codec) Marshal(v interface{}) ([]byte, error) {
+       vv, ok := v.(proto.Message)
+       if !ok {
+               original, ok := v.(*OriginalData)
+               if !ok {
+                       return nil, fmt.Errorf("failed to marshal, message is 
%T, want proto.Message or grpc.OriginalData", v)
+               }
+               return original.Content, nil
+       }
+       return proto.Marshal(vv)
+}
+
+func (codec) Unmarshal(data []byte, v interface{}) error {
+       vv, ok := v.(proto.Message)
+       if !ok {
+               original, ok := v.(*OriginalData)
+               if !ok {
+                       return fmt.Errorf("failed to unmarshal, message is %T, 
want proto.Message or grpc.OriginalData", v)
+               }
+               original.Content = data
+               return nil
+       }
+       return proto.Unmarshal(data, vv)
+}
+
+func (codec) Name() string {
+       return "proto"
+}
diff --git a/plugins/server/grpc/server_telementry.go 
b/plugins/server/grpc/server_telementry.go
new file mode 100644
index 0000000..9078c0e
--- /dev/null
+++ b/plugins/server/grpc/server_telementry.go
@@ -0,0 +1,37 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package grpc
+
+import (
+       "context"
+       "strings"
+
+       "google.golang.org/grpc/peer"
+)
+
+func GetPeerHostFromStreamContext(ctx context.Context) string {
+       peerAddr, ok := peer.FromContext(ctx)
+       peerStr := ""
+       if ok {
+               peerStr = peerAddr.Addr.String()
+               if inx := strings.IndexByte(peerStr, ':'); inx > 0 {
+                       peerStr = peerStr[:strings.IndexByte(peerStr, ':')]
+               }
+       }
+       return peerStr
+}

Reply via email to