This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-satellite.git
The following commit(s) were added to refs/heads/main by this push:
new b9d4f27 Using byte array to transmit the ALS message (#84)
b9d4f27 is described below
commit b9d4f275333fd48f7aabea2824cd7904f7b3ea78
Author: mrproliu <[email protected]>
AuthorDate: Mon Nov 22 08:07:43 2021 +0800
Using byte array to transmit the ALS message (#84)
---
CHANGES.md | 1 +
LICENSE | 1 +
go.mod | 2 +-
go.sum | 4 +-
internal/satellite/module/processor/processor.go | 2 +-
internal/satellite/telemetry/gauge.go | 52 +++++++++++++---
plugins/forwarder/grpc/envoyalsv2/forwarder.go | 56 ++++++++++++++++--
plugins/forwarder/grpc/envoyalsv3/forwarder.go | 46 +++++----------
plugins/queue/api/queue.go | 2 +-
plugins/queue/memory/queue.go | 2 +-
plugins/queue/mmap/queue.go | 4 +-
plugins/queue/mmap/queue_test.go | 2 +-
plugins/receiver/grpc/envoyalsv2/als_service.go | 69 +++++++++++++++++++---
plugins/receiver/grpc/envoyalsv2/receiver.go | 1 +
plugins/receiver/grpc/envoyalsv2/receiver_test.go | 6 +-
plugins/receiver/grpc/envoyalsv3/als_service.go | 72 ++++++++++++++++-------
plugins/receiver/grpc/envoyalsv3/receiver_test.go | 6 +-
plugins/server/grpc/server_codec.go | 70 ++++++++++++++++++++++
plugins/server/grpc/server_telementry.go | 37 ++++++++++++
19 files changed, 352 insertions(+), 83 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 61781b9..2374a83 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -6,6 +6,7 @@ Release Notes.
------------------
#### Features
* Support partition queue.
+* Using byte array to transmit the ALS streaming, reducing en/decoding cpu
usage.
#### Bug Fixes
diff --git a/LICENSE b/LICENSE
index 722153b..7063050 100644
--- a/LICENSE
+++ b/LICENSE
@@ -216,6 +216,7 @@ The following components are provided under the Apache
License. See project link
The text of each license is the standard Apache 2.0 license.
prometheus/common v0.15.0: https://github.com/prometheus/common Apache-2.0
+grpc v1.40.0: https://github.com/grpc/grpc-go Apache-2.0
========================================================================
BSD licenses
diff --git a/go.mod b/go.mod
index d46bd89..fce1276 100644
--- a/go.mod
+++ b/go.mod
@@ -23,5 +23,5 @@ require (
google.golang.org/protobuf v1.27.1
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b
gotest.tools v2.2.0+incompatible
- skywalking.apache.org/repo/goapi v0.0.0-20211014145040-b215a7f7b270
+ skywalking.apache.org/repo/goapi v0.0.0-20211119053430-01f9654d8896
)
diff --git a/go.sum b/go.sum
index 4a15fcc..ee7219f 100644
--- a/go.sum
+++ b/go.sum
@@ -1330,6 +1330,6 @@ sigs.k8s.io/structured-merge-diff/v4 v4.0.1/go.mod
h1:bJZC9H9iH24zzfZ/41RGcq60oK
sigs.k8s.io/yaml v1.1.0/go.mod h1:UJmg0vDUVViEyp3mgSv9WPwZCDxu4rQW1olrI1uml+o=
sigs.k8s.io/yaml v1.2.0 h1:kr/MCeFWJWTwyaHoR9c8EjH9OumOmoF9YGiZd7lFm/Q=
sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc=
-skywalking.apache.org/repo/goapi v0.0.0-20211014145040-b215a7f7b270
h1:W6bwwu3ejWM/qupBF820tbxrk8s41daqJ1wSd4W3epE=
-skywalking.apache.org/repo/goapi v0.0.0-20211014145040-b215a7f7b270/go.mod
h1:4KrWd+Oi4lkB+PtxZgIlf+3T6EECPru4fOWNMEHjxRk=
+skywalking.apache.org/repo/goapi v0.0.0-20211119053430-01f9654d8896
h1:Om3XZ0eizMIiQI56K9ukO2T4+xgv3g4tRyFQic1gkBk=
+skywalking.apache.org/repo/goapi v0.0.0-20211119053430-01f9654d8896/go.mod
h1:4KrWd+Oi4lkB+PtxZgIlf+3T6EECPru4fOWNMEHjxRk=
sourcegraph.com/sourcegraph/appdash v0.0.0-20190731080439-ebfcffb1b5c0/go.mod
h1:hI742Nqp5OhwiqlzhgfbWU4mW4yO10fP+LoT9WOswdU=
diff --git a/internal/satellite/module/processor/processor.go
b/internal/satellite/module/processor/processor.go
index 48cc4f7..af983dd 100644
--- a/internal/satellite/module/processor/processor.go
+++ b/internal/satellite/module/processor/processor.go
@@ -70,7 +70,7 @@ func (p *Processor) processPerPartition(ctx context.Context,
partition int, wg *
// receive the input event from the output channel of
the gatherer
case e := <-p.gatherer.OutputDataChannel(partition):
c := &event.OutputEventContext{
- Offset: e.Offset,
+ Offset: &e.Offset,
Context: make(map[string]*v1.SniffData),
}
c.Put(e.Event)
diff --git a/internal/satellite/telemetry/gauge.go
b/internal/satellite/telemetry/gauge.go
index d5af811..d8fae63 100644
--- a/internal/satellite/telemetry/gauge.go
+++ b/internal/satellite/telemetry/gauge.go
@@ -25,10 +25,20 @@ type Gauge struct {
gauge prometheus.GaugeFunc
}
+type DynamicGauge struct {
+ Collector
+ name string
+ gauge *prometheus.GaugeVec
+}
+
func NewGauge(name, help string, getter func() float64, labels ...string)
*Gauge {
lock.Lock()
defer lock.Unlock()
- rebuildName, rebuildLabels := rebuildGaugeNameAndLabels(name, labels...)
+ rebuildName := rebuildGaugeName(name, labels...)
+ constLabels := make(map[string]string)
+ for inx := 0; inx < len(labels); inx += 2 {
+ constLabels[labels[inx]] = labels[inx+1]
+ }
collector, ok := collectorContainer[rebuildName]
if !ok {
gauge := &Gauge{
@@ -36,7 +46,7 @@ func NewGauge(name, help string, getter func() float64,
labels ...string) *Gauge
gauge: prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Name: name,
Help: help,
- ConstLabels: rebuildLabels,
+ ConstLabels: constLabels,
}, getter),
}
Register(WithMeta(rebuildName, gauge.gauge))
@@ -46,13 +56,39 @@ func NewGauge(name, help string, getter func() float64,
labels ...string) *Gauge
return collector.(*Gauge)
}
-func rebuildGaugeNameAndLabels(name string, labels ...string) (gaugeName
string, labelsMap map[string]string) {
+func NewDynamicGauge(name, help string, labels ...string) *DynamicGauge {
+ lock.Lock()
+ defer lock.Unlock()
+ rebuildName := rebuildGaugeName(name, labels...)
+ collector, ok := collectorContainer[rebuildName]
+ if !ok {
+ gauge := &DynamicGauge{
+ name: name,
+ gauge: prometheus.NewGaugeVec(prometheus.GaugeOpts{
+ Name: name,
+ Help: help,
+ }, labels),
+ }
+ Register(WithMeta(rebuildName, gauge.gauge))
+ collectorContainer[rebuildName] = gauge
+ collector = gauge
+ }
+ return collector.(*DynamicGauge)
+}
+
+func (i *DynamicGauge) Inc(labels ...string) {
+ i.gauge.WithLabelValues(labels...).Inc()
+}
+
+func (i *DynamicGauge) Dec(labels ...string) {
+ i.gauge.WithLabelValues(labels...).Dec()
+}
+
+func rebuildGaugeName(name string, labels ...string) string {
resultName := name
- resultLabels := make(map[string]string)
- for inx := 0; inx < len(labels); inx += 2 {
- resultName += "_" + labels[inx] + "_" + labels[inx+1]
- resultLabels[labels[inx]] = labels[inx+1]
+ for inx := 0; inx < len(labels); inx++ {
+ resultName += "_" + labels[inx]
}
- return resultName, resultLabels
+ return resultName
}
diff --git a/plugins/forwarder/grpc/envoyalsv2/forwarder.go
b/plugins/forwarder/grpc/envoyalsv2/forwarder.go
index a5636e5..082ebe1 100644
--- a/plugins/forwarder/grpc/envoyalsv2/forwarder.go
+++ b/plugins/forwarder/grpc/envoyalsv2/forwarder.go
@@ -31,6 +31,8 @@ import (
"github.com/apache/skywalking-satellite/internal/pkg/config"
"github.com/apache/skywalking-satellite/internal/pkg/log"
"github.com/apache/skywalking-satellite/internal/satellite/event"
+ "github.com/apache/skywalking-satellite/internal/satellite/telemetry"
+ server_grpc "github.com/apache/skywalking-satellite/plugins/server/grpc"
)
const (
@@ -41,6 +43,26 @@ const (
type Forwarder struct {
config.CommonFields
alsClient v2.AccessLogServiceClient
+
+ eventReadySendCount *telemetry.Counter
+ eventSendFinishedCount *telemetry.Counter
+ streamingReadySendCount *telemetry.Counter
+ streamingSendFinishedCount *telemetry.Counter
+
+ forwardConnectTime *telemetry.Timer
+ forwardSendTime *telemetry.Timer
+ forwardCloseTime *telemetry.Timer
+}
+
+func (f *Forwarder) init() {
+ f.eventReadySendCount = telemetry.NewCounter("als_event_ready_send",
"Total count of the ALS event ready send.")
+ f.eventSendFinishedCount =
telemetry.NewCounter("als_event_send_finished", "Total count of the ALS event
send finished.", "target")
+ f.streamingReadySendCount =
telemetry.NewCounter("als_streaming_ready_send", "Total count of the ALS
streaming ready send.")
+ f.streamingSendFinishedCount =
telemetry.NewCounter("als_streaming_send_finished", "Total count of the ALS
streaming send finished.", "target")
+
+ f.forwardConnectTime = telemetry.NewTimer("als_forward_connect_time",
"Total time of the open ALS streaming.")
+ f.forwardSendTime = telemetry.NewTimer("als_forward_send_time", "Total
time of the ALS send message.")
+ f.forwardCloseTime = telemetry.NewTimer("als_forward_close_time",
"Total time of the ALS streaming close.")
}
func (f *Forwarder) Name() string {
@@ -66,29 +88,51 @@ func (f *Forwarder) Prepare(connection interface{}) error {
f.Name(), reflect.TypeOf(connection).String())
}
f.alsClient = v2.NewAccessLogServiceClient(client)
+ f.init()
return nil
}
func (f *Forwarder) Forward(batch event.BatchEvents) error {
+ f.eventReadySendCount.Add(float64(len(batch)))
for _, e := range batch {
- data, ok := e.GetData().(*v1.SniffData_EnvoyALSV2List)
- if !ok {
- continue
- }
+ data, _ := e.GetData().(*v1.SniffData_EnvoyALSV2List)
+
f.streamingReadySendCount.Add(float64(len(data.EnvoyALSV2List.Messages)))
+ }
+
+ for _, e := range batch {
+ // open stream
+ timeRecord := f.forwardConnectTime.Start()
stream, err :=
f.alsClient.StreamAccessLogs(context.Background())
+ timeRecord.Stop()
if err != nil {
log.Logger.Errorf("open grpc stream error %v", err)
return err
}
- for _, message := range data.EnvoyALSV2List.Messages {
- err := stream.Send(message)
+ peer :=
server_grpc.GetPeerHostFromStreamContext(stream.Context())
+ timeRecord = f.forwardSendTime.Start()
+
+ data := e.GetEnvoyALSV2List()
+ if data == nil {
+ continue
+ }
+
+ // send message
+ for _, message := range data.Messages {
+ err :=
stream.SendMsg(server_grpc.NewOriginalData(message))
if err != nil {
log.Logger.Errorf("%s send envoy ALS v2 data
error: %v", f.Name(), err)
f.closeStream(stream)
return err
}
}
+
+ f.eventSendFinishedCount.Inc(peer)
+ timeRecord.Stop()
+
+ // close stream
+ timeRecord = f.forwardCloseTime.Start()
f.closeStream(stream)
+ timeRecord.Stop()
}
return nil
}
diff --git a/plugins/forwarder/grpc/envoyalsv3/forwarder.go
b/plugins/forwarder/grpc/envoyalsv3/forwarder.go
index 53245fb..1ffacc6 100644
--- a/plugins/forwarder/grpc/envoyalsv3/forwarder.go
+++ b/plugins/forwarder/grpc/envoyalsv3/forwarder.go
@@ -23,16 +23,16 @@ import (
"io"
"reflect"
- "github.com/apache/skywalking-satellite/internal/satellite/telemetry"
+ "google.golang.org/grpc"
v3 "skywalking.apache.org/repo/goapi/proto/envoy/service/accesslog/v3"
v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
- "google.golang.org/grpc"
-
"github.com/apache/skywalking-satellite/internal/pkg/config"
"github.com/apache/skywalking-satellite/internal/pkg/log"
"github.com/apache/skywalking-satellite/internal/satellite/event"
+ "github.com/apache/skywalking-satellite/internal/satellite/telemetry"
+ server_grpc "github.com/apache/skywalking-satellite/plugins/server/grpc"
)
const (
@@ -46,8 +46,6 @@ type Forwarder struct {
eventReadySendCount *telemetry.Counter
eventSendFinishedCount *telemetry.Counter
- messageReadySendCount *telemetry.Counter
- messageSendFinishedCount *telemetry.Counter
streamingReadySendCount *telemetry.Counter
streamingSendFinishedCount *telemetry.Counter
@@ -58,11 +56,9 @@ type Forwarder struct {
func (f *Forwarder) init() {
f.eventReadySendCount = telemetry.NewCounter("als_event_ready_send",
"Total count of the ALS event ready send.")
- f.eventSendFinishedCount =
telemetry.NewCounter("als_event_send_finished", "Total count of the ALS event
send finished.")
- f.messageReadySendCount =
telemetry.NewCounter("als_message_ready_send", "Total count of the ALS message
ready send.")
- f.messageSendFinishedCount =
telemetry.NewCounter("als_message_send_finished", "Total count of the ALS event
sned finished.")
+ f.eventSendFinishedCount =
telemetry.NewCounter("als_event_send_finished", "Total count of the ALS event
send finished.", "target")
f.streamingReadySendCount =
telemetry.NewCounter("als_streaming_ready_send", "Total count of the ALS
streaming ready send.")
- f.streamingSendFinishedCount =
telemetry.NewCounter("als_streaming_send_finished", "Total count of the ALS
streaming send finished.")
+ f.streamingSendFinishedCount =
telemetry.NewCounter("als_streaming_send_finished", "Total count of the ALS
streaming send finished.", "target")
f.forwardConnectTime = telemetry.NewTimer("als_forward_connect_time",
"Total time of the open ALS streaming.")
f.forwardSendTime = telemetry.NewTimer("als_forward_send_time", "Total
time of the ALS send message.")
@@ -101,20 +97,9 @@ func (f *Forwarder) Forward(batch event.BatchEvents) error {
for _, e := range batch {
data, _ := e.GetData().(*v1.SniffData_EnvoyALSV3List)
f.streamingReadySendCount.Add(float64(len(data.EnvoyALSV3List.Messages)))
- for _, message := range data.EnvoyALSV3List.Messages {
- if message.GetHttpLogs() != nil {
-
f.messageReadySendCount.Add(float64(len(message.GetHttpLogs().LogEntry)))
- } else if message.GetTcpLogs() != nil {
-
f.messageReadySendCount.Add(float64(len(message.GetTcpLogs().LogEntry)))
- }
- }
}
for _, e := range batch {
- data, ok := e.GetData().(*v1.SniffData_EnvoyALSV3List)
- if !ok {
- continue
- }
// open stream
timeRecord := f.forwardConnectTime.Start()
stream, err :=
f.alsClient.StreamAccessLogs(context.Background())
@@ -123,30 +108,31 @@ func (f *Forwarder) Forward(batch event.BatchEvents)
error {
log.Logger.Errorf("open grpc stream error %v", err)
return err
}
+ peer :=
server_grpc.GetPeerHostFromStreamContext(stream.Context())
+ timeRecord = f.forwardSendTime.Start()
+
+ data := e.GetEnvoyALSV3List()
+ if data == nil {
+ continue
+ }
// send message
- timeRecord = f.forwardSendTime.Start()
- for _, message := range data.EnvoyALSV3List.Messages {
- err := stream.Send(message)
+ for _, message := range data.Messages {
+ err :=
stream.SendMsg(server_grpc.NewOriginalData(message))
if err != nil {
log.Logger.Errorf("%s send envoy ALS v3 data
error: %v", f.Name(), err)
f.closeStream(stream)
return err
}
- if message.GetHttpLogs() != nil {
-
f.messageSendFinishedCount.Add(float64(len(message.GetHttpLogs().LogEntry)))
- } else if message.GetTcpLogs() != nil {
-
f.messageSendFinishedCount.Add(float64(len(message.GetTcpLogs().LogEntry)))
- }
}
+
+ f.eventSendFinishedCount.Inc(peer)
timeRecord.Stop()
// close stream
timeRecord = f.forwardCloseTime.Start()
f.closeStream(stream)
timeRecord.Stop()
- f.eventSendFinishedCount.Inc()
-
f.streamingSendFinishedCount.Add(float64(len(data.EnvoyALSV3List.Messages)))
}
return nil
}
diff --git a/plugins/queue/api/queue.go b/plugins/queue/api/queue.go
index a761423..f0f2def 100644
--- a/plugins/queue/api/queue.go
+++ b/plugins/queue/api/queue.go
@@ -58,7 +58,7 @@ type Queue interface {
// SequenceEvent is a wrapper to pass the event and the offset.
type SequenceEvent struct {
Event *v1.SniffData
- Offset *event.Offset
+ Offset event.Offset
}
// GetQueue an initialized filter plugin.
diff --git a/plugins/queue/memory/queue.go b/plugins/queue/memory/queue.go
index 629df83..e99615b 100644
--- a/plugins/queue/memory/queue.go
+++ b/plugins/queue/memory/queue.go
@@ -33,7 +33,7 @@ const (
ShowName = "Memory Queue"
)
-var DefaultOffset = &event.Offset{
+var DefaultOffset = event.Offset{
Position: "",
}
diff --git a/plugins/queue/mmap/queue.go b/plugins/queue/mmap/queue.go
index 09ce199..e210c71 100644
--- a/plugins/queue/mmap/queue.go
+++ b/plugins/queue/mmap/queue.go
@@ -283,8 +283,8 @@ func (q *Queue) IsFull() bool {
}
// encode the meta to the offset
-func (q *Queue) encodeOffset(id, offset int64) *event.Offset {
- return &event.Offset{Position: strconv.FormatInt(id, 10) + "-" +
strconv.FormatInt(offset, 10)}
+func (q *Queue) encodeOffset(id, offset int64) event.Offset {
+ return event.Offset{Position: strconv.FormatInt(id, 10) + "-" +
strconv.FormatInt(offset, 10)}
}
// decode the offset to the meta of the mmap queue.
diff --git a/plugins/queue/mmap/queue_test.go b/plugins/queue/mmap/queue_test.go
index 59c33d2..ff1d2ee 100644
--- a/plugins/queue/mmap/queue_test.go
+++ b/plugins/queue/mmap/queue_test.go
@@ -222,7 +222,7 @@ func TestQueue_ReadHistory(t *testing.T) {
if err != nil {
t.Errorf("error in fetching data from queue:
%v", err)
} else if cmp.Equal(events[index].String(),
sequenceEvent.Event.String()) {
- q.Ack(sequenceEvent.Offset)
+ q.Ack(&sequenceEvent.Offset)
} else {
t.Errorf("history data and fetching data is not
equal\n,history:%+v\n. dequeue data:%+v\n", events[index], sequenceEvent.Event)
}
diff --git a/plugins/receiver/grpc/envoyalsv2/als_service.go
b/plugins/receiver/grpc/envoyalsv2/als_service.go
index 9df2a81..3f923df 100644
--- a/plugins/receiver/grpc/envoyalsv2/als_service.go
+++ b/plugins/receiver/grpc/envoyalsv2/als_service.go
@@ -19,42 +19,84 @@ package envoyalsv2
import (
"context"
+ "fmt"
"io"
"time"
+ "github.com/apache/skywalking-satellite/internal/pkg/log"
+
"github.com/apache/skywalking-satellite/internal/satellite/module/buffer"
+ "github.com/apache/skywalking-satellite/internal/satellite/telemetry"
+ "github.com/apache/skywalking-satellite/plugins/server/grpc"
+
+ "google.golang.org/protobuf/proto"
v2 "skywalking.apache.org/repo/goapi/proto/envoy/service/accesslog/v2"
v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
)
-const eventName = "grpc-envoy-als-v2-event"
+const (
+ eventName = "grpc-envoy-als-v2-event"
+ alsVersion = "v2"
+)
type AlsService struct {
receiveChannel chan *v1.SniffData
limiterConfig buffer.LimiterConfig
v2.UnimplementedAccessLogServiceServer
+
+ streamingCount *telemetry.Counter
+ streamingFailedCount *telemetry.Counter
+ streamingToEventCount *telemetry.Counter
+ activeStreamingCount *telemetry.DynamicGauge
+}
+
+func (m *AlsService) init() {
+ m.streamingCount = telemetry.NewCounter("als_streaming_receive_count",
+ "Total count of the receive stream in the ALS Receiver.",
"version", "peer_host")
+ m.streamingFailedCount =
telemetry.NewCounter("als_streaming_receive_failed_count",
+ "Total count of the failed receive message in the ALS
Receiver.", "version", "peer_host")
+ m.streamingToEventCount =
telemetry.NewCounter("als_streaming_to_event_count",
+ "Total count of the packaged the ALS streaming in the ALS
Receiver.", "version", "peer_host")
+ m.activeStreamingCount =
telemetry.NewDynamicGauge("als_streaming_active_count",
+ "Total active stream count in ALS Receiver", "version",
"peer_host")
}
func (m *AlsService) StreamAccessLogs(stream
v2.AccessLogService_StreamAccessLogsServer) error {
- messages := make(chan *v2.StreamAccessLogsMessage,
m.limiterConfig.LimitCount*2)
+ messages := make(chan []byte, m.limiterConfig.LimitCount*2)
limiter := buffer.NewLimiter(m.limiterConfig, func() int {
return len(messages)
})
+ peer := grpc.GetPeerHostFromStreamContext(stream.Context())
+ m.activeStreamingCount.Inc(alsVersion, peer)
+ defer m.activeStreamingCount.Dec(alsVersion, peer)
var identity *v2.StreamAccessLogsMessage_Identifier
defer limiter.Stop()
limiter.Start(context.Background(), func() {
+ if identity == nil {
+ return
+ }
count := len(messages)
if count == 0 {
return
}
- logsMessages := make([]*v2.StreamAccessLogsMessage, 0)
+ logsMessages := make([][]byte, 0)
for i := 0; i < count; i++ {
logsMessages = append(logsMessages, <-messages)
}
- logsMessages[0].Identifier = identity
+
+ // process first message identity
+ firstMessage := logsMessages[0]
+ firstAls := new(v2.StreamAccessLogsMessage)
+ if err := proto.Unmarshal(firstMessage, firstAls); err != nil {
+ log.Logger.Warnf("could not unmarshal als message: %v",
err)
+ return
+ }
+ firstAls.Identifier = identity
+ marshal, _ := proto.Marshal(firstAls)
+ logsMessages[0] = marshal
d := &v1.SniffData{
Name: eventName,
@@ -68,20 +110,33 @@ func (m *AlsService) StreamAccessLogs(stream
v2.AccessLogService_StreamAccessLog
},
},
}
+ m.streamingToEventCount.Inc(alsVersion, peer)
m.receiveChannel <- d
})
var err1 error
for {
- item, err := stream.Recv()
+ data := grpc.NewOriginalData(nil)
+ err := stream.RecvMsg(data)
if err != nil {
err1 = err
+ m.streamingFailedCount.Inc(alsVersion, peer)
break
}
- if item.Identifier != nil {
+ if identity == nil {
+ item := new(v2.StreamAccessLogsMessage)
+ err = proto.Unmarshal(data.Content, item)
+ if err != nil {
+ return fmt.Errorf("could not umarshal first
message, %v", err)
+ }
+ if item.Identifier == nil {
+ return fmt.Errorf("could not found identity in
message")
+ }
identity = item.Identifier
}
- messages <- item
+
+ m.streamingCount.Inc(alsVersion, peer)
+ messages <- data.Content
limiter.Check()
}
diff --git a/plugins/receiver/grpc/envoyalsv2/receiver.go
b/plugins/receiver/grpc/envoyalsv2/receiver.go
index c3bb019..4326968 100644
--- a/plugins/receiver/grpc/envoyalsv2/receiver.go
+++ b/plugins/receiver/grpc/envoyalsv2/receiver.go
@@ -67,6 +67,7 @@ limit_count: 500
func (r *Receiver) RegisterHandler(server interface{}) {
r.CommonGRPCReceiverFields = *grpc.InitCommonGRPCReceiverFields(server)
r.service = &AlsService{receiveChannel: r.OutputChannel, limiterConfig:
r.LimitConfig}
+ r.service.init()
v2.RegisterAccessLogServiceServer(r.Server, r.service)
}
diff --git a/plugins/receiver/grpc/envoyalsv2/receiver_test.go
b/plugins/receiver/grpc/envoyalsv2/receiver_test.go
index 89beb6a..c862fd4 100644
--- a/plugins/receiver/grpc/envoyalsv2/receiver_test.go
+++ b/plugins/receiver/grpc/envoyalsv2/receiver_test.go
@@ -22,6 +22,8 @@ import (
"strconv"
"testing"
+ "google.golang.org/protobuf/proto"
+
v2 "skywalking.apache.org/repo/goapi/proto/envoy/service/accesslog/v2"
v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
@@ -50,7 +52,9 @@ func TestReceiver_RegisterHandler(t *testing.T) {
}
return data.String()
}, func(data *v1.SniffData) string {
- return data.GetEnvoyALSV2List().Messages[0].String()
+ m := new(v2.StreamAccessLogsMessage)
+ _ = proto.Unmarshal(data.GetEnvoyALSV2List().Messages[0], m)
+ return m.String()
}, t)
}
diff --git a/plugins/receiver/grpc/envoyalsv3/als_service.go
b/plugins/receiver/grpc/envoyalsv3/als_service.go
index ef061c1..bfc34f8 100644
--- a/plugins/receiver/grpc/envoyalsv3/als_service.go
+++ b/plugins/receiver/grpc/envoyalsv3/als_service.go
@@ -19,59 +19,84 @@ package envoyalsv3
import (
"context"
+ "fmt"
"io"
"time"
+ "github.com/apache/skywalking-satellite/internal/pkg/log"
+
+ "google.golang.org/protobuf/proto"
+
"github.com/apache/skywalking-satellite/internal/satellite/module/buffer"
"github.com/apache/skywalking-satellite/internal/satellite/telemetry"
+ "github.com/apache/skywalking-satellite/plugins/server/grpc"
v3 "skywalking.apache.org/repo/goapi/proto/envoy/service/accesslog/v3"
v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
)
-const eventName = "grpc-envoy-als-v3-event"
+const (
+ eventName = "grpc-envoy-als-v3-event"
+ alsVersion = "v3"
+)
type AlsService struct {
receiveChannel chan *v1.SniffData
limiterConfig buffer.LimiterConfig
v3.UnimplementedAccessLogServiceServer
- messageCount *telemetry.Counter
streamingCount *telemetry.Counter
streamingFailedCount *telemetry.Counter
streamingToEventCount *telemetry.Counter
+ activeStreamingCount *telemetry.DynamicGauge
}
func (m *AlsService) init() {
- m.messageCount = telemetry.NewCounter("als_message_receive_count",
- "Total count of the receive message in the ALS Receiver.")
m.streamingCount = telemetry.NewCounter("als_streaming_receive_count",
- "Total count of the receive stream in the ALS Receiver.")
+ "Total count of the receive stream in the ALS Receiver.",
"version", "peer_host")
m.streamingFailedCount =
telemetry.NewCounter("als_streaming_receive_failed_count",
- "Total count of the failed receive message in the ALS
Receiver.")
+ "Total count of the failed receive message in the ALS
Receiver.", "version", "peer_host")
m.streamingToEventCount =
telemetry.NewCounter("als_streaming_to_event_count",
- "Total count of the packaged the ALS streaming in the ALS
Receiver.")
+ "Total count of the packaged the ALS streaming in the ALS
Receiver.", "version", "peer_host")
+ m.activeStreamingCount =
telemetry.NewDynamicGauge("als_streaming_active_count",
+ "Total active stream count in ALS Receiver", "version",
"peer_host")
}
func (m *AlsService) StreamAccessLogs(stream
v3.AccessLogService_StreamAccessLogsServer) error {
- messages := make(chan *v3.StreamAccessLogsMessage,
m.limiterConfig.LimitCount*2)
+ messages := make(chan []byte, m.limiterConfig.LimitCount*2)
limiter := buffer.NewLimiter(m.limiterConfig, func() int {
return len(messages)
})
+ peer := grpc.GetPeerHostFromStreamContext(stream.Context())
+ m.activeStreamingCount.Inc(alsVersion, peer)
+ defer m.activeStreamingCount.Dec(alsVersion, peer)
var identity *v3.StreamAccessLogsMessage_Identifier
defer limiter.Stop()
limiter.Start(context.Background(), func() {
+ if identity == nil {
+ return
+ }
count := len(messages)
if count == 0 {
return
}
- logsMessages := make([]*v3.StreamAccessLogsMessage, 0)
+ logsMessages := make([][]byte, 0)
for i := 0; i < count; i++ {
logsMessages = append(logsMessages, <-messages)
}
- logsMessages[0].Identifier = identity
+
+ // process first message identity
+ firstMessage := logsMessages[0]
+ firstAls := new(v3.StreamAccessLogsMessage)
+ if err := proto.Unmarshal(firstMessage, firstAls); err != nil {
+ log.Logger.Warnf("could not unmarshal als message: %v",
err)
+ return
+ }
+ firstAls.Identifier = identity
+ marshal, _ := proto.Marshal(firstAls)
+ logsMessages[0] = marshal
d := &v1.SniffData{
Name: eventName,
@@ -85,28 +110,33 @@ func (m *AlsService) StreamAccessLogs(stream
v3.AccessLogService_StreamAccessLog
},
},
}
- m.streamingToEventCount.Inc()
+ m.streamingToEventCount.Inc(alsVersion, peer)
m.receiveChannel <- d
})
var err1 error
for {
- item, err := stream.Recv()
+ data := grpc.NewOriginalData(nil)
+ err := stream.RecvMsg(data)
if err != nil {
- m.streamingFailedCount.Inc()
+ m.streamingFailedCount.Inc(alsVersion, peer)
err1 = err
break
}
- if item.Identifier != nil {
+ if identity == nil {
+ item := new(v3.StreamAccessLogsMessage)
+ err = proto.Unmarshal(data.Content, item)
+ if err != nil {
+ return fmt.Errorf("could not umarshal first
message, %v", err)
+ }
+ if item.Identifier == nil {
+ return fmt.Errorf("could not found identity in
message")
+ }
identity = item.Identifier
}
- m.streamingCount.Inc()
- if item.GetHttpLogs() != nil {
-
m.messageCount.Add(float64(len(item.GetHttpLogs().LogEntry)))
- } else if item.GetTcpLogs() != nil {
-
m.messageCount.Add(float64(len(item.GetTcpLogs().LogEntry)))
- }
- messages <- item
+
+ m.streamingCount.Inc(alsVersion, peer)
+ messages <- data.Content
limiter.Check()
}
diff --git a/plugins/receiver/grpc/envoyalsv3/receiver_test.go
b/plugins/receiver/grpc/envoyalsv3/receiver_test.go
index 12c8ace..978dba8 100644
--- a/plugins/receiver/grpc/envoyalsv3/receiver_test.go
+++ b/plugins/receiver/grpc/envoyalsv3/receiver_test.go
@@ -22,6 +22,8 @@ import (
"strconv"
"testing"
+ "google.golang.org/protobuf/proto"
+
"google.golang.org/grpc"
v3 "skywalking.apache.org/repo/goapi/proto/envoy/service/accesslog/v3"
@@ -50,7 +52,9 @@ func TestReceiver_RegisterHandler(t *testing.T) {
}
return data.String()
}, func(data *v1.SniffData) string {
- return data.GetEnvoyALSV3List().Messages[0].String()
+ m := new(v3.StreamAccessLogsMessage)
+ _ = proto.Unmarshal(data.GetEnvoyALSV3List().Messages[0], m)
+ return m.String()
}, t)
}
diff --git a/plugins/server/grpc/server_codec.go
b/plugins/server/grpc/server_codec.go
new file mode 100644
index 0000000..3602496
--- /dev/null
+++ b/plugins/server/grpc/server_codec.go
@@ -0,0 +1,70 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package grpc
+
+import (
+ "fmt"
+
+ "google.golang.org/grpc/encoding"
+ "google.golang.org/protobuf/proto"
+)
+
+func init() {
+ encoding.RegisterCodec(codec{})
+}
+
+// OriginalData is keep binary Content
+type OriginalData struct {
+ Content []byte
+}
+
+func NewOriginalData(data []byte) *OriginalData {
+ return &OriginalData{Content: data}
+}
+
+// codec is overwritten the original "proto" codec, and support using
OriginalData to skip data en/decoding.
+type codec struct{}
+
+func (codec) Marshal(v interface{}) ([]byte, error) {
+ vv, ok := v.(proto.Message)
+ if !ok {
+ original, ok := v.(*OriginalData)
+ if !ok {
+ return nil, fmt.Errorf("failed to marshal, message is
%T, want proto.Message or grpc.OriginalData", v)
+ }
+ return original.Content, nil
+ }
+ return proto.Marshal(vv)
+}
+
+func (codec) Unmarshal(data []byte, v interface{}) error {
+ vv, ok := v.(proto.Message)
+ if !ok {
+ original, ok := v.(*OriginalData)
+ if !ok {
+ return fmt.Errorf("failed to unmarshal, message is %T,
want proto.Message or grpc.OriginalData", v)
+ }
+ original.Content = data
+ return nil
+ }
+ return proto.Unmarshal(data, vv)
+}
+
+func (codec) Name() string {
+ return "proto"
+}
diff --git a/plugins/server/grpc/server_telementry.go
b/plugins/server/grpc/server_telementry.go
new file mode 100644
index 0000000..9078c0e
--- /dev/null
+++ b/plugins/server/grpc/server_telementry.go
@@ -0,0 +1,37 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package grpc
+
+import (
+ "context"
+ "strings"
+
+ "google.golang.org/grpc/peer"
+)
+
+func GetPeerHostFromStreamContext(ctx context.Context) string {
+ peerAddr, ok := peer.FromContext(ctx)
+ peerStr := ""
+ if ok {
+ peerStr = peerAddr.Addr.String()
+ if inx := strings.IndexByte(peerStr, ':'); inx > 0 {
+ peerStr = peerStr[:strings.IndexByte(peerStr, ':')]
+ }
+ }
+ return peerStr
+}