This is an automated email from the ASF dual-hosted git repository.

liujiapeng pushed a commit to branch fix-race-bug
in repository https://gitbox.apache.org/repos/asf/skywalking-satellite.git

commit f353fac01c867f9198f289c56132fb4d8ae8129a
Author: Evan <[email protected]>
AuthorDate: Wed Apr 7 15:30:48 2021 +0800

    fix some bugs in Satellite
---
 CHANGES.md                                         |  22 +---
 README.md                                          |   4 +-
 docs/en/concepts-and-designs/mmap-queue.md         |  24 ++--
 docs/en/guides/contribution/How-to-release.md      |   4 +-
 docs/en/guides/contribution/How-to-write-plugin.md |   2 +-
 go.mod                                             |   8 +-
 go.sum                                             |  14 ++-
 internal/satellite/module/sender/create.go         |   3 +-
 internal/satellite/module/sender/sender.go         | 138 ++++++++++++---------
 plugins/queue/mmap/meta/meta.go                    |  32 ++++-
 plugins/queue/mmap/queue.go                        |  19 +--
 plugins/queue/mmap/queue_lock.go                   |  54 ++++++++
 plugins/queue/mmap/queue_operation.go              |  13 +-
 plugins/queue/mmap/segment_operation.go            |   7 +-
 protocol/gen-codes/skywalking/network/go.mod       |   6 +-
 protocol/gen-codes/skywalking/network/go.sum       |  16 ++-
 16 files changed, 240 insertions(+), 126 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 1b4032e..be5f0f5 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -2,25 +2,15 @@ Changes by Version
 ==================
 Release Notes.
 
-0.1.0
+0.2.0
 ------------------
 #### Features
-* Build the Satellite core structure.
-* Add prometheus self telemetry.
-* Add kafka client plugin.
-* Add none-fallbacker plugin.
-* Add timer-fallbacker plugin.
-* Add nativelog-kafka-forwarder plugin.
-* Add memory-queue plugin.
-* Add mmap-queue plugin.
-* Add grpc-nativelog-receiver plugin.
-* Add http-nativelog-receiver plugin.
-* Add grpc-server plugin.
-* Add http-server plugin.
-* Add prometheus-server plugin.
+
 
 #### Bug Fixes
+Fix the data race in mmap queue.
+Fix channel blocking in sender module. 
 
 #### Issues and PR
-- All issues  are 
[here](https://github.com/apache/skywalking/milestone/64?closed=1)  
-- All and pull requests are 
[here](https://github.com/apache/skywalking-satellite/pulls?q=is%3Aopen+is%3Apr+milestone%3A0.1.0)
+- All issues  are 
[here](https://github.com/apache/skywalking/milestone/80?closed=1)  
+- All and pull requests are 
[here](https://github.com/apache/skywalking-satellite/pulls?q=is%3Aopen+is%3Apr+milestone%3A0.2.0)
diff --git a/README.md b/README.md
index b1d97b7..6915593 100644
--- a/README.md
+++ b/README.md
@@ -21,7 +21,7 @@ NOTICE, SkyWalking Satellite uses [v3 
protocols](https://github.com/apache/skywa
 Please read [the doc](./docs/en/FAQ/performance.md) to get more details.
 
 # Download
-Go to the [download page](https://skywalking.apache.org/downloads/) to 
download all available binaries, including MacOS, Linux and Windows. Due to 
system compatibility problems, some plugins of SkyWalking Satellite cannot be 
used in Windows system. Check [the corresponding 
documentation](./docs/en/guides/compile/How-to-compile.md) to see whether the 
plugin is available on Windows.
+Go to the [download page](https://skywalking.apache.org/downloads/) to 
download all available binaries, including MacOS, Linux and Windows. Due to 
system compatibility problems, some plugins of SkyWalking Satellite cannot be 
used in Windows system. Check [the corresponding 
documentation](./docs/en/setup/plugins) to see whether the plugin is available 
on Windows.
 
 # Compile
 As SkyWalking Satellite is using `Makefile`, compiling the project is as easy 
as executing a command in the root directory of the project.
@@ -32,7 +32,7 @@ git submodule init
 git submodule update
 make build
 ```
-If you want to know more details about compiling, please read [the 
doc](./docs/en/guides/compile/How-to-compile.md).
+If you want to know more details about compiling, please read [the 
doc](./docs/en/guides/compile/compile.md).
 
 
 # Commands
diff --git a/docs/en/concepts-and-designs/mmap-queue.md 
b/docs/en/concepts-and-designs/mmap-queue.md
index df0eaf0..d3b3582 100644
--- a/docs/en/concepts-and-designs/mmap-queue.md
+++ b/docs/en/concepts-and-designs/mmap-queue.md
@@ -41,12 +41,12 @@ goos: darwin
 goarch: amd64
 pkg: github.com/apache/skywalking-satellite/plugins/queue/mmap
 BenchmarkEnqueue
-BenchmarkEnqueue/segmentSize:_128KB_maxInMemSegments:18_message:8KB_queueCapacity:10000
                   10000            106520 ns/op            9888 B/op          
9 allocs/op
-BenchmarkEnqueue/segmentSize:_256KB_maxInMemSegments:10_message:8KB_queueCapacity:10000
                   18536             54331 ns/op            9839 B/op          
9 allocs/op
-BenchmarkEnqueue/segmentSize:_512KB_maxInMemSegments:6_message:8KB_queueCapacity:10000
                    27859             43251 ns/op            9815 B/op          
9 allocs/op
-BenchmarkEnqueue/segmentSize:_256KB_maxInMemSegments:20_message:8KB_queueCapacity:10000
                   23673             45910 ns/op            9839 B/op          
9 allocs/op
-BenchmarkEnqueue/segmentSize:_128KB_maxInMemSegments:10_message:16KB_queueCapacity:10000
                  10000            131686 ns/op           18941 B/op         10 
allocs/op
-BenchmarkEnqueue/segmentSize:_128KB_maxInMemSegments:10_message:8KB_queueCapacity:100000
                  23011             47101 ns/op            9887 B/op          9 
allocs/op
+BenchmarkEnqueue/segmentSize:_128KB_maxInMemSegments:18_message:8KB_queueCapacity:10000
                   27585             43559 ns/op            9889 B/op          
9 allocs/op
+BenchmarkEnqueue/segmentSize:_256KB_maxInMemSegments:10_message:8KB_queueCapacity:10000
                   39326             31773 ns/op            9840 B/op          
9 allocs/op
+BenchmarkEnqueue/segmentSize:_512KB_maxInMemSegments:6_message:8KB_queueCapacity:10000
                    56770             22990 ns/op            9816 B/op          
9 allocs/op
+BenchmarkEnqueue/segmentSize:_256KB_maxInMemSegments:20_message:8KB_queueCapacity:10000
                   43803             29778 ns/op            9840 B/op          
9 allocs/op
+BenchmarkEnqueue/segmentSize:_128KB_maxInMemSegments:10_message:16KB_queueCapacity:10000
                  16870             80576 ns/op           18944 B/op         10 
allocs/op
+BenchmarkEnqueue/segmentSize:_128KB_maxInMemSegments:10_message:8KB_queueCapacity:100000
                  36922             39085 ns/op            9889 B/op          9 
allocs/op
 PASS
 ```
 ### push and pop operation
@@ -55,11 +55,11 @@ goos: darwin
 goarch: amd64
 pkg: github.com/apache/skywalking-satellite/plugins/queue/mmap
 BenchmarkEnqueueAndDequeue
-BenchmarkEnqueueAndDequeue/segmentSize:_128KB_maxInMemSegments:18_message:8KB_queueCapacity:10000
                 18895             53056 ns/op           28773 B/op         42 
allocs/op
-BenchmarkEnqueueAndDequeue/segmentSize:_256KB_maxInMemSegments:10_message:8KB_queueCapacity:10000
                 24104            117128 ns/op           28725 B/op         42 
allocs/op
-BenchmarkEnqueueAndDequeue/segmentSize:_512KB_maxInMemSegments:6_message:8KB_queueCapacity:10000
                  23733             71632 ns/op           28699 B/op         41 
allocs/op
-BenchmarkEnqueueAndDequeue/segmentSize:_256KB_maxInMemSegments:20_message:8KB_queueCapacity:10000
                 26286             64377 ns/op           28725 B/op         42 
allocs/op
-BenchmarkEnqueueAndDequeue/segmentSize:_128KB_maxInMemSegments:10_message:16KB_queueCapacity:10000
                10000            118004 ns/op           54978 B/op         43 
allocs/op
-BenchmarkEnqueueAndDequeue/segmentSize:_128KB_maxInMemSegments:10_message:8KB_queueCapacity:100000
                16489             64400 ns/op           28772 B/op         42 
allocs/op
+BenchmarkEnqueueAndDequeue/segmentSize:_128KB_maxInMemSegments:18_message:8KB_queueCapacity:10000
                 21030             60728 ns/op           28774 B/op         42 
allocs/op
+BenchmarkEnqueueAndDequeue/segmentSize:_256KB_maxInMemSegments:10_message:8KB_queueCapacity:10000
                 30327             41274 ns/op           28726 B/op         42 
allocs/op
+BenchmarkEnqueueAndDequeue/segmentSize:_512KB_maxInMemSegments:6_message:8KB_queueCapacity:10000
                  32738             37923 ns/op           28700 B/op         42 
allocs/op
+BenchmarkEnqueueAndDequeue/segmentSize:_256KB_maxInMemSegments:20_message:8KB_queueCapacity:10000
                 28209             41169 ns/op           28726 B/op         42 
allocs/op
+BenchmarkEnqueueAndDequeue/segmentSize:_128KB_maxInMemSegments:10_message:16KB_queueCapacity:10000
                14677             89637 ns/op           54981 B/op         43 
allocs/op
+BenchmarkEnqueueAndDequeue/segmentSize:_128KB_maxInMemSegments:10_message:8KB_queueCapacity:100000
                22228             54963 ns/op           28774 B/op         42 
allocs/op
 PASS
 ```
diff --git a/docs/en/guides/contribution/How-to-release.md 
b/docs/en/guides/contribution/How-to-release.md
index af07674..ad5dbd8 100644
--- a/docs/en/guides/contribution/How-to-release.md
+++ b/docs/en/guides/contribution/How-to-release.md
@@ -5,7 +5,7 @@ This documentation guides the release manager to release the 
SkyWalking Satellit
 ## Prerequisites
 
 1. Close(if finished, or move to next milestone otherwise) all issues in the 
current milestone from 
[skywalking-satellite](https://github.com/apache/skywalking-satellite/milestones)
 and [skywalking](https://github.com/apache/skywalking/milestones), create a 
new milestone if needed.
-2. Update [CHANGES.md](../../../../CHANGES.md).
+2. Update [CHANGES.md](../CHANGES.md).
 
 
 ## Add your GPG public key to Apache svn
@@ -152,7 +152,7 @@ are in 
`https://dist.apache.org/repos/dist/dev/skywalking/satellite/$VERSION` wi
 1. `LICENSE` and `NOTICE` are in source codes and distribution package.
 1. Check `shasum -c skywalking-satellite-$VERSION-{src,bin}.tgz.sha512`.
 1. Check `gpg --verify skywalking-satellite-$VERSION-{src,bin}.tgz.asc 
skywalking-satellite-$VERSION-{src,bin}.tgz`.
-1. Build distribution from source code package by following this command, 
`make build`.
+1. Build distribution from source code package by following this [the build 
guide](#build-and-sign-the-source-code-package).
 1. Licenses check, `make license`.
 
 Vote result should follow these:
diff --git a/docs/en/guides/contribution/How-to-write-plugin.md 
b/docs/en/guides/contribution/How-to-write-plugin.md
index 555f046..b2f4a61 100644
--- a/docs/en/guides/contribution/How-to-write-plugin.md
+++ b/docs/en/guides/contribution/How-to-write-plugin.md
@@ -32,7 +32,7 @@ Let's use memory-queue as an example of how to write a plugin.
     event_buffer_size: 5000
     ```
    
-3. Add [unit test](../test/How-to-unit-test.md).
+3. Add [unit test](../test/test.md).
 4. Generate the plugin docs.
 ```shell script
 make check
diff --git a/go.mod b/go.mod
index 0c7e00c..d1d86c4 100644
--- a/go.mod
+++ b/go.mod
@@ -7,14 +7,14 @@ replace skywalking/network v1.0.0 => 
./protocol/gen-codes/skywalking/network
 require (
        github.com/Shopify/sarama v1.27.2
        github.com/enriquebris/goconcurrentqueue v0.6.0
-       github.com/golang/protobuf v1.4.3
-       github.com/google/go-cmp v0.5.4
+       github.com/golang/protobuf v1.5.2
+       github.com/google/go-cmp v0.5.5
        github.com/grandecola/mmap v0.6.0
        github.com/prometheus/client_golang v1.9.0
        github.com/sirupsen/logrus v1.7.0
        github.com/spf13/viper v1.7.1
        github.com/urfave/cli/v2 v2.3.0
-       google.golang.org/grpc v1.35.0
-       google.golang.org/protobuf v1.25.0
+       google.golang.org/grpc v1.36.1
+       google.golang.org/protobuf v1.26.0
        skywalking/network v1.0.0
 )
diff --git a/go.sum b/go.sum
index 13be86f..5ec45ee 100644
--- a/go.sum
+++ b/go.sum
@@ -134,6 +134,9 @@ github.com/golang/protobuf v1.4.1/go.mod 
h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QD
 github.com/golang/protobuf v1.4.2/go.mod 
h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
 github.com/golang/protobuf v1.4.3 
h1:JjCZWpVbqXDqFVmTfYWEVTMIYrL/NPdPSCHPJ0T/raM=
 github.com/golang/protobuf v1.4.3/go.mod 
h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
+github.com/golang/protobuf v1.5.0/go.mod 
h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
+github.com/golang/protobuf v1.5.2 
h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw=
+github.com/golang/protobuf v1.5.2/go.mod 
h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
 github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db 
h1:woRePGFeVFfLKN/pOkfl+p/TAqKOfFu+7KPlMVpok/w=
 github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod 
h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
 github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
@@ -147,8 +150,8 @@ github.com/google/go-cmp v0.3.1/go.mod 
h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw
 github.com/google/go-cmp v0.4.0/go.mod 
h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
 github.com/google/go-cmp v0.5.0/go.mod 
h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
 github.com/google/go-cmp v0.5.2/go.mod 
h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
-github.com/google/go-cmp v0.5.4 h1:L8R9j+yAqZuZjsqh/z+F1NCffTKKLShY6zXTItVIZ8M=
-github.com/google/go-cmp v0.5.4/go.mod 
h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
+github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
+github.com/google/go-cmp v0.5.5/go.mod 
h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
 github.com/google/gofuzz v1.0.0/go.mod 
h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
 github.com/google/martian v2.1.0+incompatible 
h1:/CP5g8u/VJHijgedC/Legn3BAbAaWPgecwXBIDzw5no=
 github.com/google/martian v2.1.0+incompatible/go.mod 
h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
@@ -589,8 +592,8 @@ google.golang.org/grpc v1.25.1/go.mod 
h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQ
 google.golang.org/grpc v1.26.0/go.mod 
h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
 google.golang.org/grpc v1.27.0 h1:rRYRFMVgRv6E0D70Skyfsr28tDXIuuPZyWGMPdMcnXg=
 google.golang.org/grpc v1.27.0/go.mod 
h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
-google.golang.org/grpc v1.35.0 h1:TwIQcH3es+MojMVojxxfQ3l3OF2KzlRxML2xZq0kRo8=
-google.golang.org/grpc v1.35.0/go.mod 
h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU=
+google.golang.org/grpc v1.36.1 h1:cmUfbeGKnz9+2DD/UYsMQXeqbHZqZDs4eQwW0sFOpBY=
+google.golang.org/grpc v1.36.1/go.mod 
h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU=
 google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod 
h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
 google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod 
h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
 google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod 
h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
@@ -601,6 +604,9 @@ google.golang.org/protobuf v1.23.0/go.mod 
h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2
 google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod 
h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
 google.golang.org/protobuf v1.25.0 
h1:Ejskq+SyPohKW+1uil0JJMtmHCgJPJ/qWTxr8qp+R4c=
 google.golang.org/protobuf v1.25.0/go.mod 
h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
+google.golang.org/protobuf v1.26.0-rc.1/go.mod 
h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
+google.golang.org/protobuf v1.26.0 
h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk=
+google.golang.org/protobuf v1.26.0/go.mod 
h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
 gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod 
h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod 
h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
 gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 
h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
diff --git a/internal/satellite/module/sender/create.go 
b/internal/satellite/module/sender/create.go
index 5da3cca..12906e2 100644
--- a/internal/satellite/module/sender/create.go
+++ b/internal/satellite/module/sender/create.go
@@ -36,8 +36,7 @@ func NewSender(cfg *api.SenderConfig, g gatherer.Gatherer) 
api.Sender {
                runningFallbacker: 
fallbacker.GetFallbacker(cfg.FallbackerConfig),
                runningClient:     
sharing.Manager[cfg.ClientName].(client.Client),
                gatherer:          g,
-               logicInput:        nil,
-               physicalInput:     make(chan *event.OutputEventContext),
+               input:             make(chan *event.OutputEventContext),
                listener:          make(chan client.ClientStatus),
                flushChannel:      make(chan *buffer.BatchBuffer, 1),
                buffer:            buffer.NewBatchBuffer(cfg.MaxBufferSize),
diff --git a/internal/satellite/module/sender/sender.go 
b/internal/satellite/module/sender/sender.go
index 86eec6d..e75e24c 100644
--- a/internal/satellite/module/sender/sender.go
+++ b/internal/satellite/module/sender/sender.go
@@ -20,6 +20,7 @@ package sender
 import (
        "context"
        "sync"
+       "sync/atomic"
        "time"
 
        "github.com/sirupsen/logrus"
@@ -51,11 +52,11 @@ type Sender struct {
        gatherer gatherer.Gatherer
 
        // self components
-       logicInput    chan *event.OutputEventContext // logic input channel
-       physicalInput chan *event.OutputEventContext // physical input channel
-       listener      chan client.ClientStatus       // client status listener
-       flushChannel  chan *buffer.BatchBuffer       // forwarder flush channel
-       buffer        *buffer.BatchBuffer            // cache the downstream 
input data
+       input        chan *event.OutputEventContext // physical input channel
+       listener     chan client.ClientStatus       // client status listener
+       flushChannel chan *buffer.BatchBuffer       // forwarder flush channel
+       buffer       *buffer.BatchBuffer            // cache the downstream 
input data
+       blocking     int32                          // the status of input 
channel
 
        // metrics
        sendCounter *telemetry.Counter
@@ -65,7 +66,6 @@ type Sender struct {
 func (s *Sender) Prepare() error {
        log.Logger.WithField("pipe", s.config.PipeName).Info("sender module is 
preparing...")
        s.runningClient.RegisterListener(s.listener)
-       s.logicInput = s.physicalInput
        for _, runningForwarder := range s.runningForwarders {
                err := 
runningForwarder.Prepare(s.runningClient.GetConnectedClient())
                if err != nil {
@@ -80,64 +80,90 @@ func (s *Sender) Prepare() error {
 func (s *Sender) Boot(ctx context.Context) {
        log.Logger.WithField("pipe", s.config.PipeName).Info("sender module is 
starting...")
        var wg sync.WaitGroup
-       wg.Add(2)
-       // 1. keep fetching the downstream data when client connected, and put 
it into BatchBuffer.
-       // 2. When reaches the buffer limit or receives a timer flush signal, 
and put BatchBuffer into flushChannel.
-       go func() {
-               defer wg.Done()
-               childCtx, cancel := context.WithCancel(ctx)
-               timeTicker := time.NewTicker(time.Duration(s.config.FlushTime) 
* time.Millisecond)
-               for {
-                       select {
-                       case status := <-s.listener:
-                               switch status {
-                               case client.Connected:
-                                       log.Logger.WithField("pipe", 
s.config.PipeName).Info("the client connection of the sender module is 
connected")
-                                       s.logicInput = s.physicalInput
-                               case client.Disconnect:
-                                       log.Logger.WithField("pipe", 
s.config.PipeName).Info("the client connection of the sender module is 
disconnected")
-                                       s.logicInput = nil
-                               }
-                       case <-timeTicker.C:
-                               if s.buffer.Len() > s.config.MinFlushEvents {
-                                       s.flushChannel <- s.buffer
-                                       s.buffer = 
buffer.NewBatchBuffer(s.config.MaxBufferSize)
-                               }
-                       case e := <-s.logicInput:
-                               s.buffer.Add(e)
-                               if s.buffer.Len() == s.config.MaxBufferSize {
-                                       s.flushChannel <- s.buffer
-                                       s.buffer = 
buffer.NewBatchBuffer(s.config.MaxBufferSize)
-                               }
-                       case <-childCtx.Done():
-                               cancel()
-                               s.logicInput = nil
-                               return
+       wg.Add(3)
+       go s.store(ctx, &wg)
+       go s.listen(ctx, &wg)
+       go s.flush(ctx, &wg)
+       wg.Wait()
+}
+
+// store data.
+// 1. keep fetching the downstream data when client connected, and put it into 
BatchBuffer.
+// 2. When reaches the buffer limit or receives a timer flush signal, and put 
BatchBuffer into flushChannel.
+func (s *Sender) store(ctx context.Context, wg *sync.WaitGroup) {
+       defer wg.Done()
+       defer log.Logger.WithField("pipe", s.config.PipeName).Infof("store 
routine closed")
+       childCtx, _ := context.WithCancel(ctx) // nolint
+       timeTicker := time.NewTicker(time.Duration(s.config.FlushTime) * 
time.Millisecond)
+       for {
+               // blocking output when disconnecting.
+               if atomic.LoadInt32(&s.blocking) == 1 {
+                       time.Sleep(100 * time.Millisecond)
+                       continue
+               }
+               select {
+               case <-childCtx.Done():
+                       return
+               case <-timeTicker.C:
+                       if s.buffer.Len() > s.config.MinFlushEvents {
+                               s.flushChannel <- s.buffer
+                               s.buffer = 
buffer.NewBatchBuffer(s.config.MaxBufferSize)
+                       }
+               case e := <-s.input:
+                       if e == nil {
+                               continue
+                       }
+                       s.buffer.Add(e)
+                       if s.buffer.Len() == s.config.MaxBufferSize {
+                               s.flushChannel <- s.buffer
+                               s.buffer = 
buffer.NewBatchBuffer(s.config.MaxBufferSize)
                        }
                }
-       }()
-       // Keep fetching BatchBuffer to forward.
-       go func() {
-               defer wg.Done()
-               childCtx, cancel := context.WithCancel(ctx)
-               for {
-                       select {
-                       case b := <-s.flushChannel:
-                               s.consume(b)
-                       case <-childCtx.Done():
-                               cancel()
-                               s.Shutdown()
-                               return
+       }
+}
+
+// Listen the client status.
+func (s *Sender) listen(ctx context.Context, wg *sync.WaitGroup) {
+       defer wg.Done()
+       defer log.Logger.WithField("pipe", s.config.PipeName).Infof("listen 
routine closed")
+       childCtx, _ := context.WithCancel(ctx) // nolint
+       for {
+               select {
+               case <-childCtx.Done():
+                       return
+               case status := <-s.listener:
+                       switch status {
+                       case client.Connected:
+                               log.Logger.WithField("pipe", 
s.config.PipeName).Info("the client connection of the sender module connected")
+                               atomic.StoreInt32(&s.blocking, 0)
+                       case client.Disconnect:
+                               log.Logger.WithField("pipe", 
s.config.PipeName).Info("the client connection of the sender module 
disconnected")
+                               atomic.StoreInt32(&s.blocking, 1)
                        }
                }
-       }()
-       wg.Wait()
+       }
+}
+
+// Keep fetching BatchBuffer to forward.
+func (s *Sender) flush(ctx context.Context, wg *sync.WaitGroup) {
+       defer wg.Done()
+       defer log.Logger.WithField("pipe", s.config.PipeName).Infof("flush 
routine closed")
+       childCtx, _ := context.WithCancel(ctx) // nolint
+       for {
+               select {
+               case <-childCtx.Done():
+                       s.Shutdown()
+                       return
+               case b := <-s.flushChannel:
+                       s.consume(b)
+               }
+       }
 }
 
 // Shutdown closes the channels and tries to force forward the events in the 
buffer.
 func (s *Sender) Shutdown() {
        log.Logger.WithField("pipe", s.config.PipeName).Info("sender module is 
closing")
-       close(s.physicalInput)
+       close(s.input)
        ticker := time.NewTicker(module.ShutdownHookTime)
        for {
                select {
@@ -187,5 +213,5 @@ func (s *Sender) consume(batch *buffer.BatchBuffer) {
 }
 
 func (s *Sender) InputDataChannel() chan<- *event.OutputEventContext {
-       return s.logicInput
+       return s.input
 }
diff --git a/plugins/queue/mmap/meta/meta.go b/plugins/queue/mmap/meta/meta.go
index 11f75fa..36a5faa 100644
--- a/plugins/queue/mmap/meta/meta.go
+++ b/plugins/queue/mmap/meta/meta.go
@@ -22,6 +22,7 @@ package meta
 import (
        "fmt"
        "path/filepath"
+       "sync"
        "syscall"
 
        "github.com/grandecola/mmap"
@@ -60,6 +61,7 @@ type Metadata struct {
        name     string
        size     int
        capacity int
+       lock     sync.RWMutex
 }
 
 // NewMetaData read or create a Metadata with supported metaVersion
@@ -92,76 +94,104 @@ func NewMetaData(metaDir string, capacity int) (*Metadata, 
error) {
 
 // GetVersion returns the meta version.
 func (m *Metadata) GetVersion() int {
+       m.lock.RLock()
+       defer m.lock.RUnlock()
        return int(m.metaFile.ReadUint64At(versionPos))
 }
 
 // PutVersion put the version into the memory mapped file.
 func (m *Metadata) PutVersion(version int64) {
+       m.lock.Lock()
+       defer m.lock.Unlock()
        m.metaFile.WriteUint64At(uint64(version), versionPos)
 }
 
 // GetWritingOffset returns the writing offset, which contains the segment ID 
and the offset of the segment.
 func (m *Metadata) GetWritingOffset() (segmentID, offset int64) {
+       m.lock.RLock()
+       defer m.lock.RUnlock()
        return int64(m.metaFile.ReadUint64At(widPos)), 
int64(m.metaFile.ReadUint64At(woffsetPos))
 }
 
 // PutWritingOffset put the segment ID and the offset of the segment into the 
writing offset.
 func (m *Metadata) PutWritingOffset(segmentID, offset int64) {
+       m.lock.Lock()
+       defer m.lock.Unlock()
        m.metaFile.WriteUint64At(uint64(segmentID), widPos)
        m.metaFile.WriteUint64At(uint64(offset), woffsetPos)
 }
 
 // GetWatermarkOffset returns the watermark offset, which contains the segment 
ID and the offset of the segment.
 func (m *Metadata) GetWatermarkOffset() (segmentID, offset int64) {
+       m.lock.RLock()
+       defer m.lock.RUnlock()
        return int64(m.metaFile.ReadUint64At(wmidPos)), 
int64(m.metaFile.ReadUint64At(wmoffsetPos))
 }
 
 // PutWatermarkOffset put the segment ID and the offset of the segment into 
the watermark offset.
 func (m *Metadata) PutWatermarkOffset(segmentID, offset int64) {
+       m.lock.Lock()
+       defer m.lock.Unlock()
        m.metaFile.WriteUint64At(uint64(segmentID), wmidPos)
        m.metaFile.WriteUint64At(uint64(offset), wmoffsetPos)
 }
 
 // GetCommittedOffset returns the committed offset, which contains the segment 
ID and the offset of the segment.
 func (m *Metadata) GetCommittedOffset() (segmentID, offset int64) {
+       m.lock.RLock()
+       defer m.lock.RUnlock()
        return int64(m.metaFile.ReadUint64At(cidPos)), 
int64(m.metaFile.ReadUint64At(coffsetPos))
 }
 
 // PutCommittedOffset put the segment ID and the offset of the segment into 
the committed offset.
 func (m *Metadata) PutCommittedOffset(segmentID, offset int64) {
+       m.lock.Lock()
+       defer m.lock.Unlock()
        m.metaFile.WriteUint64At(uint64(segmentID), cidPos)
        m.metaFile.WriteUint64At(uint64(offset), coffsetPos)
 }
 
 // GetReadingOffset returns the reading offset, which contains the segment ID 
and the offset of the segment.
 func (m *Metadata) GetReadingOffset() (segmentID, offset int64) {
+       m.lock.RLock()
+       defer m.lock.RUnlock()
        return int64(m.metaFile.ReadUint64At(ridPos)), 
int64(m.metaFile.ReadUint64At(roffsetPos))
 }
 
 // PutReadingOffset put the segment ID and the offset of the segment into the 
reading offset.
 func (m *Metadata) PutReadingOffset(segmentID, offset int64) {
+       m.lock.Lock()
+       defer m.lock.Unlock()
        m.metaFile.WriteUint64At(uint64(segmentID), ridPos)
        m.metaFile.WriteUint64At(uint64(offset), roffsetPos)
 }
 
 // GetCapacity returns the capacity of the queue.
 func (m *Metadata) GetCapacity() int {
+       m.lock.RLock()
+       defer m.lock.RUnlock()
        return int(m.metaFile.ReadUint64At(capacityPos))
 }
 
 // PutCapacity put the capacity into the memory mapped file.
 func (m *Metadata) PutCapacity(version int64) {
+       m.lock.Lock()
+       defer m.lock.Unlock()
        m.metaFile.WriteUint64At(uint64(version), capacityPos)
 }
 
 // Flush the memory mapped file to the disk.
 func (m *Metadata) Flush() error {
+       m.lock.Lock()
+       defer m.lock.Unlock()
        return m.metaFile.Flush(syscall.MS_SYNC)
 }
 
 // Close do Flush operation and unmap the memory mapped file.
 func (m *Metadata) Close() error {
-       if err := m.Flush(); err != nil {
+       m.lock.Lock()
+       defer m.lock.Unlock()
+       if err := m.metaFile.Flush(syscall.MS_SYNC); err != nil {
                return err
        }
        return m.metaFile.Unmap()
diff --git a/plugins/queue/mmap/queue.go b/plugins/queue/mmap/queue.go
index 44d20ee..539c59f 100644
--- a/plugins/queue/mmap/queue.go
+++ b/plugins/queue/mmap/queue.go
@@ -61,7 +61,6 @@ type Queue struct {
        MaxEventSize          int   `mapstructure:"max_event_size"`          // 
The max size of the input event.
 
        // running components
-       lock                   sync.Mutex
        queueName              string         // The queue name.
        meta                   *meta.Metadata // The metadata file.
        segments               []*mmap.File   // The data files.
@@ -72,6 +71,7 @@ type Queue struct {
        sufficientMemChannel   chan struct{}  // Notify when memory is 
sufficient
        markReadChannel        chan int64     // Transfer the read segmentID to 
do ummap operation.
        ready                  bool           // The status of the queue.
+       locker                 []int32        // locker
 
        // control components
        ctx        context.Context    // Parent ctx
@@ -145,6 +145,7 @@ func (q *Queue) Initialize() error {
        q.ctx, q.cancel = context.WithCancel(context.Background())
        // async supported processes.
        q.showDownWg.Add(2)
+       q.locker = make([]int32, q.QueueCapacitySegments)
        go q.segmentSwapper()
        go q.flush()
        q.ready = true
@@ -187,8 +188,7 @@ func (q *Queue) Dequeue() (*api.SequenceEvent, error) {
 }
 
 func (q *Queue) Close() error {
-       q.lock.Lock()
-       defer q.lock.Unlock()
+       q.ready = false
        q.cancel()
        q.showDownWg.Wait()
        for i, segment := range q.segments {
@@ -204,7 +204,6 @@ func (q *Queue) Close() error {
        if err := q.meta.Close(); err != nil {
                log.Logger.Errorf("cannot unmap the metadata: %v", err)
        }
-       q.ready = false
        return nil
 }
 
@@ -226,8 +225,7 @@ func (q *Queue) Ack(lastOffset event.Offset) {
 // flush control the flush operation by timer or counter.
 func (q *Queue) flush() {
        defer q.showDownWg.Done()
-       ctx, cancel := context.WithCancel(q.ctx)
-       defer cancel()
+       ctx, _ := context.WithCancel(q.ctx) // nolint
        for {
                timer := time.NewTimer(time.Duration(q.FlushPeriod) * 
time.Millisecond)
                select {
@@ -245,13 +243,16 @@ func (q *Queue) flush() {
 
 // doFlush flush the segment and meta files to the disk.
 func (q *Queue) doFlush() {
-       for _, segment := range q.segments {
-               if segment == nil {
+       for i := range q.segments {
+               q.lockByIndex(i)
+               if q.segments[i] == nil {
+                       q.unlockByIndex(i)
                        continue
                }
-               if err := segment.Flush(syscall.MS_SYNC); err != nil {
+               if err := q.segments[i].Flush(syscall.MS_SYNC); err != nil {
                        log.Logger.Errorf("cannot flush segment file: %v", err)
                }
+               q.unlockByIndex(i)
        }
        wid, woffset := q.meta.GetWritingOffset()
        q.meta.PutWatermarkOffset(wid, woffset)
diff --git a/plugins/queue/mmap/queue_lock.go b/plugins/queue/mmap/queue_lock.go
new file mode 100644
index 0000000..7e7c06b
--- /dev/null
+++ b/plugins/queue/mmap/queue_lock.go
@@ -0,0 +1,54 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package mmap
+
+import "sync/atomic"
+
+func (q *Queue) lock(segmentID int64) {
+       index := q.GetIndex(segmentID)
+       for {
+               if atomic.CompareAndSwapInt32(&q.locker[index], 0, 1) {
+                       return
+               }
+       }
+}
+
+func (q *Queue) unlock(segmentID int64) {
+       index := q.GetIndex(segmentID)
+       for {
+               if atomic.CompareAndSwapInt32(&q.locker[index], 1, 0) {
+                       return
+               }
+       }
+}
+
+func (q *Queue) lockByIndex(index int) {
+       for {
+               if atomic.CompareAndSwapInt32(&q.locker[index], 0, 1) {
+                       return
+               }
+       }
+}
+
+func (q *Queue) unlockByIndex(index int) {
+       for {
+               if atomic.CompareAndSwapInt32(&q.locker[index], 1, 0) {
+                       return
+               }
+       }
+}
diff --git a/plugins/queue/mmap/queue_operation.go 
b/plugins/queue/mmap/queue_operation.go
index 9ee6e74..d00df14 100644
--- a/plugins/queue/mmap/queue_operation.go
+++ b/plugins/queue/mmap/queue_operation.go
@@ -43,8 +43,6 @@ const uInt64Size = 8
 // enqueue writes the data into the file system. It first writes the length of 
the data,
 // then the data itself. It means the whole data may not exist in the one 
segments.
 func (q *Queue) enqueue(bytes []byte) error {
-       q.lock.Lock()
-       defer q.lock.Unlock()
        if q.isFull() {
                return api.ErrFull
        }
@@ -69,8 +67,6 @@ func (q *Queue) enqueue(bytes []byte) error {
 // dequeue reads the data from the file system. It first reads the length of 
the data,
 // then the data itself. It means the whole data may not exist in the one 
segments.
 func (q *Queue) dequeue() (data []byte, rid, roffset int64, err error) {
-       q.lock.Lock()
-       defer q.lock.Unlock()
        if q.isEmpty() {
                return nil, 0, 0, api.ErrEmpty
        }
@@ -95,11 +91,13 @@ func (q *Queue) readBytes(id, offset int64, length int) 
(data []byte, newID, new
        counter := 0
        res := make([]byte, length)
        for {
+               q.lock(id)
                segment, err := q.GetSegment(id)
                if err != nil {
                        return nil, 0, 0, err
                }
                readBytes, err := segment.ReadAt(res[counter:], offset)
+               q.unlock(id)
                if err != nil {
                        return nil, 0, 0, err
                }
@@ -120,11 +118,13 @@ func (q *Queue) readLength(id, offset int64) (newID, 
newOffset int64, length int
        if offset+uInt64Size > int64(q.SegmentSize) {
                id, offset = id+1, 0
        }
+       q.lock(id)
        segment, err := q.GetSegment(id)
        if err != nil {
                return 0, 0, 0, err
        }
        num := segment.ReadUint64At(offset)
+       q.unlock(id)
        offset += uInt64Size
        if offset == int64(q.SegmentSize) {
                id, offset = id+1, 0
@@ -137,11 +137,13 @@ func (q *Queue) writeLength(length int, id, offset int64) 
(newID, newOffset int6
        if offset+uInt64Size > int64(q.SegmentSize) {
                id, offset = id+1, 0
        }
+       q.lock(id)
        segment, err := q.GetSegment(id)
        if err != nil {
                return 0, 0, err
        }
        segment.WriteUint64At(uint64(length), offset)
+       q.unlock(id)
        offset += uInt64Size
        if offset == int64(q.SegmentSize) {
                id, offset = id+1, 0
@@ -153,13 +155,14 @@ func (q *Queue) writeLength(length int, id, offset int64) 
(newID, newOffset int6
 func (q *Queue) writeBytes(bytes []byte, id, offset int64) (newID, newOffset 
int64, err error) {
        counter := 0
        length := len(bytes)
-
        for {
+               q.lock(id)
                segment, err := q.GetSegment(id)
                if err != nil {
                        return 0, 0, err
                }
                writtenBytes, err := segment.WriteAt(bytes[counter:], offset)
+               q.unlock(id)
                if err != nil {
                        return 0, 0, err
                }
diff --git a/plugins/queue/mmap/segment_operation.go 
b/plugins/queue/mmap/segment_operation.go
index 42d54b2..aa2f421 100644
--- a/plugins/queue/mmap/segment_operation.go
+++ b/plugins/queue/mmap/segment_operation.go
@@ -35,7 +35,7 @@ import (
 
 // GetSegment returns a memory mapped file at the segmentID position.
 func (q *Queue) GetSegment(segmentID int64) (*mmap.File, error) {
-       if q.mmapCount >= q.MaxInMemSegments {
+       if atomic.LoadInt32(&q.mmapCount) >= q.MaxInMemSegments {
                q.insufficientMemChannel <- struct{}{}
                <-q.sufficientMemChannel
        }
@@ -85,14 +85,15 @@ func (q *Queue) unmapSegment(segmentID int64) error {
 // segmentSwapper run with a go routine to ensure the memory cost.
 func (q *Queue) segmentSwapper() {
        defer q.showDownWg.Done()
-       ctx, cancel := context.WithCancel(q.ctx)
-       defer cancel()
+       ctx, _ := context.WithCancel(q.ctx) // nolint
        for {
                select {
                case id := <-q.markReadChannel:
+                       q.lock(id)
                        if q.unmapSegment(id) != nil {
                                log.Logger.Errorf("cannot unmap the markread 
segment: %d", id)
                        }
+                       q.unlock(id)
                case <-q.insufficientMemChannel:
                        if q.mmapCount >= q.MaxInMemSegments {
                                if q.doSwap() != nil {
diff --git a/protocol/gen-codes/skywalking/network/go.mod 
b/protocol/gen-codes/skywalking/network/go.mod
index 0797ac0..6f56c75 100644
--- a/protocol/gen-codes/skywalking/network/go.mod
+++ b/protocol/gen-codes/skywalking/network/go.mod
@@ -3,7 +3,7 @@ module skywalking/network
 go 1.15
 
 require (
-       github.com/golang/protobuf v1.4.3
-       google.golang.org/grpc v1.35.0
-       google.golang.org/protobuf v1.25.0
+       github.com/golang/protobuf v1.5.2
+       google.golang.org/grpc v1.36.1
+       google.golang.org/protobuf v1.26.0
 )
diff --git a/protocol/gen-codes/skywalking/network/go.sum 
b/protocol/gen-codes/skywalking/network/go.sum
index 8f0da6e..76f5239 100644
--- a/protocol/gen-codes/skywalking/network/go.sum
+++ b/protocol/gen-codes/skywalking/network/go.sum
@@ -19,14 +19,16 @@ github.com/golang/protobuf 
v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:W
 github.com/golang/protobuf v1.4.0/go.mod 
h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0=
 github.com/golang/protobuf v1.4.1/go.mod 
h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8=
 github.com/golang/protobuf v1.4.2/go.mod 
h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
-github.com/golang/protobuf v1.4.3 
h1:JjCZWpVbqXDqFVmTfYWEVTMIYrL/NPdPSCHPJ0T/raM=
-github.com/golang/protobuf v1.4.3/go.mod 
h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
+github.com/golang/protobuf v1.5.0/go.mod 
h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
+github.com/golang/protobuf v1.5.2 
h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw=
+github.com/golang/protobuf v1.5.2/go.mod 
h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
 github.com/google/go-cmp v0.2.0/go.mod 
h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
 github.com/google/go-cmp v0.3.0/go.mod 
h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
 github.com/google/go-cmp v0.3.1/go.mod 
h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
 github.com/google/go-cmp v0.4.0/go.mod 
h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
-github.com/google/go-cmp v0.5.0 h1:/QaMHBdZ26BB3SSst0Iwl10Epc+xhTquomWX0oZEB6w=
 github.com/google/go-cmp v0.5.0/go.mod 
h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
+github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
+github.com/google/go-cmp v0.5.5/go.mod 
h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
 github.com/google/uuid v1.1.2/go.mod 
h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
 github.com/pmezard/go-difflib v1.0.0/go.mod 
h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
 github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod 
h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
@@ -67,8 +69,8 @@ google.golang.org/grpc v1.19.0/go.mod 
h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZi
 google.golang.org/grpc v1.23.0/go.mod 
h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
 google.golang.org/grpc v1.25.1/go.mod 
h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY=
 google.golang.org/grpc v1.27.0/go.mod 
h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
-google.golang.org/grpc v1.35.0 h1:TwIQcH3es+MojMVojxxfQ3l3OF2KzlRxML2xZq0kRo8=
-google.golang.org/grpc v1.35.0/go.mod 
h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU=
+google.golang.org/grpc v1.36.1 h1:cmUfbeGKnz9+2DD/UYsMQXeqbHZqZDs4eQwW0sFOpBY=
+google.golang.org/grpc v1.36.1/go.mod 
h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU=
 google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod 
h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
 google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod 
h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
 google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod 
h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
@@ -77,8 +79,10 @@ google.golang.org/protobuf v1.21.0/go.mod 
h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzi
 google.golang.org/protobuf v1.22.0/go.mod 
h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
 google.golang.org/protobuf v1.23.0/go.mod 
h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
 google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod 
h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
-google.golang.org/protobuf v1.25.0 
h1:Ejskq+SyPohKW+1uil0JJMtmHCgJPJ/qWTxr8qp+R4c=
 google.golang.org/protobuf v1.25.0/go.mod 
h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
+google.golang.org/protobuf v1.26.0-rc.1/go.mod 
h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
+google.golang.org/protobuf v1.26.0 
h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk=
+google.golang.org/protobuf v1.26.0/go.mod 
h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod 
h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
 gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
 honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod 
h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=

Reply via email to