This is an automated email from the ASF dual-hosted git repository.

liuhan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-satellite.git


The following commit(s) were added to refs/heads/main by this push:
     new abc4f0c  Support the new als protocol (#88)
abc4f0c is described below

commit abc4f0c0f94f22a0bc0214b5225f254710c4fd0e
Author: mrproliu <[email protected]>
AuthorDate: Tue Nov 30 12:23:46 2021 +0800

    Support the new als protocol (#88)
---
 go.mod                                         |  2 +-
 go.sum                                         |  4 +-
 plugins/forwarder/grpc/envoyalsv2/forwarder.go | 43 +++++++++----------
 plugins/forwarder/grpc/envoyalsv3/forwarder.go | 43 +++++++++----------
 test/e2e/base/env                              |  6 +--
 test/e2e/case/istio/als/e2e.yaml               |  8 ++--
 test/e2e/case/istio/metrics/e2e.yaml           |  8 ++--
 test/e2e/case/istio/satellite.yaml             | 57 --------------------------
 8 files changed, 58 insertions(+), 113 deletions(-)

diff --git a/go.mod b/go.mod
index df3d948..92baf98 100644
--- a/go.mod
+++ b/go.mod
@@ -23,5 +23,5 @@ require (
        google.golang.org/protobuf v1.27.1
        gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b
        gotest.tools v2.2.0+incompatible
-       skywalking.apache.org/repo/goapi v0.0.0-20211122071111-ffc517fbfe21
+       skywalking.apache.org/repo/goapi v0.0.0-20211129152714-f5760201da68
 )
diff --git a/go.sum b/go.sum
index 8bff2d3..28f5755 100644
--- a/go.sum
+++ b/go.sum
@@ -1330,6 +1330,6 @@ sigs.k8s.io/structured-merge-diff/v4 v4.0.1/go.mod h1:bJZC9H9iH24zzfZ/41RGcq60oK
 sigs.k8s.io/yaml v1.1.0/go.mod h1:UJmg0vDUVViEyp3mgSv9WPwZCDxu4rQW1olrI1uml+o=
 sigs.k8s.io/yaml v1.2.0 h1:kr/MCeFWJWTwyaHoR9c8EjH9OumOmoF9YGiZd7lFm/Q=
 sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc=
-skywalking.apache.org/repo/goapi v0.0.0-20211122071111-ffc517fbfe21 h1:USC28w3toXoRiNzSCN3lLgnmT8l6RokW7++GiXcNMCU=
-skywalking.apache.org/repo/goapi v0.0.0-20211122071111-ffc517fbfe21/go.mod h1:4KrWd+Oi4lkB+PtxZgIlf+3T6EECPru4fOWNMEHjxRk=
+skywalking.apache.org/repo/goapi v0.0.0-20211129152714-f5760201da68 h1:DE5enrtUAh/PViRIJCYFLMBjmvPLJVmXEyHJZjBtK90=
+skywalking.apache.org/repo/goapi v0.0.0-20211129152714-f5760201da68/go.mod h1:4KrWd+Oi4lkB+PtxZgIlf+3T6EECPru4fOWNMEHjxRk=
 sourcegraph.com/sourcegraph/appdash v0.0.0-20190731080439-ebfcffb1b5c0/go.mod h1:hI742Nqp5OhwiqlzhgfbWU4mW4yO10fP+LoT9WOswdU=
diff --git a/plugins/forwarder/grpc/envoyalsv2/forwarder.go b/plugins/forwarder/grpc/envoyalsv2/forwarder.go
index 082ebe1..91020a4 100644
--- a/plugins/forwarder/grpc/envoyalsv2/forwarder.go
+++ b/plugins/forwarder/grpc/envoyalsv2/forwarder.go
@@ -23,8 +23,8 @@ import (
        "io"
        "reflect"
 
-       v2 "skywalking.apache.org/repo/goapi/proto/envoy/service/accesslog/v2"
        v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
+       v3 "skywalking.apache.org/repo/goapi/satellite/envoy/accesslog/v3"
 
        "google.golang.org/grpc"
 
@@ -42,7 +42,7 @@ const (
 
 type Forwarder struct {
        config.CommonFields
-       alsClient v2.AccessLogServiceClient
+       alsClient v3.SatelliteAccessLogServiceClient
 
        eventReadySendCount        *telemetry.Counter
        eventSendFinishedCount     *telemetry.Counter
@@ -87,7 +87,7 @@ func (f *Forwarder) Prepare(connection interface{}) error {
                return fmt.Errorf("the %s only accepts a grpc client, but received a %s",
                        f.Name(), reflect.TypeOf(connection).String())
        }
-       f.alsClient = v2.NewAccessLogServiceClient(client)
+       f.alsClient = v3.NewSatelliteAccessLogServiceClient(client)
        f.init()
        return nil
 }
@@ -99,18 +99,18 @@ func (f *Forwarder) Forward(batch event.BatchEvents) error {
                f.streamingReadySendCount.Add(float64(len(data.EnvoyALSV2List.Messages)))
        }
 
-       for _, e := range batch {
-               // open stream
-               timeRecord := f.forwardConnectTime.Start()
-               stream, err := f.alsClient.StreamAccessLogs(context.Background())
-               timeRecord.Stop()
-               if err != nil {
-                       log.Logger.Errorf("open grpc stream error %v", err)
-                       return err
-               }
-               peer := server_grpc.GetPeerHostFromStreamContext(stream.Context())
-               timeRecord = f.forwardSendTime.Start()
+       // open stream
+       timeRecord := f.forwardConnectTime.Start()
+       stream, err := f.alsClient.StreamAccessLogs(context.Background())
+       timeRecord.Stop()
+       if err != nil {
+               log.Logger.Errorf("open grpc stream error %v", err)
+               return err
+       }
+       peer := server_grpc.GetPeerHostFromStreamContext(stream.Context())
+       timeRecord = f.forwardSendTime.Start()
 
+       for _, e := range batch {
                data := e.GetEnvoyALSV2List()
                if data == nil {
                        continue
@@ -127,17 +127,18 @@ func (f *Forwarder) Forward(batch event.BatchEvents) error {
                }
 
                f.eventSendFinishedCount.Inc(peer)
-               timeRecord.Stop()
-
-               // close stream
-               timeRecord = f.forwardCloseTime.Start()
-               f.closeStream(stream)
-               timeRecord.Stop()
        }
+
+       timeRecord.Stop()
+
+       // close stream
+       timeRecord = f.forwardCloseTime.Start()
+       f.closeStream(stream)
+       timeRecord.Stop()
        return nil
 }
 
-func (f *Forwarder) closeStream(stream v2.AccessLogService_StreamAccessLogsClient) {
+func (f *Forwarder) closeStream(stream v3.SatelliteAccessLogService_StreamAccessLogsClient) {
        _, err := stream.CloseAndRecv()
        if err != nil && err != io.EOF {
                log.Logger.Warnf("%s close stream error: %v", f.Name(), err)
diff --git a/plugins/forwarder/grpc/envoyalsv3/forwarder.go b/plugins/forwarder/grpc/envoyalsv3/forwarder.go
index 1ffacc6..a727136 100644
--- a/plugins/forwarder/grpc/envoyalsv3/forwarder.go
+++ b/plugins/forwarder/grpc/envoyalsv3/forwarder.go
@@ -25,8 +25,8 @@ import (
 
        "google.golang.org/grpc"
 
-       v3 "skywalking.apache.org/repo/goapi/proto/envoy/service/accesslog/v3"
        v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
+       v3 "skywalking.apache.org/repo/goapi/satellite/envoy/accesslog/v3"
 
        "github.com/apache/skywalking-satellite/internal/pkg/config"
        "github.com/apache/skywalking-satellite/internal/pkg/log"
@@ -42,7 +42,7 @@ const (
 
 type Forwarder struct {
        config.CommonFields
-       alsClient v3.AccessLogServiceClient
+       alsClient v3.SatelliteAccessLogServiceClient
 
        eventReadySendCount        *telemetry.Counter
        eventSendFinishedCount     *telemetry.Counter
@@ -87,7 +87,7 @@ func (f *Forwarder) Prepare(connection interface{}) error {
                return fmt.Errorf("the %s only accepts a grpc client, but received a %s",
                        f.Name(), reflect.TypeOf(connection).String())
        }
-       f.alsClient = v3.NewAccessLogServiceClient(client)
+       f.alsClient = v3.NewSatelliteAccessLogServiceClient(client)
        f.init()
        return nil
 }
@@ -99,18 +99,18 @@ func (f *Forwarder) Forward(batch event.BatchEvents) error {
                f.streamingReadySendCount.Add(float64(len(data.EnvoyALSV3List.Messages)))
        }
 
-       for _, e := range batch {
-               // open stream
-               timeRecord := f.forwardConnectTime.Start()
-               stream, err := f.alsClient.StreamAccessLogs(context.Background())
-               timeRecord.Stop()
-               if err != nil {
-                       log.Logger.Errorf("open grpc stream error %v", err)
-                       return err
-               }
-               peer := server_grpc.GetPeerHostFromStreamContext(stream.Context())
-               timeRecord = f.forwardSendTime.Start()
+       // open stream
+       timeRecord := f.forwardConnectTime.Start()
+       stream, err := f.alsClient.StreamAccessLogs(context.Background())
+       timeRecord.Stop()
+       if err != nil {
+               log.Logger.Errorf("open grpc stream error %v", err)
+               return err
+       }
+       peer := server_grpc.GetPeerHostFromStreamContext(stream.Context())
+       timeRecord = f.forwardSendTime.Start()
 
+       for _, e := range batch {
                data := e.GetEnvoyALSV3List()
                if data == nil {
                        continue
@@ -127,17 +127,18 @@ func (f *Forwarder) Forward(batch event.BatchEvents) error {
                }
 
                f.eventSendFinishedCount.Inc(peer)
-               timeRecord.Stop()
-
-               // close stream
-               timeRecord = f.forwardCloseTime.Start()
-               f.closeStream(stream)
-               timeRecord.Stop()
        }
+
+       timeRecord.Stop()
+
+       // close stream
+       timeRecord = f.forwardCloseTime.Start()
+       f.closeStream(stream)
+       timeRecord.Stop()
        return nil
 }
 
-func (f *Forwarder) closeStream(stream v3.AccessLogService_StreamAccessLogsClient) {
+func (f *Forwarder) closeStream(stream v3.SatelliteAccessLogService_StreamAccessLogsClient) {
        _, err := stream.CloseAndRecv()
        if err != nil && err != io.EOF {
                log.Logger.Warnf("%s close stream error: %v", f.Name(), err)
diff --git a/test/e2e/base/env b/test/e2e/base/env
index 77e2439..bb7d3a9 100644
--- a/test/e2e/base/env
+++ b/test/e2e/base/env
@@ -14,8 +14,8 @@
 # limitations under the License.
 
 SW_AGENT_JAVA_COMMIT=3997f0256056788bd054ee37e4603c11c0fd6756
-SW_OAP_COMMIT=b5578b68fd0d5d8e4543ad7d8fc9c3066ace456c
-SW_UI_COMMIT=b5578b68fd0d5d8e4543ad7d8fc9c3066ace456c
-SW_KUBERNETES_COMMIT_SHA=0f3ec68e5a7e1608cec8688716b848ed15e971e5
+SW_OAP_COMMIT=1acba5d2467f6d905a6261e999f85464d5432562
+SW_UI_COMMIT=1acba5d2467f6d905a6261e999f85464d5432562
+SW_KUBERNETES_COMMIT_SHA=9f42abfe9b87149dc55320237b6cc9dfea733a9b
 
 SW_CTL_COMMIT=45a5f9807126dc6ea9fe06e17e7aafbe212436d2
diff --git a/test/e2e/case/istio/als/e2e.yaml b/test/e2e/case/istio/als/e2e.yaml
index d94c579..8779f2f 100644
--- a/test/e2e/case/istio/als/e2e.yaml
+++ b/test/e2e/case/istio/als/e2e.yaml
@@ -64,14 +64,14 @@ setup:
                        --set ui.image.tag=$SW_UI_COMMIT \
                        --set oap.image.tag=$SW_OAP_COMMIT \
                        --set oap.image.repository=ghcr.io/apache/skywalking/oap \
-                       --set oap.storageType=elasticsearch
+                       --set oap.storageType=elasticsearch \
+                       --set satellite.enabled=true \
+                       --set satellite.image.repository=apache/skywalking-satellite \
+                       --set satellite.image.tag=vlatest
       wait:
         - namespace: istio-system
           resource: deployments/skywalking-oap
           for: condition=available
-    - name: Setup Satellite
-      path: ../satellite.yaml
-      wait:
         - namespace: istio-system
           resource: deployments/skywalking-satellite
           for: condition=available
diff --git a/test/e2e/case/istio/metrics/e2e.yaml b/test/e2e/case/istio/metrics/e2e.yaml
index fff2796..f754f7b 100644
--- a/test/e2e/case/istio/metrics/e2e.yaml
+++ b/test/e2e/case/istio/metrics/e2e.yaml
@@ -74,14 +74,14 @@ setup:
                        --set ui.image.tag=$SW_UI_COMMIT \
                        --set oap.image.tag=$SW_OAP_COMMIT \
                        --set oap.image.repository=ghcr.io/apache/skywalking/oap \
-                       --set oap.storageType=elasticsearch
+                       --set oap.storageType=elasticsearch \
+                       --set satellite.enabled=true \
+                       --set satellite.image.repository=apache/skywalking-satellite \
+                       --set satellite.image.tag=vlatest
       wait:
         - namespace: istio-system
           resource: deployments/skywalking-oap
           for: condition=available
-    - name: Setup Satellite
-      path: ../satellite.yaml
-      wait:
         - namespace: istio-system
           resource: deployments/skywalking-satellite
           for: condition=available
diff --git a/test/e2e/case/istio/satellite.yaml b/test/e2e/case/istio/satellite.yaml
deleted file mode 100644
index e2c66d1..0000000
--- a/test/e2e/case/istio/satellite.yaml
+++ /dev/null
@@ -1,57 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-apiVersion: apps/v1
-kind: Deployment
-metadata:
-  name: skywalking-satellite
-  namespace: istio-system
-  labels:
-    app: skywalking-satellite
-spec:
-  replicas: 1
-  selector:
-    matchLabels:
-      app: skywalking-satellite
-  template:
-    metadata:
-      annotations:
-        sidecar.istio.io/inject: "false"
-      labels:
-        app: skywalking-satellite
-    spec:
-      containers:
-        - name: skywalking-satellite
-          image: apache/skywalking-satellite:vlatest
-          env:
-            - name: SATELLITE_GRPC_CLIENT
-              value: "skywalking-oap.istio-system:11800"
----
-apiVersion: v1
-kind: Service
-metadata:
-  name: skywalking-satellite
-  namespace: istio-system
-  labels:
-    app: skywalking-satellite
-spec:
-  type: ClusterIP
-  ports:
-  - port: 11800
-    name: grpc
-  - port: 1234
-    name: prometheus
-  selector:
-    app: skywalking-satellite

Reply via email to