This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-satellite.git
The following commit(s) were added to refs/heads/main by this push:
new 43867a4 fix data race in mmap & fix blocking channel in sender (#37)
43867a4 is described below
commit 43867a47d859532761d1e1cbd9ea385066807c50
Author: Evan <[email protected]>
AuthorDate: Wed Apr 7 17:44:21 2021 +0800
fix data race in mmap & fix blocking channel in sender (#37)
---
CHANGES.md | 22 +---
docs/en/concepts-and-designs/mmap-queue.md | 24 ++--
go.mod | 8 +-
go.sum | 14 ++-
internal/satellite/module/sender/create.go | 3 +-
internal/satellite/module/sender/sender.go | 138 ++++++++++++---------
plugins/queue/mmap/meta/meta.go | 32 ++++-
plugins/queue/mmap/queue.go | 19 +--
plugins/queue/mmap/queue_lock.go | 42 +++++++
plugins/queue/mmap/queue_operation.go | 13 +-
plugins/queue/mmap/segment_operation.go | 7 +-
protocol/gen-codes/satellite/protocol/Event.pb.go | 7 +-
.../v3/ConfigurationDiscoveryService.pb.go | 7 +-
.../skywalking/network/common/v3/Common.pb.go | 7 +-
protocol/gen-codes/skywalking/network/go.mod | 5 +-
protocol/gen-codes/skywalking/network/go.sum | 15 ++-
.../network/language/agent/v3/BrowserPerf.pb.go | 7 +-
.../network/language/agent/v3/CLRMetric.pb.go | 7 +-
.../network/language/agent/v3/JVMMetric.pb.go | 7 +-
.../network/language/agent/v3/Meter.pb.go | 7 +-
.../network/language/agent/v3/Tracing.pb.go | 7 +-
.../network/language/profile/v3/Profile.pb.go | 7 +-
.../skywalking/network/logging/v3/Logging.pb.go | 7 +-
.../network/management/v3/Management.pb.go | 7 +-
.../network/servicemesh/v3/service-mesh.pb.go | 7 +-
tools/protocol_gen.sh | 6 +-
26 files changed, 237 insertions(+), 195 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 1b4032e..96ea4f7 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -2,25 +2,15 @@ Changes by Version
==================
Release Notes.
-0.1.0
+0.2.0
------------------
#### Features
-* Build the Satellite core structure.
-* Add prometheus self telemetry.
-* Add kafka client plugin.
-* Add none-fallbacker plugin.
-* Add timer-fallbacker plugin.
-* Add nativelog-kafka-forwarder plugin.
-* Add memory-queue plugin.
-* Add mmap-queue plugin.
-* Add grpc-nativelog-receiver plugin.
-* Add http-nativelog-receiver plugin.
-* Add grpc-server plugin.
-* Add http-server plugin.
-* Add prometheus-server plugin.
+Update protoc-gen-go version to 1.26.0.
#### Bug Fixes
+Fix the data race in mmap queue.
+Fix channel blocking in sender module.
#### Issues and PR
-- All issues are
[here](https://github.com/apache/skywalking/milestone/64?closed=1)
-- All and pull requests are
[here](https://github.com/apache/skywalking-satellite/pulls?q=is%3Aopen+is%3Apr+milestone%3A0.1.0)
+- All issues are
[here](https://github.com/apache/skywalking/milestone/80?closed=1)
+- All and pull requests are
[here](https://github.com/apache/skywalking-satellite/pulls?q=is%3Aopen+is%3Apr+milestone%3A0.2.0)
diff --git a/docs/en/concepts-and-designs/mmap-queue.md
b/docs/en/concepts-and-designs/mmap-queue.md
index df0eaf0..d3b3582 100644
--- a/docs/en/concepts-and-designs/mmap-queue.md
+++ b/docs/en/concepts-and-designs/mmap-queue.md
@@ -41,12 +41,12 @@ goos: darwin
goarch: amd64
pkg: github.com/apache/skywalking-satellite/plugins/queue/mmap
BenchmarkEnqueue
-BenchmarkEnqueue/segmentSize:_128KB_maxInMemSegments:18_message:8KB_queueCapacity:10000
10000 106520 ns/op 9888 B/op
9 allocs/op
-BenchmarkEnqueue/segmentSize:_256KB_maxInMemSegments:10_message:8KB_queueCapacity:10000
18536 54331 ns/op 9839 B/op
9 allocs/op
-BenchmarkEnqueue/segmentSize:_512KB_maxInMemSegments:6_message:8KB_queueCapacity:10000
27859 43251 ns/op 9815 B/op
9 allocs/op
-BenchmarkEnqueue/segmentSize:_256KB_maxInMemSegments:20_message:8KB_queueCapacity:10000
23673 45910 ns/op 9839 B/op
9 allocs/op
-BenchmarkEnqueue/segmentSize:_128KB_maxInMemSegments:10_message:16KB_queueCapacity:10000
10000 131686 ns/op 18941 B/op 10
allocs/op
-BenchmarkEnqueue/segmentSize:_128KB_maxInMemSegments:10_message:8KB_queueCapacity:100000
23011 47101 ns/op 9887 B/op 9
allocs/op
+BenchmarkEnqueue/segmentSize:_128KB_maxInMemSegments:18_message:8KB_queueCapacity:10000
27585 43559 ns/op 9889 B/op
9 allocs/op
+BenchmarkEnqueue/segmentSize:_256KB_maxInMemSegments:10_message:8KB_queueCapacity:10000
39326 31773 ns/op 9840 B/op
9 allocs/op
+BenchmarkEnqueue/segmentSize:_512KB_maxInMemSegments:6_message:8KB_queueCapacity:10000
56770 22990 ns/op 9816 B/op
9 allocs/op
+BenchmarkEnqueue/segmentSize:_256KB_maxInMemSegments:20_message:8KB_queueCapacity:10000
43803 29778 ns/op 9840 B/op
9 allocs/op
+BenchmarkEnqueue/segmentSize:_128KB_maxInMemSegments:10_message:16KB_queueCapacity:10000
16870 80576 ns/op 18944 B/op 10
allocs/op
+BenchmarkEnqueue/segmentSize:_128KB_maxInMemSegments:10_message:8KB_queueCapacity:100000
36922 39085 ns/op 9889 B/op 9
allocs/op
PASS
```
### push and pop operation
@@ -55,11 +55,11 @@ goos: darwin
goarch: amd64
pkg: github.com/apache/skywalking-satellite/plugins/queue/mmap
BenchmarkEnqueueAndDequeue
-BenchmarkEnqueueAndDequeue/segmentSize:_128KB_maxInMemSegments:18_message:8KB_queueCapacity:10000
18895 53056 ns/op 28773 B/op 42
allocs/op
-BenchmarkEnqueueAndDequeue/segmentSize:_256KB_maxInMemSegments:10_message:8KB_queueCapacity:10000
24104 117128 ns/op 28725 B/op 42
allocs/op
-BenchmarkEnqueueAndDequeue/segmentSize:_512KB_maxInMemSegments:6_message:8KB_queueCapacity:10000
23733 71632 ns/op 28699 B/op 41
allocs/op
-BenchmarkEnqueueAndDequeue/segmentSize:_256KB_maxInMemSegments:20_message:8KB_queueCapacity:10000
26286 64377 ns/op 28725 B/op 42
allocs/op
-BenchmarkEnqueueAndDequeue/segmentSize:_128KB_maxInMemSegments:10_message:16KB_queueCapacity:10000
10000 118004 ns/op 54978 B/op 43
allocs/op
-BenchmarkEnqueueAndDequeue/segmentSize:_128KB_maxInMemSegments:10_message:8KB_queueCapacity:100000
16489 64400 ns/op 28772 B/op 42
allocs/op
+BenchmarkEnqueueAndDequeue/segmentSize:_128KB_maxInMemSegments:18_message:8KB_queueCapacity:10000
21030 60728 ns/op 28774 B/op 42
allocs/op
+BenchmarkEnqueueAndDequeue/segmentSize:_256KB_maxInMemSegments:10_message:8KB_queueCapacity:10000
30327 41274 ns/op 28726 B/op 42
allocs/op
+BenchmarkEnqueueAndDequeue/segmentSize:_512KB_maxInMemSegments:6_message:8KB_queueCapacity:10000
32738 37923 ns/op 28700 B/op 42
allocs/op
+BenchmarkEnqueueAndDequeue/segmentSize:_256KB_maxInMemSegments:20_message:8KB_queueCapacity:10000
28209 41169 ns/op 28726 B/op 42
allocs/op
+BenchmarkEnqueueAndDequeue/segmentSize:_128KB_maxInMemSegments:10_message:16KB_queueCapacity:10000
14677 89637 ns/op 54981 B/op 43
allocs/op
+BenchmarkEnqueueAndDequeue/segmentSize:_128KB_maxInMemSegments:10_message:8KB_queueCapacity:100000
22228 54963 ns/op 28774 B/op 42
allocs/op
PASS
```
diff --git a/go.mod b/go.mod
index 0c7e00c..c015a07 100644
--- a/go.mod
+++ b/go.mod
@@ -7,14 +7,14 @@ replace skywalking/network v1.0.0 =>
./protocol/gen-codes/skywalking/network
require (
github.com/Shopify/sarama v1.27.2
github.com/enriquebris/goconcurrentqueue v0.6.0
- github.com/golang/protobuf v1.4.3
- github.com/google/go-cmp v0.5.4
+ github.com/golang/protobuf v1.5.2 // indirect
+ github.com/google/go-cmp v0.5.5
github.com/grandecola/mmap v0.6.0
github.com/prometheus/client_golang v1.9.0
github.com/sirupsen/logrus v1.7.0
github.com/spf13/viper v1.7.1
github.com/urfave/cli/v2 v2.3.0
- google.golang.org/grpc v1.35.0
- google.golang.org/protobuf v1.25.0
+ google.golang.org/grpc v1.36.1
+ google.golang.org/protobuf v1.26.0
skywalking/network v1.0.0
)
diff --git a/go.sum b/go.sum
index 13be86f..5ec45ee 100644
--- a/go.sum
+++ b/go.sum
@@ -134,6 +134,9 @@ github.com/golang/protobuf v1.4.1/go.mod
h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QD
github.com/golang/protobuf v1.4.2/go.mod
h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
github.com/golang/protobuf v1.4.3
h1:JjCZWpVbqXDqFVmTfYWEVTMIYrL/NPdPSCHPJ0T/raM=
github.com/golang/protobuf v1.4.3/go.mod
h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
+github.com/golang/protobuf v1.5.0/go.mod
h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
+github.com/golang/protobuf v1.5.2
h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw=
+github.com/golang/protobuf v1.5.2/go.mod
h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db
h1:woRePGFeVFfLKN/pOkfl+p/TAqKOfFu+7KPlMVpok/w=
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod
h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
@@ -147,8 +150,8 @@ github.com/google/go-cmp v0.3.1/go.mod
h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw
github.com/google/go-cmp v0.4.0/go.mod
h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.0/go.mod
h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.2/go.mod
h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
-github.com/google/go-cmp v0.5.4 h1:L8R9j+yAqZuZjsqh/z+F1NCffTKKLShY6zXTItVIZ8M=
-github.com/google/go-cmp v0.5.4/go.mod
h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
+github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
+github.com/google/go-cmp v0.5.5/go.mod
h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/gofuzz v1.0.0/go.mod
h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/martian v2.1.0+incompatible
h1:/CP5g8u/VJHijgedC/Legn3BAbAaWPgecwXBIDzw5no=
github.com/google/martian v2.1.0+incompatible/go.mod
h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
@@ -589,8 +592,8 @@ google.golang.org/grpc v1.25.1/go.mod
h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQ
google.golang.org/grpc v1.26.0/go.mod
h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
google.golang.org/grpc v1.27.0 h1:rRYRFMVgRv6E0D70Skyfsr28tDXIuuPZyWGMPdMcnXg=
google.golang.org/grpc v1.27.0/go.mod
h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
-google.golang.org/grpc v1.35.0 h1:TwIQcH3es+MojMVojxxfQ3l3OF2KzlRxML2xZq0kRo8=
-google.golang.org/grpc v1.35.0/go.mod
h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU=
+google.golang.org/grpc v1.36.1 h1:cmUfbeGKnz9+2DD/UYsMQXeqbHZqZDs4eQwW0sFOpBY=
+google.golang.org/grpc v1.36.1/go.mod
h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod
h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod
h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod
h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
@@ -601,6 +604,9 @@ google.golang.org/protobuf v1.23.0/go.mod
h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2
google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod
h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.25.0
h1:Ejskq+SyPohKW+1uil0JJMtmHCgJPJ/qWTxr8qp+R4c=
google.golang.org/protobuf v1.25.0/go.mod
h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
+google.golang.org/protobuf v1.26.0-rc.1/go.mod
h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
+google.golang.org/protobuf v1.26.0
h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk=
+google.golang.org/protobuf v1.26.0/go.mod
h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod
h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod
h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127
h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
diff --git a/internal/satellite/module/sender/create.go
b/internal/satellite/module/sender/create.go
index 5da3cca..12906e2 100644
--- a/internal/satellite/module/sender/create.go
+++ b/internal/satellite/module/sender/create.go
@@ -36,8 +36,7 @@ func NewSender(cfg *api.SenderConfig, g gatherer.Gatherer)
api.Sender {
runningFallbacker:
fallbacker.GetFallbacker(cfg.FallbackerConfig),
runningClient:
sharing.Manager[cfg.ClientName].(client.Client),
gatherer: g,
- logicInput: nil,
- physicalInput: make(chan *event.OutputEventContext),
+ input: make(chan *event.OutputEventContext),
listener: make(chan client.ClientStatus),
flushChannel: make(chan *buffer.BatchBuffer, 1),
buffer: buffer.NewBatchBuffer(cfg.MaxBufferSize),
diff --git a/internal/satellite/module/sender/sender.go
b/internal/satellite/module/sender/sender.go
index 86eec6d..e75e24c 100644
--- a/internal/satellite/module/sender/sender.go
+++ b/internal/satellite/module/sender/sender.go
@@ -20,6 +20,7 @@ package sender
import (
"context"
"sync"
+ "sync/atomic"
"time"
"github.com/sirupsen/logrus"
@@ -51,11 +52,11 @@ type Sender struct {
gatherer gatherer.Gatherer
// self components
- logicInput chan *event.OutputEventContext // logic input channel
- physicalInput chan *event.OutputEventContext // physical input channel
- listener chan client.ClientStatus // client status listener
- flushChannel chan *buffer.BatchBuffer // forwarder flush channel
- buffer *buffer.BatchBuffer // cache the downstream
input data
+ input chan *event.OutputEventContext // physical input channel
+ listener chan client.ClientStatus // client status listener
+ flushChannel chan *buffer.BatchBuffer // forwarder flush channel
+ buffer *buffer.BatchBuffer // cache the downstream
input data
+ blocking int32 // the status of input
channel
// metrics
sendCounter *telemetry.Counter
@@ -65,7 +66,6 @@ type Sender struct {
func (s *Sender) Prepare() error {
log.Logger.WithField("pipe", s.config.PipeName).Info("sender module is
preparing...")
s.runningClient.RegisterListener(s.listener)
- s.logicInput = s.physicalInput
for _, runningForwarder := range s.runningForwarders {
err :=
runningForwarder.Prepare(s.runningClient.GetConnectedClient())
if err != nil {
@@ -80,64 +80,90 @@ func (s *Sender) Prepare() error {
func (s *Sender) Boot(ctx context.Context) {
log.Logger.WithField("pipe", s.config.PipeName).Info("sender module is
starting...")
var wg sync.WaitGroup
- wg.Add(2)
- // 1. keep fetching the downstream data when client connected, and put
it into BatchBuffer.
- // 2. When reaches the buffer limit or receives a timer flush signal,
and put BatchBuffer into flushChannel.
- go func() {
- defer wg.Done()
- childCtx, cancel := context.WithCancel(ctx)
- timeTicker := time.NewTicker(time.Duration(s.config.FlushTime)
* time.Millisecond)
- for {
- select {
- case status := <-s.listener:
- switch status {
- case client.Connected:
- log.Logger.WithField("pipe",
s.config.PipeName).Info("the client connection of the sender module is
connected")
- s.logicInput = s.physicalInput
- case client.Disconnect:
- log.Logger.WithField("pipe",
s.config.PipeName).Info("the client connection of the sender module is
disconnected")
- s.logicInput = nil
- }
- case <-timeTicker.C:
- if s.buffer.Len() > s.config.MinFlushEvents {
- s.flushChannel <- s.buffer
- s.buffer =
buffer.NewBatchBuffer(s.config.MaxBufferSize)
- }
- case e := <-s.logicInput:
- s.buffer.Add(e)
- if s.buffer.Len() == s.config.MaxBufferSize {
- s.flushChannel <- s.buffer
- s.buffer =
buffer.NewBatchBuffer(s.config.MaxBufferSize)
- }
- case <-childCtx.Done():
- cancel()
- s.logicInput = nil
- return
+ wg.Add(3)
+ go s.store(ctx, &wg)
+ go s.listen(ctx, &wg)
+ go s.flush(ctx, &wg)
+ wg.Wait()
+}
+
+// store data.
+// 1. keep fetching the downstream data when client connected, and put it into
BatchBuffer.
+// 2. When reaches the buffer limit or receives a timer flush signal, and put
BatchBuffer into flushChannel.
+func (s *Sender) store(ctx context.Context, wg *sync.WaitGroup) {
+ defer wg.Done()
+ defer log.Logger.WithField("pipe", s.config.PipeName).Infof("store
routine closed")
+ childCtx, _ := context.WithCancel(ctx) // nolint
+ timeTicker := time.NewTicker(time.Duration(s.config.FlushTime) *
time.Millisecond)
+ for {
+ // blocking output when disconnecting.
+ if atomic.LoadInt32(&s.blocking) == 1 {
+ time.Sleep(100 * time.Millisecond)
+ continue
+ }
+ select {
+ case <-childCtx.Done():
+ return
+ case <-timeTicker.C:
+ if s.buffer.Len() > s.config.MinFlushEvents {
+ s.flushChannel <- s.buffer
+ s.buffer =
buffer.NewBatchBuffer(s.config.MaxBufferSize)
+ }
+ case e := <-s.input:
+ if e == nil {
+ continue
+ }
+ s.buffer.Add(e)
+ if s.buffer.Len() == s.config.MaxBufferSize {
+ s.flushChannel <- s.buffer
+ s.buffer =
buffer.NewBatchBuffer(s.config.MaxBufferSize)
}
}
- }()
- // Keep fetching BatchBuffer to forward.
- go func() {
- defer wg.Done()
- childCtx, cancel := context.WithCancel(ctx)
- for {
- select {
- case b := <-s.flushChannel:
- s.consume(b)
- case <-childCtx.Done():
- cancel()
- s.Shutdown()
- return
+ }
+}
+
+// Listen the client status.
+func (s *Sender) listen(ctx context.Context, wg *sync.WaitGroup) {
+ defer wg.Done()
+ defer log.Logger.WithField("pipe", s.config.PipeName).Infof("listen
routine closed")
+ childCtx, _ := context.WithCancel(ctx) // nolint
+ for {
+ select {
+ case <-childCtx.Done():
+ return
+ case status := <-s.listener:
+ switch status {
+ case client.Connected:
+ log.Logger.WithField("pipe",
s.config.PipeName).Info("the client connection of the sender module connected")
+ atomic.StoreInt32(&s.blocking, 0)
+ case client.Disconnect:
+ log.Logger.WithField("pipe",
s.config.PipeName).Info("the client connection of the sender module
disconnected")
+ atomic.StoreInt32(&s.blocking, 1)
}
}
- }()
- wg.Wait()
+ }
+}
+
+// Keep fetching BatchBuffer to forward.
+func (s *Sender) flush(ctx context.Context, wg *sync.WaitGroup) {
+ defer wg.Done()
+ defer log.Logger.WithField("pipe", s.config.PipeName).Infof("flush
routine closed")
+ childCtx, _ := context.WithCancel(ctx) // nolint
+ for {
+ select {
+ case <-childCtx.Done():
+ s.Shutdown()
+ return
+ case b := <-s.flushChannel:
+ s.consume(b)
+ }
+ }
}
// Shutdown closes the channels and tries to force forward the events in the
buffer.
func (s *Sender) Shutdown() {
log.Logger.WithField("pipe", s.config.PipeName).Info("sender module is
closing")
- close(s.physicalInput)
+ close(s.input)
ticker := time.NewTicker(module.ShutdownHookTime)
for {
select {
@@ -187,5 +213,5 @@ func (s *Sender) consume(batch *buffer.BatchBuffer) {
}
func (s *Sender) InputDataChannel() chan<- *event.OutputEventContext {
- return s.logicInput
+ return s.input
}
diff --git a/plugins/queue/mmap/meta/meta.go b/plugins/queue/mmap/meta/meta.go
index 11f75fa..36a5faa 100644
--- a/plugins/queue/mmap/meta/meta.go
+++ b/plugins/queue/mmap/meta/meta.go
@@ -22,6 +22,7 @@ package meta
import (
"fmt"
"path/filepath"
+ "sync"
"syscall"
"github.com/grandecola/mmap"
@@ -60,6 +61,7 @@ type Metadata struct {
name string
size int
capacity int
+ lock sync.RWMutex
}
// NewMetaData read or create a Metadata with supported metaVersion
@@ -92,76 +94,104 @@ func NewMetaData(metaDir string, capacity int) (*Metadata,
error) {
// GetVersion returns the meta version.
func (m *Metadata) GetVersion() int {
+ m.lock.RLock()
+ defer m.lock.RUnlock()
return int(m.metaFile.ReadUint64At(versionPos))
}
// PutVersion put the version into the memory mapped file.
func (m *Metadata) PutVersion(version int64) {
+ m.lock.Lock()
+ defer m.lock.Unlock()
m.metaFile.WriteUint64At(uint64(version), versionPos)
}
// GetWritingOffset returns the writing offset, which contains the segment ID
and the offset of the segment.
func (m *Metadata) GetWritingOffset() (segmentID, offset int64) {
+ m.lock.RLock()
+ defer m.lock.RUnlock()
return int64(m.metaFile.ReadUint64At(widPos)),
int64(m.metaFile.ReadUint64At(woffsetPos))
}
// PutWritingOffset put the segment ID and the offset of the segment into the
writing offset.
func (m *Metadata) PutWritingOffset(segmentID, offset int64) {
+ m.lock.Lock()
+ defer m.lock.Unlock()
m.metaFile.WriteUint64At(uint64(segmentID), widPos)
m.metaFile.WriteUint64At(uint64(offset), woffsetPos)
}
// GetWatermarkOffset returns the watermark offset, which contains the segment
ID and the offset of the segment.
func (m *Metadata) GetWatermarkOffset() (segmentID, offset int64) {
+ m.lock.RLock()
+ defer m.lock.RUnlock()
return int64(m.metaFile.ReadUint64At(wmidPos)),
int64(m.metaFile.ReadUint64At(wmoffsetPos))
}
// PutWatermarkOffset put the segment ID and the offset of the segment into
the watermark offset.
func (m *Metadata) PutWatermarkOffset(segmentID, offset int64) {
+ m.lock.Lock()
+ defer m.lock.Unlock()
m.metaFile.WriteUint64At(uint64(segmentID), wmidPos)
m.metaFile.WriteUint64At(uint64(offset), wmoffsetPos)
}
// GetCommittedOffset returns the committed offset, which contains the segment
ID and the offset of the segment.
func (m *Metadata) GetCommittedOffset() (segmentID, offset int64) {
+ m.lock.RLock()
+ defer m.lock.RUnlock()
return int64(m.metaFile.ReadUint64At(cidPos)),
int64(m.metaFile.ReadUint64At(coffsetPos))
}
// PutCommittedOffset put the segment ID and the offset of the segment into
the committed offset.
func (m *Metadata) PutCommittedOffset(segmentID, offset int64) {
+ m.lock.Lock()
+ defer m.lock.Unlock()
m.metaFile.WriteUint64At(uint64(segmentID), cidPos)
m.metaFile.WriteUint64At(uint64(offset), coffsetPos)
}
// GetReadingOffset returns the reading offset, which contains the segment ID
and the offset of the segment.
func (m *Metadata) GetReadingOffset() (segmentID, offset int64) {
+ m.lock.RLock()
+ defer m.lock.RUnlock()
return int64(m.metaFile.ReadUint64At(ridPos)),
int64(m.metaFile.ReadUint64At(roffsetPos))
}
// PutReadingOffset put the segment ID and the offset of the segment into the
reading offset.
func (m *Metadata) PutReadingOffset(segmentID, offset int64) {
+ m.lock.Lock()
+ defer m.lock.Unlock()
m.metaFile.WriteUint64At(uint64(segmentID), ridPos)
m.metaFile.WriteUint64At(uint64(offset), roffsetPos)
}
// GetCapacity returns the capacity of the queue.
func (m *Metadata) GetCapacity() int {
+ m.lock.RLock()
+ defer m.lock.RUnlock()
return int(m.metaFile.ReadUint64At(capacityPos))
}
// PutCapacity put the capacity into the memory mapped file.
func (m *Metadata) PutCapacity(version int64) {
+ m.lock.Lock()
+ defer m.lock.Unlock()
m.metaFile.WriteUint64At(uint64(version), capacityPos)
}
// Flush the memory mapped file to the disk.
func (m *Metadata) Flush() error {
+ m.lock.Lock()
+ defer m.lock.Unlock()
return m.metaFile.Flush(syscall.MS_SYNC)
}
// Close do Flush operation and unmap the memory mapped file.
func (m *Metadata) Close() error {
- if err := m.Flush(); err != nil {
+ m.lock.Lock()
+ defer m.lock.Unlock()
+ if err := m.metaFile.Flush(syscall.MS_SYNC); err != nil {
return err
}
return m.metaFile.Unmap()
diff --git a/plugins/queue/mmap/queue.go b/plugins/queue/mmap/queue.go
index 44d20ee..539c59f 100644
--- a/plugins/queue/mmap/queue.go
+++ b/plugins/queue/mmap/queue.go
@@ -61,7 +61,6 @@ type Queue struct {
MaxEventSize int `mapstructure:"max_event_size"` //
The max size of the input event.
// running components
- lock sync.Mutex
queueName string // The queue name.
meta *meta.Metadata // The metadata file.
segments []*mmap.File // The data files.
@@ -72,6 +71,7 @@ type Queue struct {
sufficientMemChannel chan struct{} // Notify when memory is
sufficient
markReadChannel chan int64 // Transfer the read segmentID to
do ummap operation.
ready bool // The status of the queue.
+ locker []int32 // locker
// control components
ctx context.Context // Parent ctx
@@ -145,6 +145,7 @@ func (q *Queue) Initialize() error {
q.ctx, q.cancel = context.WithCancel(context.Background())
// async supported processes.
q.showDownWg.Add(2)
+ q.locker = make([]int32, q.QueueCapacitySegments)
go q.segmentSwapper()
go q.flush()
q.ready = true
@@ -187,8 +188,7 @@ func (q *Queue) Dequeue() (*api.SequenceEvent, error) {
}
func (q *Queue) Close() error {
- q.lock.Lock()
- defer q.lock.Unlock()
+ q.ready = false
q.cancel()
q.showDownWg.Wait()
for i, segment := range q.segments {
@@ -204,7 +204,6 @@ func (q *Queue) Close() error {
if err := q.meta.Close(); err != nil {
log.Logger.Errorf("cannot unmap the metadata: %v", err)
}
- q.ready = false
return nil
}
@@ -226,8 +225,7 @@ func (q *Queue) Ack(lastOffset event.Offset) {
// flush control the flush operation by timer or counter.
func (q *Queue) flush() {
defer q.showDownWg.Done()
- ctx, cancel := context.WithCancel(q.ctx)
- defer cancel()
+ ctx, _ := context.WithCancel(q.ctx) // nolint
for {
timer := time.NewTimer(time.Duration(q.FlushPeriod) *
time.Millisecond)
select {
@@ -245,13 +243,16 @@ func (q *Queue) flush() {
// doFlush flush the segment and meta files to the disk.
func (q *Queue) doFlush() {
- for _, segment := range q.segments {
- if segment == nil {
+ for i := range q.segments {
+ q.lockByIndex(i)
+ if q.segments[i] == nil {
+ q.unlockByIndex(i)
continue
}
- if err := segment.Flush(syscall.MS_SYNC); err != nil {
+ if err := q.segments[i].Flush(syscall.MS_SYNC); err != nil {
log.Logger.Errorf("cannot flush segment file: %v", err)
}
+ q.unlockByIndex(i)
}
wid, woffset := q.meta.GetWritingOffset()
q.meta.PutWatermarkOffset(wid, woffset)
diff --git a/plugins/queue/mmap/queue_lock.go b/plugins/queue/mmap/queue_lock.go
new file mode 100644
index 0000000..a58580e
--- /dev/null
+++ b/plugins/queue/mmap/queue_lock.go
@@ -0,0 +1,42 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// +build !windows
+
+package mmap
+
+import "sync/atomic"
+
+func (q *Queue) lock(segmentID int64) {
+ index := q.GetIndex(segmentID)
+ q.lockByIndex(index)
+}
+
+func (q *Queue) unlock(segmentID int64) {
+ index := q.GetIndex(segmentID)
+ q.unlockByIndex(index)
+}
+
+func (q *Queue) lockByIndex(index int) {
+ for !atomic.CompareAndSwapInt32(&q.locker[index], 0, 1) {
+ }
+}
+
+func (q *Queue) unlockByIndex(index int) {
+ for !atomic.CompareAndSwapInt32(&q.locker[index], 1, 0) {
+ }
+}
diff --git a/plugins/queue/mmap/queue_operation.go
b/plugins/queue/mmap/queue_operation.go
index 9ee6e74..d00df14 100644
--- a/plugins/queue/mmap/queue_operation.go
+++ b/plugins/queue/mmap/queue_operation.go
@@ -43,8 +43,6 @@ const uInt64Size = 8
// enqueue writes the data into the file system. It first writes the length of
the data,
// then the data itself. It means the whole data may not exist in the one
segments.
func (q *Queue) enqueue(bytes []byte) error {
- q.lock.Lock()
- defer q.lock.Unlock()
if q.isFull() {
return api.ErrFull
}
@@ -69,8 +67,6 @@ func (q *Queue) enqueue(bytes []byte) error {
// dequeue reads the data from the file system. It first reads the length of
the data,
// then the data itself. It means the whole data may not exist in the one
segments.
func (q *Queue) dequeue() (data []byte, rid, roffset int64, err error) {
- q.lock.Lock()
- defer q.lock.Unlock()
if q.isEmpty() {
return nil, 0, 0, api.ErrEmpty
}
@@ -95,11 +91,13 @@ func (q *Queue) readBytes(id, offset int64, length int)
(data []byte, newID, new
counter := 0
res := make([]byte, length)
for {
+ q.lock(id)
segment, err := q.GetSegment(id)
if err != nil {
return nil, 0, 0, err
}
readBytes, err := segment.ReadAt(res[counter:], offset)
+ q.unlock(id)
if err != nil {
return nil, 0, 0, err
}
@@ -120,11 +118,13 @@ func (q *Queue) readLength(id, offset int64) (newID,
newOffset int64, length int
if offset+uInt64Size > int64(q.SegmentSize) {
id, offset = id+1, 0
}
+ q.lock(id)
segment, err := q.GetSegment(id)
if err != nil {
return 0, 0, 0, err
}
num := segment.ReadUint64At(offset)
+ q.unlock(id)
offset += uInt64Size
if offset == int64(q.SegmentSize) {
id, offset = id+1, 0
@@ -137,11 +137,13 @@ func (q *Queue) writeLength(length int, id, offset int64)
(newID, newOffset int6
if offset+uInt64Size > int64(q.SegmentSize) {
id, offset = id+1, 0
}
+ q.lock(id)
segment, err := q.GetSegment(id)
if err != nil {
return 0, 0, err
}
segment.WriteUint64At(uint64(length), offset)
+ q.unlock(id)
offset += uInt64Size
if offset == int64(q.SegmentSize) {
id, offset = id+1, 0
@@ -153,13 +155,14 @@ func (q *Queue) writeLength(length int, id, offset int64)
(newID, newOffset int6
func (q *Queue) writeBytes(bytes []byte, id, offset int64) (newID, newOffset
int64, err error) {
counter := 0
length := len(bytes)
-
for {
+ q.lock(id)
segment, err := q.GetSegment(id)
if err != nil {
return 0, 0, err
}
writtenBytes, err := segment.WriteAt(bytes[counter:], offset)
+ q.unlock(id)
if err != nil {
return 0, 0, err
}
diff --git a/plugins/queue/mmap/segment_operation.go
b/plugins/queue/mmap/segment_operation.go
index 42d54b2..aa2f421 100644
--- a/plugins/queue/mmap/segment_operation.go
+++ b/plugins/queue/mmap/segment_operation.go
@@ -35,7 +35,7 @@ import (
// GetSegment returns a memory mapped file at the segmentID position.
func (q *Queue) GetSegment(segmentID int64) (*mmap.File, error) {
- if q.mmapCount >= q.MaxInMemSegments {
+ if atomic.LoadInt32(&q.mmapCount) >= q.MaxInMemSegments {
q.insufficientMemChannel <- struct{}{}
<-q.sufficientMemChannel
}
@@ -85,14 +85,15 @@ func (q *Queue) unmapSegment(segmentID int64) error {
// segmentSwapper run with a go routine to ensure the memory cost.
func (q *Queue) segmentSwapper() {
defer q.showDownWg.Done()
- ctx, cancel := context.WithCancel(q.ctx)
- defer cancel()
+ ctx, _ := context.WithCancel(q.ctx) // nolint
for {
select {
case id := <-q.markReadChannel:
+ q.lock(id)
if q.unmapSegment(id) != nil {
log.Logger.Errorf("cannot unmap the markread
segment: %d", id)
}
+ q.unlock(id)
case <-q.insufficientMemChannel:
if q.mmapCount >= q.MaxInMemSegments {
if q.doSwap() != nil {
diff --git a/protocol/gen-codes/satellite/protocol/Event.pb.go
b/protocol/gen-codes/satellite/protocol/Event.pb.go
index 89d4821..225bb2e 100644
--- a/protocol/gen-codes/satellite/protocol/Event.pb.go
+++ b/protocol/gen-codes/satellite/protocol/Event.pb.go
@@ -17,14 +17,13 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
-// protoc-gen-go v1.25.0
+// protoc-gen-go v1.26.0
// protoc v3.14.0
// source: event/Event.proto
package protocol
import (
- proto "github.com/golang/protobuf/proto"
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
@@ -43,10 +42,6 @@ const (
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
-// This is a compile-time assertion that a sufficiently up-to-date version
-// of the legacy proto package is being used.
-const _ = proto.ProtoPackageIsVersion4
-
// EventType declares the supported transfer data type.
type EventType int32
diff --git
a/protocol/gen-codes/skywalking/network/agent/configuration/v3/ConfigurationDiscoveryService.pb.go
b/protocol/gen-codes/skywalking/network/agent/configuration/v3/ConfigurationDiscoveryService.pb.go
index 023a8b1..089c26d 100644
---
a/protocol/gen-codes/skywalking/network/agent/configuration/v3/ConfigurationDiscoveryService.pb.go
+++
b/protocol/gen-codes/skywalking/network/agent/configuration/v3/ConfigurationDiscoveryService.pb.go
@@ -17,14 +17,13 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
-// protoc-gen-go v1.25.0
+// protoc-gen-go v1.26.0
// protoc v3.14.0
// source: language-agent/ConfigurationDiscoveryService.proto
package v3
import (
- proto "github.com/golang/protobuf/proto"
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
@@ -39,10 +38,6 @@ const (
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
-// This is a compile-time assertion that a sufficiently up-to-date version
-// of the legacy proto package is being used.
-const _ = proto.ProtoPackageIsVersion4
-
type ConfigurationSyncRequest struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
diff --git a/protocol/gen-codes/skywalking/network/common/v3/Common.pb.go
b/protocol/gen-codes/skywalking/network/common/v3/Common.pb.go
index a3c545d..2f79ebc 100644
--- a/protocol/gen-codes/skywalking/network/common/v3/Common.pb.go
+++ b/protocol/gen-codes/skywalking/network/common/v3/Common.pb.go
@@ -17,14 +17,13 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
-// protoc-gen-go v1.25.0
+// protoc-gen-go v1.26.0
// protoc v3.14.0
// source: common/Common.proto
package v3
import (
- proto "github.com/golang/protobuf/proto"
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
@@ -38,10 +37,6 @@ const (
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
-// This is a compile-time assertion that a sufficiently up-to-date version
-// of the legacy proto package is being used.
-const _ = proto.ProtoPackageIsVersion4
-
// In most cases, detect point should be `server` or `client`.
// Even in service mesh, this means `server`/`client` side sidecar
// `proxy` is reserved only.
diff --git a/protocol/gen-codes/skywalking/network/go.mod
b/protocol/gen-codes/skywalking/network/go.mod
index 0797ac0..ce3c323 100644
--- a/protocol/gen-codes/skywalking/network/go.mod
+++ b/protocol/gen-codes/skywalking/network/go.mod
@@ -3,7 +3,6 @@ module skywalking/network
go 1.15
require (
- github.com/golang/protobuf v1.4.3
- google.golang.org/grpc v1.35.0
- google.golang.org/protobuf v1.25.0
+ google.golang.org/grpc v1.36.1
+ google.golang.org/protobuf v1.26.0
)
diff --git a/protocol/gen-codes/skywalking/network/go.sum
b/protocol/gen-codes/skywalking/network/go.sum
index 8f0da6e..6ee6f61 100644
--- a/protocol/gen-codes/skywalking/network/go.sum
+++ b/protocol/gen-codes/skywalking/network/go.sum
@@ -19,14 +19,15 @@ github.com/golang/protobuf
v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:W
github.com/golang/protobuf v1.4.0/go.mod
h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0=
github.com/golang/protobuf v1.4.1/go.mod
h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8=
github.com/golang/protobuf v1.4.2/go.mod
h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
-github.com/golang/protobuf v1.4.3
h1:JjCZWpVbqXDqFVmTfYWEVTMIYrL/NPdPSCHPJ0T/raM=
-github.com/golang/protobuf v1.4.3/go.mod
h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
+github.com/golang/protobuf v1.5.0
h1:LUVKkCeviFUMKqHa4tXIIij/lbhnMbP7Fn5wKdKkRh4=
+github.com/golang/protobuf v1.5.0/go.mod
h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/google/go-cmp v0.2.0/go.mod
h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
github.com/google/go-cmp v0.3.0/go.mod
h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.3.1/go.mod
h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.4.0/go.mod
h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
-github.com/google/go-cmp v0.5.0 h1:/QaMHBdZ26BB3SSst0Iwl10Epc+xhTquomWX0oZEB6w=
github.com/google/go-cmp v0.5.0/go.mod
h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
+github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
+github.com/google/go-cmp v0.5.5/go.mod
h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/uuid v1.1.2/go.mod
h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/pmezard/go-difflib v1.0.0/go.mod
h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod
h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
@@ -67,8 +68,8 @@ google.golang.org/grpc v1.19.0/go.mod
h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZi
google.golang.org/grpc v1.23.0/go.mod
h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
google.golang.org/grpc v1.25.1/go.mod
h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY=
google.golang.org/grpc v1.27.0/go.mod
h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
-google.golang.org/grpc v1.35.0 h1:TwIQcH3es+MojMVojxxfQ3l3OF2KzlRxML2xZq0kRo8=
-google.golang.org/grpc v1.35.0/go.mod
h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU=
+google.golang.org/grpc v1.36.1 h1:cmUfbeGKnz9+2DD/UYsMQXeqbHZqZDs4eQwW0sFOpBY=
+google.golang.org/grpc v1.36.1/go.mod
h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod
h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod
h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod
h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
@@ -77,8 +78,10 @@ google.golang.org/protobuf v1.21.0/go.mod
h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzi
google.golang.org/protobuf v1.22.0/go.mod
h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.23.0/go.mod
h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod
h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
-google.golang.org/protobuf v1.25.0
h1:Ejskq+SyPohKW+1uil0JJMtmHCgJPJ/qWTxr8qp+R4c=
google.golang.org/protobuf v1.25.0/go.mod
h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
+google.golang.org/protobuf v1.26.0-rc.1/go.mod
h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
+google.golang.org/protobuf v1.26.0
h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk=
+google.golang.org/protobuf v1.26.0/go.mod
h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod
h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod
h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
diff --git
a/protocol/gen-codes/skywalking/network/language/agent/v3/BrowserPerf.pb.go
b/protocol/gen-codes/skywalking/network/language/agent/v3/BrowserPerf.pb.go
index b460ab9..333a7c4 100644
--- a/protocol/gen-codes/skywalking/network/language/agent/v3/BrowserPerf.pb.go
+++ b/protocol/gen-codes/skywalking/network/language/agent/v3/BrowserPerf.pb.go
@@ -17,14 +17,13 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
-// protoc-gen-go v1.25.0
+// protoc-gen-go v1.26.0
// protoc v3.14.0
// source: browser/BrowserPerf.proto
package v3
import (
- proto "github.com/golang/protobuf/proto"
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
@@ -39,10 +38,6 @@ const (
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
-// This is a compile-time assertion that a sufficiently up-to-date version
-// of the legacy proto package is being used.
-const _ = proto.ProtoPackageIsVersion4
-
type ErrorCategory int32
const (
diff --git
a/protocol/gen-codes/skywalking/network/language/agent/v3/CLRMetric.pb.go
b/protocol/gen-codes/skywalking/network/language/agent/v3/CLRMetric.pb.go
index 53ce96d..22139cf 100644
--- a/protocol/gen-codes/skywalking/network/language/agent/v3/CLRMetric.pb.go
+++ b/protocol/gen-codes/skywalking/network/language/agent/v3/CLRMetric.pb.go
@@ -17,14 +17,13 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
-// protoc-gen-go v1.25.0
+// protoc-gen-go v1.26.0
// protoc v3.14.0
// source: language-agent/CLRMetric.proto
package v3
import (
- proto "github.com/golang/protobuf/proto"
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
@@ -39,10 +38,6 @@ const (
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
-// This is a compile-time assertion that a sufficiently up-to-date version
-// of the legacy proto package is being used.
-const _ = proto.ProtoPackageIsVersion4
-
type CLRMetricCollection struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
diff --git
a/protocol/gen-codes/skywalking/network/language/agent/v3/JVMMetric.pb.go
b/protocol/gen-codes/skywalking/network/language/agent/v3/JVMMetric.pb.go
index e322cf4..f3e5619 100644
--- a/protocol/gen-codes/skywalking/network/language/agent/v3/JVMMetric.pb.go
+++ b/protocol/gen-codes/skywalking/network/language/agent/v3/JVMMetric.pb.go
@@ -17,14 +17,13 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
-// protoc-gen-go v1.25.0
+// protoc-gen-go v1.26.0
// protoc v3.14.0
// source: language-agent/JVMMetric.proto
package v3
import (
- proto "github.com/golang/protobuf/proto"
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
@@ -39,10 +38,6 @@ const (
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
-// This is a compile-time assertion that a sufficiently up-to-date version
-// of the legacy proto package is being used.
-const _ = proto.ProtoPackageIsVersion4
-
type PoolType int32
const (
diff --git
a/protocol/gen-codes/skywalking/network/language/agent/v3/Meter.pb.go
b/protocol/gen-codes/skywalking/network/language/agent/v3/Meter.pb.go
index fa944d4..a916cc1 100644
--- a/protocol/gen-codes/skywalking/network/language/agent/v3/Meter.pb.go
+++ b/protocol/gen-codes/skywalking/network/language/agent/v3/Meter.pb.go
@@ -17,14 +17,13 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
-// protoc-gen-go v1.25.0
+// protoc-gen-go v1.26.0
// protoc v3.14.0
// source: language-agent/Meter.proto
package v3
import (
- proto "github.com/golang/protobuf/proto"
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
@@ -39,10 +38,6 @@ const (
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
-// This is a compile-time assertion that a sufficiently up-to-date version
-// of the legacy proto package is being used.
-const _ = proto.ProtoPackageIsVersion4
-
// Label of the meter
type Label struct {
state protoimpl.MessageState
diff --git
a/protocol/gen-codes/skywalking/network/language/agent/v3/Tracing.pb.go
b/protocol/gen-codes/skywalking/network/language/agent/v3/Tracing.pb.go
index e502b84..be0c6f3 100644
--- a/protocol/gen-codes/skywalking/network/language/agent/v3/Tracing.pb.go
+++ b/protocol/gen-codes/skywalking/network/language/agent/v3/Tracing.pb.go
@@ -17,14 +17,13 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
-// protoc-gen-go v1.25.0
+// protoc-gen-go v1.26.0
// protoc v3.14.0
// source: language-agent/Tracing.proto
package v3
import (
- proto "github.com/golang/protobuf/proto"
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
@@ -39,10 +38,6 @@ const (
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
-// This is a compile-time assertion that a sufficiently up-to-date version
-// of the legacy proto package is being used.
-const _ = proto.ProtoPackageIsVersion4
-
// Map to the type of span
type SpanType int32
diff --git
a/protocol/gen-codes/skywalking/network/language/profile/v3/Profile.pb.go
b/protocol/gen-codes/skywalking/network/language/profile/v3/Profile.pb.go
index 9f84f14..9798e91 100644
--- a/protocol/gen-codes/skywalking/network/language/profile/v3/Profile.pb.go
+++ b/protocol/gen-codes/skywalking/network/language/profile/v3/Profile.pb.go
@@ -17,14 +17,13 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
-// protoc-gen-go v1.25.0
+// protoc-gen-go v1.26.0
// protoc v3.14.0
// source: profile/Profile.proto
package v3
import (
- proto "github.com/golang/protobuf/proto"
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
@@ -39,10 +38,6 @@ const (
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
-// This is a compile-time assertion that a sufficiently up-to-date version
-// of the legacy proto package is being used.
-const _ = proto.ProtoPackageIsVersion4
-
type ProfileTaskCommandQuery struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
diff --git a/protocol/gen-codes/skywalking/network/logging/v3/Logging.pb.go
b/protocol/gen-codes/skywalking/network/logging/v3/Logging.pb.go
index 557d33a..ffcda67 100644
--- a/protocol/gen-codes/skywalking/network/logging/v3/Logging.pb.go
+++ b/protocol/gen-codes/skywalking/network/logging/v3/Logging.pb.go
@@ -17,14 +17,13 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
-// protoc-gen-go v1.25.0
+// protoc-gen-go v1.26.0
// protoc v3.14.0
// source: logging/Logging.proto
package v3
import (
- proto "github.com/golang/protobuf/proto"
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
@@ -39,10 +38,6 @@ const (
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
-// This is a compile-time assertion that a sufficiently up-to-date version
-// of the legacy proto package is being used.
-const _ = proto.ProtoPackageIsVersion4
-
// Log data is collected through file scratcher of agent.
// Natively, Satellite provides various ways to collect logs.
type LogData struct {
diff --git
a/protocol/gen-codes/skywalking/network/management/v3/Management.pb.go
b/protocol/gen-codes/skywalking/network/management/v3/Management.pb.go
index daf8426..7feb466 100644
--- a/protocol/gen-codes/skywalking/network/management/v3/Management.pb.go
+++ b/protocol/gen-codes/skywalking/network/management/v3/Management.pb.go
@@ -17,14 +17,13 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
-// protoc-gen-go v1.25.0
+// protoc-gen-go v1.26.0
// protoc v3.14.0
// source: management/Management.proto
package v3
import (
- proto "github.com/golang/protobuf/proto"
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
@@ -39,10 +38,6 @@ const (
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
-// This is a compile-time assertion that a sufficiently up-to-date version
-// of the legacy proto package is being used.
-const _ = proto.ProtoPackageIsVersion4
-
type InstanceProperties struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
diff --git
a/protocol/gen-codes/skywalking/network/servicemesh/v3/service-mesh.pb.go
b/protocol/gen-codes/skywalking/network/servicemesh/v3/service-mesh.pb.go
index 37065d3..77d6d65 100644
--- a/protocol/gen-codes/skywalking/network/servicemesh/v3/service-mesh.pb.go
+++ b/protocol/gen-codes/skywalking/network/servicemesh/v3/service-mesh.pb.go
@@ -17,14 +17,13 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
-// protoc-gen-go v1.25.0
+// protoc-gen-go v1.26.0
// protoc v3.14.0
// source: service-mesh-probe/service-mesh.proto
package v3
import (
- proto "github.com/golang/protobuf/proto"
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
@@ -39,10 +38,6 @@ const (
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
-// This is a compile-time assertion that a sufficiently up-to-date version
-// of the legacy proto package is being used.
-const _ = proto.ProtoPackageIsVersion4
-
type Protocol int32
const (
diff --git a/tools/protocol_gen.sh b/tools/protocol_gen.sh
index 367a8ad..d0c15fa 100755
--- a/tools/protocol_gen.sh
+++ b/tools/protocol_gen.sh
@@ -28,8 +28,10 @@ export GEN_CODE_PATH=protocol/gen-codes
export COLLECT_PROTOCOL_MODULE=skywalking/network
-go get -u google.golang.org/protobuf/cmd/[email protected]
-go get -u google.golang.org/grpc/cmd/[email protected]
+export GO111MODULE=on
+go get google.golang.org/protobuf/cmd/[email protected]
+go get google.golang.org/grpc/cmd/[email protected]
+export PATH="$PATH:$(go env GOPATH)/bin"
# generate codes by merged proto files
rm -rf $GEN_CODE_PATH && rm -rf $PROTO_HOME