This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-satellite.git


The following commit(s) were added to refs/heads/main by this push:
     new 4b97fd2  Use byte array to transmit the native tracing segment and log 
(#85)
4b97fd2 is described below

commit 4b97fd255b6459fe9d3fc4f88c0d8f682fc2a364
Author: mrproliu <[email protected]>
AuthorDate: Tue Nov 23 11:43:09 2021 +0800

    Use byte array to transmit the native tracing segment and log (#85)
---
 CHANGES.md                                         |   2 +-
 go.mod                                             |   2 +-
 go.sum                                             |   4 +-
 plugins/forwarder/grpc/nativelog/forwarder.go      |   3 +-
 plugins/forwarder/grpc/nativetracing/forwarder.go  |   3 +-
 plugins/forwarder/kafka/nativelog/forwarder.go     |  10 +--
 plugins/queue/mmap/queue_test.go                   | 100 +++++++++++----------
 .../receiver/grpc/nativelog/log_report_service.go  |   7 +-
 plugins/receiver/grpc/nativelog/receiver_test.go   |   6 +-
 .../receiver/grpc/nativetracing/receiver_test.go   |  10 ++-
 .../grpc/nativetracing/tracing_report_service.go   |  16 +++-
 plugins/receiver/http/nativcelog/receiver.go       |   2 +-
 plugins/receiver/http/nativcelog/receiver_test.go  |   4 +-
 13 files changed, 97 insertions(+), 72 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 2374a83..e5a8b4a 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -6,7 +6,7 @@ Release Notes.
 ------------------
 #### Features
 * Support partition queue.
-* Using byte array to transmit the ALS streaming, reducing en/decoding cpu 
usage.
+* Using byte array to transmit the ALS streaming, Native tracing segment and 
log, reducing en/decoding cpu usage.
 
 #### Bug Fixes
 
diff --git a/go.mod b/go.mod
index fce1276..df3d948 100644
--- a/go.mod
+++ b/go.mod
@@ -23,5 +23,5 @@ require (
        google.golang.org/protobuf v1.27.1
        gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b
        gotest.tools v2.2.0+incompatible
-       skywalking.apache.org/repo/goapi v0.0.0-20211119053430-01f9654d8896
+       skywalking.apache.org/repo/goapi v0.0.0-20211122071111-ffc517fbfe21
 )
diff --git a/go.sum b/go.sum
index ee7219f..8bff2d3 100644
--- a/go.sum
+++ b/go.sum
@@ -1330,6 +1330,6 @@ sigs.k8s.io/structured-merge-diff/v4 v4.0.1/go.mod 
h1:bJZC9H9iH24zzfZ/41RGcq60oK
 sigs.k8s.io/yaml v1.1.0/go.mod h1:UJmg0vDUVViEyp3mgSv9WPwZCDxu4rQW1olrI1uml+o=
 sigs.k8s.io/yaml v1.2.0 h1:kr/MCeFWJWTwyaHoR9c8EjH9OumOmoF9YGiZd7lFm/Q=
 sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc=
-skywalking.apache.org/repo/goapi v0.0.0-20211119053430-01f9654d8896 
h1:Om3XZ0eizMIiQI56K9ukO2T4+xgv3g4tRyFQic1gkBk=
-skywalking.apache.org/repo/goapi v0.0.0-20211119053430-01f9654d8896/go.mod 
h1:4KrWd+Oi4lkB+PtxZgIlf+3T6EECPru4fOWNMEHjxRk=
+skywalking.apache.org/repo/goapi v0.0.0-20211122071111-ffc517fbfe21 
h1:USC28w3toXoRiNzSCN3lLgnmT8l6RokW7++GiXcNMCU=
+skywalking.apache.org/repo/goapi v0.0.0-20211122071111-ffc517fbfe21/go.mod 
h1:4KrWd+Oi4lkB+PtxZgIlf+3T6EECPru4fOWNMEHjxRk=
 sourcegraph.com/sourcegraph/appdash v0.0.0-20190731080439-ebfcffb1b5c0/go.mod 
h1:hI742Nqp5OhwiqlzhgfbWU4mW4yO10fP+LoT9WOswdU=
diff --git a/plugins/forwarder/grpc/nativelog/forwarder.go 
b/plugins/forwarder/grpc/nativelog/forwarder.go
index 8343f20..9148a6d 100644
--- a/plugins/forwarder/grpc/nativelog/forwarder.go
+++ b/plugins/forwarder/grpc/nativelog/forwarder.go
@@ -31,6 +31,7 @@ import (
        "github.com/apache/skywalking-satellite/internal/pkg/config"
        "github.com/apache/skywalking-satellite/internal/pkg/log"
        "github.com/apache/skywalking-satellite/internal/satellite/event"
+       server_grpc "github.com/apache/skywalking-satellite/plugins/server/grpc"
 )
 
 const (
@@ -81,7 +82,7 @@ func (f *Forwarder) Forward(batch event.BatchEvents) error {
                if !ok {
                        continue
                }
-               err := stream.Send(data.Log)
+               err := stream.SendMsg(server_grpc.NewOriginalData(data.Log))
                if err != nil {
                        log.Logger.Errorf("%s send log data error: %v", 
f.Name(), err)
                        err = closeStream(stream)
diff --git a/plugins/forwarder/grpc/nativetracing/forwarder.go 
b/plugins/forwarder/grpc/nativetracing/forwarder.go
index 9087a3c..1283b32 100644
--- a/plugins/forwarder/grpc/nativetracing/forwarder.go
+++ b/plugins/forwarder/grpc/nativetracing/forwarder.go
@@ -28,6 +28,7 @@ import (
        "github.com/apache/skywalking-satellite/internal/pkg/config"
        "github.com/apache/skywalking-satellite/internal/pkg/log"
        "github.com/apache/skywalking-satellite/internal/satellite/event"
+       server_grpc "github.com/apache/skywalking-satellite/plugins/server/grpc"
 
        agent "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
        v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
@@ -81,7 +82,7 @@ func (f *Forwarder) Forward(batch event.BatchEvents) error {
                if !ok {
                        continue
                }
-               err := stream.Send(data.Segment)
+               err := stream.SendMsg(server_grpc.NewOriginalData(data.Segment))
                if err != nil {
                        log.Logger.Errorf("%s send log data error: %v", 
f.Name(), err)
                        err = closeStream(stream)
diff --git a/plugins/forwarder/kafka/nativelog/forwarder.go 
b/plugins/forwarder/kafka/nativelog/forwarder.go
index 7e652e2..06d0046 100644
--- a/plugins/forwarder/kafka/nativelog/forwarder.go
+++ b/plugins/forwarder/kafka/nativelog/forwarder.go
@@ -21,12 +21,9 @@ import (
        "fmt"
        "reflect"
 
-       "google.golang.org/protobuf/proto"
-
        "github.com/Shopify/sarama"
 
        "github.com/apache/skywalking-satellite/internal/pkg/config"
-       "github.com/apache/skywalking-satellite/internal/pkg/log"
        "github.com/apache/skywalking-satellite/internal/satellite/event"
 
        v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
@@ -83,14 +80,9 @@ func (f *Forwarder) Forward(batch event.BatchEvents) error {
                if !ok {
                        continue
                }
-               bytes, err := proto.Marshal(data.Log)
-               if err != nil {
-                       log.Logger.Errorf("%s serialize the logData fail: %v", 
f.Name(), err)
-                       continue
-               }
                message = append(message, &sarama.ProducerMessage{
                        Topic: f.Topic,
-                       Value: sarama.ByteEncoder(bytes),
+                       Value: sarama.ByteEncoder(data.Log),
                })
        }
        return f.producer.SendMessages(message)
diff --git a/plugins/queue/mmap/queue_test.go b/plugins/queue/mmap/queue_test.go
index ff1d2ee..4964421 100644
--- a/plugins/queue/mmap/queue_test.go
+++ b/plugins/queue/mmap/queue_test.go
@@ -28,6 +28,8 @@ import (
        "testing"
        "time"
 
+       "google.golang.org/protobuf/proto"
+
        common "skywalking.apache.org/repo/goapi/collect/common/v3"
        logging "skywalking.apache.org/repo/goapi/collect/logging/v3"
        v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
@@ -68,6 +70,27 @@ func cleanTestQueue(t *testing.T, q api.Queue) {
 func getBatchEvents(count int) []*v1.SniffData {
        var slice []*v1.SniffData
        for i := 0; i < count; i++ {
+               log := &logging.LogData{
+                       Service:         "mock-service",
+                       ServiceInstance: "mock-serviceInstance",
+                       Timestamp:       time.Date(2020, 12, 20, 12, 12, 12, 0, 
time.UTC).Unix(),
+                       Endpoint:        "mock-endpoint",
+                       Tags:            &logging.LogTags{},
+                       TraceContext: &logging.TraceContext{
+                               TraceId:        "traceId",
+                               TraceSegmentId: "trace-segmentId",
+                               SpanId:         12,
+                       },
+                       Body: &logging.LogDataBody{
+                               Type: "body-type",
+                               Content: &logging.LogDataBody_Text{
+                                       Text: &logging.TextLog{
+                                               Text: getNKData(2) + 
strconv.Itoa(i),
+                                       },
+                               },
+                       },
+               }
+               logBytes, _ := proto.Marshal(log)
                slice = append(slice, &v1.SniffData{
                        Name:      "event" + strconv.Itoa(i),
                        Timestamp: time.Now().Unix(),
@@ -77,26 +100,7 @@ func getBatchEvents(count int) []*v1.SniffData {
                        Type:   v1.SniffType_Logging,
                        Remote: true,
                        Data: &v1.SniffData_Log{
-                               Log: &logging.LogData{
-                                       Service:         "mock-service",
-                                       ServiceInstance: "mock-serviceInstance",
-                                       Timestamp:       time.Date(2020, 12, 
20, 12, 12, 12, 0, time.UTC).Unix(),
-                                       Endpoint:        "mock-endpoint",
-                                       Tags:            &logging.LogTags{},
-                                       TraceContext: &logging.TraceContext{
-                                               TraceId:        "traceId",
-                                               TraceSegmentId: 
"trace-segmentId",
-                                               SpanId:         12,
-                                       },
-                                       Body: &logging.LogDataBody{
-                                               Type: "body-type",
-                                               Content: 
&logging.LogDataBody_Text{
-                                                       Text: &logging.TextLog{
-                                                               Text: 
getNKData(2) + strconv.Itoa(i),
-                                                       },
-                                               },
-                                       },
-                               },
+                               Log: logBytes,
                        },
                },
                )
@@ -109,6 +113,34 @@ func getNKData(n int) string {
 }
 
 func getLargeEvent(n int) *v1.SniffData {
+       log := &logging.LogData{
+               Service:         "mock-service",
+               ServiceInstance: "mock-serviceInstance",
+               Timestamp:       time.Date(2020, 12, 20, 12, 12, 12, 0, 
time.UTC).Unix(),
+               Endpoint:        "mock-endpoint",
+               Tags: &logging.LogTags{
+                       Data: []*common.KeyStringValuePair{
+                               {
+                                       Key:   "tags-key",
+                                       Value: "tags-val",
+                               },
+                       },
+               },
+               TraceContext: &logging.TraceContext{
+                       TraceId:        "traceId",
+                       TraceSegmentId: "trace-segmentId",
+                       SpanId:         12,
+               },
+               Body: &logging.LogDataBody{
+                       Type: "body-type",
+                       Content: &logging.LogDataBody_Text{
+                               Text: &logging.TextLog{
+                                       Text: getNKData(n),
+                               },
+                       },
+               },
+       }
+       logBytes, _ := proto.Marshal(log)
        return &v1.SniffData{
                Name:      "largeEvent",
                Timestamp: time.Now().Unix(),
@@ -118,33 +150,7 @@ func getLargeEvent(n int) *v1.SniffData {
                Type:   v1.SniffType_Logging,
                Remote: true,
                Data: &v1.SniffData_Log{
-                       Log: &logging.LogData{
-                               Service:         "mock-service",
-                               ServiceInstance: "mock-serviceInstance",
-                               Timestamp:       time.Date(2020, 12, 20, 12, 
12, 12, 0, time.UTC).Unix(),
-                               Endpoint:        "mock-endpoint",
-                               Tags: &logging.LogTags{
-                                       Data: []*common.KeyStringValuePair{
-                                               {
-                                                       Key:   "tags-key",
-                                                       Value: "tags-val",
-                                               },
-                                       },
-                               },
-                               TraceContext: &logging.TraceContext{
-                                       TraceId:        "traceId",
-                                       TraceSegmentId: "trace-segmentId",
-                                       SpanId:         12,
-                               },
-                               Body: &logging.LogDataBody{
-                                       Type: "body-type",
-                                       Content: &logging.LogDataBody_Text{
-                                               Text: &logging.TextLog{
-                                                       Text: getNKData(n),
-                                               },
-                                       },
-                               },
-                       },
+                       Log: logBytes,
                },
        }
 }
diff --git a/plugins/receiver/grpc/nativelog/log_report_service.go 
b/plugins/receiver/grpc/nativelog/log_report_service.go
index a291956..6670f81 100644
--- a/plugins/receiver/grpc/nativelog/log_report_service.go
+++ b/plugins/receiver/grpc/nativelog/log_report_service.go
@@ -21,6 +21,8 @@ import (
        "io"
        "time"
 
+       "github.com/apache/skywalking-satellite/plugins/server/grpc"
+
        common "skywalking.apache.org/repo/goapi/collect/common/v3"
        logging "skywalking.apache.org/repo/goapi/collect/logging/v3"
        v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
@@ -35,7 +37,8 @@ type LogReportService struct {
 
 func (s *LogReportService) Collect(stream 
logging.LogReportService_CollectServer) error {
        for {
-               logData, err := stream.Recv()
+               originalData := grpc.NewOriginalData(nil)
+               err := stream.RecvMsg(originalData)
                if err == io.EOF {
                        return stream.SendAndClose(&common.Commands{})
                }
@@ -49,7 +52,7 @@ func (s *LogReportService) Collect(stream 
logging.LogReportService_CollectServer
                        Type:      v1.SniffType_Logging,
                        Remote:    true,
                        Data: &v1.SniffData_Log{
-                               Log: logData,
+                               Log: originalData.Content,
                        },
                }
                s.receiveChannel <- e
diff --git a/plugins/receiver/grpc/nativelog/receiver_test.go 
b/plugins/receiver/grpc/nativelog/receiver_test.go
index 0f3734b..97241e8 100644
--- a/plugins/receiver/grpc/nativelog/receiver_test.go
+++ b/plugins/receiver/grpc/nativelog/receiver_test.go
@@ -23,6 +23,8 @@ import (
        "testing"
        "time"
 
+       "google.golang.org/protobuf/proto"
+
        "google.golang.org/grpc"
 
        common "skywalking.apache.org/repo/goapi/collect/common/v3"
@@ -49,7 +51,9 @@ func TestReceiver_RegisterHandler(t *testing.T) {
                }
                return data.String()
        }, func(data *v1.SniffData) string {
-               return data.GetLog().String()
+               d := new(logging.LogData)
+               _ = proto.Unmarshal(data.GetLog(), d)
+               return d.String()
        }, t)
 }
 
diff --git a/plugins/receiver/grpc/nativetracing/receiver_test.go 
b/plugins/receiver/grpc/nativetracing/receiver_test.go
index 97514be..227bbcd 100644
--- a/plugins/receiver/grpc/nativetracing/receiver_test.go
+++ b/plugins/receiver/grpc/nativetracing/receiver_test.go
@@ -23,6 +23,8 @@ import (
        "testing"
        "time"
 
+       "google.golang.org/protobuf/proto"
+
        "google.golang.org/grpc"
 
        common "skywalking.apache.org/repo/goapi/collect/common/v3"
@@ -49,7 +51,9 @@ func TestReceiver_RegisterHandlerStream(t *testing.T) {
                }
                return data.String()
        }, func(data *v1.SniffData) string {
-               return data.GetSegment().String()
+               d := new(agent.SegmentObject)
+               _ = proto.Unmarshal(data.GetSegment(), d)
+               return d.String()
        }, t)
 }
 
@@ -68,7 +72,9 @@ func TestReceiver_RegisterHandlerSync(t *testing.T) {
                }
                return data.String()
        }, func(data *v1.SniffData) string {
-               return data.GetSegment().String()
+               d := new(agent.SegmentObject)
+               _ = proto.Unmarshal(data.GetSegment(), d)
+               return d.String()
        }, t)
 }
 
diff --git a/plugins/receiver/grpc/nativetracing/tracing_report_service.go 
b/plugins/receiver/grpc/nativetracing/tracing_report_service.go
index 2a58f5f..9929fd8 100644
--- a/plugins/receiver/grpc/nativetracing/tracing_report_service.go
+++ b/plugins/receiver/grpc/nativetracing/tracing_report_service.go
@@ -22,6 +22,11 @@ import (
        "io"
        "time"
 
+       "google.golang.org/protobuf/proto"
+
+       "github.com/apache/skywalking-satellite/internal/pkg/log"
+       "github.com/apache/skywalking-satellite/plugins/server/grpc"
+
        common "skywalking.apache.org/repo/goapi/collect/common/v3"
        agent "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
        v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
@@ -36,7 +41,8 @@ type TraceSegmentReportService struct {
 
 func (s *TraceSegmentReportService) Collect(stream 
agent.TraceSegmentReportService_CollectServer) error {
        for {
-               segmentData, err := stream.Recv()
+               recData := grpc.NewOriginalData(nil)
+               err := stream.RecvMsg(recData)
                if err == io.EOF {
                        return stream.SendAndClose(&common.Commands{})
                }
@@ -50,7 +56,7 @@ func (s *TraceSegmentReportService) Collect(stream 
agent.TraceSegmentReportServi
                        Type:      v1.SniffType_TracingType,
                        Remote:    true,
                        Data: &v1.SniffData_Segment{
-                               Segment: segmentData,
+                               Segment: recData.Content,
                        },
                }
                s.receiveChannel <- e
@@ -59,6 +65,10 @@ func (s *TraceSegmentReportService) Collect(stream 
agent.TraceSegmentReportServi
 
 func (s *TraceSegmentReportService) CollectInSync(ctx context.Context, 
segments *agent.SegmentCollection) (*common.Commands, error) {
        for _, segment := range segments.Segments {
+               marshaledSegment, err := proto.Marshal(segment)
+               if err != nil {
+                       log.Logger.Warnf("cannot marshal segemnt from sync, 
%v", err)
+               }
                e := &v1.SniffData{
                        Name:      eventName,
                        Timestamp: time.Now().UnixNano() / 1e6,
@@ -66,7 +76,7 @@ func (s *TraceSegmentReportService) CollectInSync(ctx 
context.Context, segments
                        Type:      v1.SniffType_TracingType,
                        Remote:    true,
                        Data: &v1.SniffData_Segment{
-                               Segment: segment,
+                               Segment: marshaledSegment,
                        },
                }
                s.receiveChannel <- e
diff --git a/plugins/receiver/http/nativcelog/receiver.go 
b/plugins/receiver/http/nativcelog/receiver.go
index e30949a..cf1d460 100644
--- a/plugins/receiver/http/nativcelog/receiver.go
+++ b/plugins/receiver/http/nativcelog/receiver.go
@@ -121,7 +121,7 @@ func (r *Receiver) httpHandler() http.Handler {
                        Type:      v1.SniffType_Logging,
                        Remote:    true,
                        Data: &v1.SniffData_Log{
-                               Log: &data,
+                               Log: b,
                        },
                }
                r.OutputChannel <- e
diff --git a/plugins/receiver/http/nativcelog/receiver_test.go 
b/plugins/receiver/http/nativcelog/receiver_test.go
index 2d308cc..5cfb1cf 100644
--- a/plugins/receiver/http/nativcelog/receiver_test.go
+++ b/plugins/receiver/http/nativcelog/receiver_test.go
@@ -83,7 +83,9 @@ func TestReceiver_http_RegisterHandler(t *testing.T) {
                }()
 
                newData := <-r.Channel()
-               if !cmp.Equal(newData.Data.(*v1.SniffData_Log).Log.String(), 
data.String()) {
+               d := new(logging.LogData)
+               _ = proto.Unmarshal(newData.Data.(*v1.SniffData_Log).Log, d)
+               if !cmp.Equal(d.String(), data.String()) {
                        t.Fatalf("the sent data is not equal to the received 
data\n, "+
                                "want data %s\n, but got %s\n", data.String(), 
newData.String())
                }

Reply via email to