This is an automated email from the ASF dual-hosted git repository.
liuhan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-satellite.git
The following commit(s) were added to refs/heads/main by this push:
new abc4f0c Support the new als protocol (#88)
abc4f0c is described below
commit abc4f0c0f94f22a0bc0214b5225f254710c4fd0e
Author: mrproliu <[email protected]>
AuthorDate: Tue Nov 30 12:23:46 2021 +0800
Support the new als protocol (#88)
---
go.mod | 2 +-
go.sum | 4 +-
plugins/forwarder/grpc/envoyalsv2/forwarder.go | 43 +++++++++----------
plugins/forwarder/grpc/envoyalsv3/forwarder.go | 43 +++++++++----------
test/e2e/base/env | 6 +--
test/e2e/case/istio/als/e2e.yaml | 8 ++--
test/e2e/case/istio/metrics/e2e.yaml | 8 ++--
test/e2e/case/istio/satellite.yaml | 57 --------------------------
8 files changed, 58 insertions(+), 113 deletions(-)
diff --git a/go.mod b/go.mod
index df3d948..92baf98 100644
--- a/go.mod
+++ b/go.mod
@@ -23,5 +23,5 @@ require (
google.golang.org/protobuf v1.27.1
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b
gotest.tools v2.2.0+incompatible
- skywalking.apache.org/repo/goapi v0.0.0-20211122071111-ffc517fbfe21
+ skywalking.apache.org/repo/goapi v0.0.0-20211129152714-f5760201da68
)
diff --git a/go.sum b/go.sum
index 8bff2d3..28f5755 100644
--- a/go.sum
+++ b/go.sum
@@ -1330,6 +1330,6 @@ sigs.k8s.io/structured-merge-diff/v4 v4.0.1/go.mod
h1:bJZC9H9iH24zzfZ/41RGcq60oK
sigs.k8s.io/yaml v1.1.0/go.mod h1:UJmg0vDUVViEyp3mgSv9WPwZCDxu4rQW1olrI1uml+o=
sigs.k8s.io/yaml v1.2.0 h1:kr/MCeFWJWTwyaHoR9c8EjH9OumOmoF9YGiZd7lFm/Q=
sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc=
-skywalking.apache.org/repo/goapi v0.0.0-20211122071111-ffc517fbfe21
h1:USC28w3toXoRiNzSCN3lLgnmT8l6RokW7++GiXcNMCU=
-skywalking.apache.org/repo/goapi v0.0.0-20211122071111-ffc517fbfe21/go.mod
h1:4KrWd+Oi4lkB+PtxZgIlf+3T6EECPru4fOWNMEHjxRk=
+skywalking.apache.org/repo/goapi v0.0.0-20211129152714-f5760201da68
h1:DE5enrtUAh/PViRIJCYFLMBjmvPLJVmXEyHJZjBtK90=
+skywalking.apache.org/repo/goapi v0.0.0-20211129152714-f5760201da68/go.mod
h1:4KrWd+Oi4lkB+PtxZgIlf+3T6EECPru4fOWNMEHjxRk=
sourcegraph.com/sourcegraph/appdash v0.0.0-20190731080439-ebfcffb1b5c0/go.mod
h1:hI742Nqp5OhwiqlzhgfbWU4mW4yO10fP+LoT9WOswdU=
diff --git a/plugins/forwarder/grpc/envoyalsv2/forwarder.go
b/plugins/forwarder/grpc/envoyalsv2/forwarder.go
index 082ebe1..91020a4 100644
--- a/plugins/forwarder/grpc/envoyalsv2/forwarder.go
+++ b/plugins/forwarder/grpc/envoyalsv2/forwarder.go
@@ -23,8 +23,8 @@ import (
"io"
"reflect"
- v2 "skywalking.apache.org/repo/goapi/proto/envoy/service/accesslog/v2"
v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
+ v3 "skywalking.apache.org/repo/goapi/satellite/envoy/accesslog/v3"
"google.golang.org/grpc"
@@ -42,7 +42,7 @@ const (
type Forwarder struct {
config.CommonFields
- alsClient v2.AccessLogServiceClient
+ alsClient v3.SatelliteAccessLogServiceClient
eventReadySendCount *telemetry.Counter
eventSendFinishedCount *telemetry.Counter
@@ -87,7 +87,7 @@ func (f *Forwarder) Prepare(connection interface{}) error {
return fmt.Errorf("the %s only accepts a grpc client, but
received a %s",
f.Name(), reflect.TypeOf(connection).String())
}
- f.alsClient = v2.NewAccessLogServiceClient(client)
+ f.alsClient = v3.NewSatelliteAccessLogServiceClient(client)
f.init()
return nil
}
@@ -99,18 +99,18 @@ func (f *Forwarder) Forward(batch event.BatchEvents) error {
f.streamingReadySendCount.Add(float64(len(data.EnvoyALSV2List.Messages)))
}
- for _, e := range batch {
- // open stream
- timeRecord := f.forwardConnectTime.Start()
- stream, err :=
f.alsClient.StreamAccessLogs(context.Background())
- timeRecord.Stop()
- if err != nil {
- log.Logger.Errorf("open grpc stream error %v", err)
- return err
- }
- peer :=
server_grpc.GetPeerHostFromStreamContext(stream.Context())
- timeRecord = f.forwardSendTime.Start()
+ // open stream
+ timeRecord := f.forwardConnectTime.Start()
+ stream, err := f.alsClient.StreamAccessLogs(context.Background())
+ timeRecord.Stop()
+ if err != nil {
+ log.Logger.Errorf("open grpc stream error %v", err)
+ return err
+ }
+ peer := server_grpc.GetPeerHostFromStreamContext(stream.Context())
+ timeRecord = f.forwardSendTime.Start()
+ for _, e := range batch {
data := e.GetEnvoyALSV2List()
if data == nil {
continue
@@ -127,17 +127,18 @@ func (f *Forwarder) Forward(batch event.BatchEvents)
error {
}
f.eventSendFinishedCount.Inc(peer)
- timeRecord.Stop()
-
- // close stream
- timeRecord = f.forwardCloseTime.Start()
- f.closeStream(stream)
- timeRecord.Stop()
}
+
+ timeRecord.Stop()
+
+ // close stream
+ timeRecord = f.forwardCloseTime.Start()
+ f.closeStream(stream)
+ timeRecord.Stop()
return nil
}
-func (f *Forwarder) closeStream(stream
v2.AccessLogService_StreamAccessLogsClient) {
+func (f *Forwarder) closeStream(stream
v3.SatelliteAccessLogService_StreamAccessLogsClient) {
_, err := stream.CloseAndRecv()
if err != nil && err != io.EOF {
log.Logger.Warnf("%s close stream error: %v", f.Name(), err)
diff --git a/plugins/forwarder/grpc/envoyalsv3/forwarder.go
b/plugins/forwarder/grpc/envoyalsv3/forwarder.go
index 1ffacc6..a727136 100644
--- a/plugins/forwarder/grpc/envoyalsv3/forwarder.go
+++ b/plugins/forwarder/grpc/envoyalsv3/forwarder.go
@@ -25,8 +25,8 @@ import (
"google.golang.org/grpc"
- v3 "skywalking.apache.org/repo/goapi/proto/envoy/service/accesslog/v3"
v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
+ v3 "skywalking.apache.org/repo/goapi/satellite/envoy/accesslog/v3"
"github.com/apache/skywalking-satellite/internal/pkg/config"
"github.com/apache/skywalking-satellite/internal/pkg/log"
@@ -42,7 +42,7 @@ const (
type Forwarder struct {
config.CommonFields
- alsClient v3.AccessLogServiceClient
+ alsClient v3.SatelliteAccessLogServiceClient
eventReadySendCount *telemetry.Counter
eventSendFinishedCount *telemetry.Counter
@@ -87,7 +87,7 @@ func (f *Forwarder) Prepare(connection interface{}) error {
return fmt.Errorf("the %s only accepts a grpc client, but
received a %s",
f.Name(), reflect.TypeOf(connection).String())
}
- f.alsClient = v3.NewAccessLogServiceClient(client)
+ f.alsClient = v3.NewSatelliteAccessLogServiceClient(client)
f.init()
return nil
}
@@ -99,18 +99,18 @@ func (f *Forwarder) Forward(batch event.BatchEvents) error {
f.streamingReadySendCount.Add(float64(len(data.EnvoyALSV3List.Messages)))
}
- for _, e := range batch {
- // open stream
- timeRecord := f.forwardConnectTime.Start()
- stream, err :=
f.alsClient.StreamAccessLogs(context.Background())
- timeRecord.Stop()
- if err != nil {
- log.Logger.Errorf("open grpc stream error %v", err)
- return err
- }
- peer :=
server_grpc.GetPeerHostFromStreamContext(stream.Context())
- timeRecord = f.forwardSendTime.Start()
+ // open stream
+ timeRecord := f.forwardConnectTime.Start()
+ stream, err := f.alsClient.StreamAccessLogs(context.Background())
+ timeRecord.Stop()
+ if err != nil {
+ log.Logger.Errorf("open grpc stream error %v", err)
+ return err
+ }
+ peer := server_grpc.GetPeerHostFromStreamContext(stream.Context())
+ timeRecord = f.forwardSendTime.Start()
+ for _, e := range batch {
data := e.GetEnvoyALSV3List()
if data == nil {
continue
@@ -127,17 +127,18 @@ func (f *Forwarder) Forward(batch event.BatchEvents)
error {
}
f.eventSendFinishedCount.Inc(peer)
- timeRecord.Stop()
-
- // close stream
- timeRecord = f.forwardCloseTime.Start()
- f.closeStream(stream)
- timeRecord.Stop()
}
+
+ timeRecord.Stop()
+
+ // close stream
+ timeRecord = f.forwardCloseTime.Start()
+ f.closeStream(stream)
+ timeRecord.Stop()
return nil
}
-func (f *Forwarder) closeStream(stream
v3.AccessLogService_StreamAccessLogsClient) {
+func (f *Forwarder) closeStream(stream
v3.SatelliteAccessLogService_StreamAccessLogsClient) {
_, err := stream.CloseAndRecv()
if err != nil && err != io.EOF {
log.Logger.Warnf("%s close stream error: %v", f.Name(), err)
diff --git a/test/e2e/base/env b/test/e2e/base/env
index 77e2439..bb7d3a9 100644
--- a/test/e2e/base/env
+++ b/test/e2e/base/env
@@ -14,8 +14,8 @@
# limitations under the License.
SW_AGENT_JAVA_COMMIT=3997f0256056788bd054ee37e4603c11c0fd6756
-SW_OAP_COMMIT=b5578b68fd0d5d8e4543ad7d8fc9c3066ace456c
-SW_UI_COMMIT=b5578b68fd0d5d8e4543ad7d8fc9c3066ace456c
-SW_KUBERNETES_COMMIT_SHA=0f3ec68e5a7e1608cec8688716b848ed15e971e5
+SW_OAP_COMMIT=1acba5d2467f6d905a6261e999f85464d5432562
+SW_UI_COMMIT=1acba5d2467f6d905a6261e999f85464d5432562
+SW_KUBERNETES_COMMIT_SHA=9f42abfe9b87149dc55320237b6cc9dfea733a9b
SW_CTL_COMMIT=45a5f9807126dc6ea9fe06e17e7aafbe212436d2
diff --git a/test/e2e/case/istio/als/e2e.yaml b/test/e2e/case/istio/als/e2e.yaml
index d94c579..8779f2f 100644
--- a/test/e2e/case/istio/als/e2e.yaml
+++ b/test/e2e/case/istio/als/e2e.yaml
@@ -64,14 +64,14 @@ setup:
--set ui.image.tag=$SW_UI_COMMIT \
--set oap.image.tag=$SW_OAP_COMMIT \
--set
oap.image.repository=ghcr.io/apache/skywalking/oap \
- --set oap.storageType=elasticsearch
+ --set oap.storageType=elasticsearch \
+ --set satellite.enabled=true \
+ --set
satellite.image.repository=apache/skywalking-satellite \
+ --set satellite.image.tag=vlatest
wait:
- namespace: istio-system
resource: deployments/skywalking-oap
for: condition=available
- - name: Setup Satellite
- path: ../satellite.yaml
- wait:
- namespace: istio-system
resource: deployments/skywalking-satellite
for: condition=available
diff --git a/test/e2e/case/istio/metrics/e2e.yaml
b/test/e2e/case/istio/metrics/e2e.yaml
index fff2796..f754f7b 100644
--- a/test/e2e/case/istio/metrics/e2e.yaml
+++ b/test/e2e/case/istio/metrics/e2e.yaml
@@ -74,14 +74,14 @@ setup:
--set ui.image.tag=$SW_UI_COMMIT \
--set oap.image.tag=$SW_OAP_COMMIT \
--set
oap.image.repository=ghcr.io/apache/skywalking/oap \
- --set oap.storageType=elasticsearch
+ --set oap.storageType=elasticsearch \
+ --set satellite.enabled=true \
+ --set
satellite.image.repository=apache/skywalking-satellite \
+ --set satellite.image.tag=vlatest
wait:
- namespace: istio-system
resource: deployments/skywalking-oap
for: condition=available
- - name: Setup Satellite
- path: ../satellite.yaml
- wait:
- namespace: istio-system
resource: deployments/skywalking-satellite
for: condition=available
diff --git a/test/e2e/case/istio/satellite.yaml
b/test/e2e/case/istio/satellite.yaml
deleted file mode 100644
index e2c66d1..0000000
--- a/test/e2e/case/istio/satellite.yaml
+++ /dev/null
@@ -1,57 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-apiVersion: apps/v1
-kind: Deployment
-metadata:
- name: skywalking-satellite
- namespace: istio-system
- labels:
- app: skywalking-satellite
-spec:
- replicas: 1
- selector:
- matchLabels:
- app: skywalking-satellite
- template:
- metadata:
- annotations:
- sidecar.istio.io/inject: "false"
- labels:
- app: skywalking-satellite
- spec:
- containers:
- - name: skywalking-satellite
- image: apache/skywalking-satellite:vlatest
- env:
- - name: SATELLITE_GRPC_CLIENT
- value: "skywalking-oap.istio-system:11800"
----
-apiVersion: v1
-kind: Service
-metadata:
- name: skywalking-satellite
- namespace: istio-system
- labels:
- app: skywalking-satellite
-spec:
- type: ClusterIP
- ports:
- - port: 11800
- name: grpc
- - port: 1234
- name: prometheus
- selector:
- app: skywalking-satellite