This is an automated email from the ASF dual-hosted git repository.

liujiapeng pushed a commit to branch fix-race-bug
in repository https://gitbox.apache.org/repos/asf/skywalking-satellite.git
commit f353fac01c867f9198f289c56132fb4d8ae8129a
Author: Evan <[email protected]>
AuthorDate: Wed Apr 7 15:30:48 2021 +0800

    fix some bugs in Satellite
---
 CHANGES.md                                         |  22 +---
 README.md                                          |   4 +-
 docs/en/concepts-and-designs/mmap-queue.md         |  24 ++--
 docs/en/guides/contribution/How-to-release.md      |   4 +-
 docs/en/guides/contribution/How-to-write-plugin.md |   2 +-
 go.mod                                             |   8 +-
 go.sum                                             |  14 ++-
 internal/satellite/module/sender/create.go         |   3 +-
 internal/satellite/module/sender/sender.go         | 138 ++++++++++++---------
 plugins/queue/mmap/meta/meta.go                    |  32 ++++-
 plugins/queue/mmap/queue.go                        |  19 +--
 plugins/queue/mmap/queue_lock.go                   |  54 ++++++++
 plugins/queue/mmap/queue_operation.go              |  13 +-
 plugins/queue/mmap/segment_operation.go            |   7 +-
 protocol/gen-codes/skywalking/network/go.mod       |   6 +-
 protocol/gen-codes/skywalking/network/go.sum       |  16 ++-
 16 files changed, 240 insertions(+), 126 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 1b4032e..be5f0f5 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -2,25 +2,15 @@ Changes by Version
 ==================
 Release Notes.
 
-0.1.0
+0.2.0
 ------------------
 #### Features
-* Build the Satellite core structure.
-* Add prometheus self telemetry.
-* Add kafka client plugin.
-* Add none-fallbacker plugin.
-* Add timer-fallbacker plugin.
-* Add nativelog-kafka-forwarder plugin.
-* Add memory-queue plugin.
-* Add mmap-queue plugin.
-* Add grpc-nativelog-receiver plugin.
-* Add http-nativelog-receiver plugin.
-* Add grpc-server plugin.
-* Add http-server plugin.
-* Add prometheus-server plugin.
+
 
 #### Bug Fixes
+Fix the data race in mmap queue.
+Fix channel blocking in sender module.
 
 #### Issues and PR
-- All issues are [here](https://github.com/apache/skywalking/milestone/64?closed=1)
-- All and pull requests are [here](https://github.com/apache/skywalking-satellite/pulls?q=is%3Aopen+is%3Apr+milestone%3A0.1.0)
+- All issues are [here](https://github.com/apache/skywalking/milestone/80?closed=1)
+- All and pull requests are [here](https://github.com/apache/skywalking-satellite/pulls?q=is%3Aopen+is%3Apr+milestone%3A0.2.0)
diff --git a/README.md b/README.md
index b1d97b7..6915593 100644
--- a/README.md
+++ b/README.md
@@ -21,7 +21,7 @@ NOTICE, SkyWalking Satellite uses [v3 protocols](https://github.com/apache/skywa
 Please read [the doc](./docs/en/FAQ/performance.md) to get more details.
 
 # Download
-Go to the [download page](https://skywalking.apache.org/downloads/) to download all available binaries, including MacOS, Linux and Windows. Due to system compatibility problems, some plugins of SkyWalking Satellite cannot be used in Windows system. Check [the corresponding documentation](./docs/en/guides/compile/How-to-compile.md) to see whether the plugin is available on Windows.
+Go to the [download page](https://skywalking.apache.org/downloads/) to download all available binaries, including MacOS, Linux and Windows. Due to system compatibility problems, some plugins of SkyWalking Satellite cannot be used in Windows system. Check [the corresponding documentation](./docs/en/setup/plugins) to see whether the plugin is available on Windows.
 
 # Compile
 As SkyWalking Satellite is using `Makefile`, compiling the project is as easy as executing a command in the root directory of the project.
@@ -32,7 +32,7 @@ git submodule init
 git submodule update
 make build
 ```
-If you want to know more details about compiling, please read [the doc](./docs/en/guides/compile/How-to-compile.md).
+If you want to know more details about compiling, please read [the doc](./docs/en/guides/compile/compile.md).
 
 # Commands
 
diff --git a/docs/en/concepts-and-designs/mmap-queue.md b/docs/en/concepts-and-designs/mmap-queue.md
index df0eaf0..d3b3582 100644
--- a/docs/en/concepts-and-designs/mmap-queue.md
+++ b/docs/en/concepts-and-designs/mmap-queue.md
@@ -41,12 +41,12 @@ goos: darwin
 goarch: amd64
 pkg: github.com/apache/skywalking-satellite/plugins/queue/mmap
 BenchmarkEnqueue
-BenchmarkEnqueue/segmentSize:_128KB_maxInMemSegments:18_message:8KB_queueCapacity:10000 10000 106520 ns/op 9888 B/op 9 allocs/op
-BenchmarkEnqueue/segmentSize:_256KB_maxInMemSegments:10_message:8KB_queueCapacity:10000 18536 54331 ns/op 9839 B/op 9 allocs/op
-BenchmarkEnqueue/segmentSize:_512KB_maxInMemSegments:6_message:8KB_queueCapacity:10000 27859 43251 ns/op 9815 B/op 9 allocs/op
-BenchmarkEnqueue/segmentSize:_256KB_maxInMemSegments:20_message:8KB_queueCapacity:10000 23673 45910 ns/op 9839 B/op 9 allocs/op
-BenchmarkEnqueue/segmentSize:_128KB_maxInMemSegments:10_message:16KB_queueCapacity:10000 10000 131686 ns/op 18941 B/op 10 allocs/op
-BenchmarkEnqueue/segmentSize:_128KB_maxInMemSegments:10_message:8KB_queueCapacity:100000 23011 47101 ns/op 9887 B/op 9 allocs/op
+BenchmarkEnqueue/segmentSize:_128KB_maxInMemSegments:18_message:8KB_queueCapacity:10000 27585 43559 ns/op 9889 B/op 9 allocs/op
+BenchmarkEnqueue/segmentSize:_256KB_maxInMemSegments:10_message:8KB_queueCapacity:10000 39326 31773 ns/op 9840 B/op 9 allocs/op
+BenchmarkEnqueue/segmentSize:_512KB_maxInMemSegments:6_message:8KB_queueCapacity:10000 56770 22990 ns/op 9816 B/op 9 allocs/op
+BenchmarkEnqueue/segmentSize:_256KB_maxInMemSegments:20_message:8KB_queueCapacity:10000 43803 29778 ns/op 9840 B/op 9 allocs/op
+BenchmarkEnqueue/segmentSize:_128KB_maxInMemSegments:10_message:16KB_queueCapacity:10000 16870 80576 ns/op 18944 B/op 10 allocs/op
+BenchmarkEnqueue/segmentSize:_128KB_maxInMemSegments:10_message:8KB_queueCapacity:100000 36922 39085 ns/op 9889 B/op 9 allocs/op
 PASS
 ```
 ### push and pop operation
@@ -55,11 +55,11 @@ goos: darwin
 goarch: amd64
 pkg: github.com/apache/skywalking-satellite/plugins/queue/mmap
 BenchmarkEnqueueAndDequeue
-BenchmarkEnqueueAndDequeue/segmentSize:_128KB_maxInMemSegments:18_message:8KB_queueCapacity:10000 18895 53056 ns/op 28773 B/op 42 allocs/op
-BenchmarkEnqueueAndDequeue/segmentSize:_256KB_maxInMemSegments:10_message:8KB_queueCapacity:10000 24104 117128 ns/op 28725 B/op 42 allocs/op
-BenchmarkEnqueueAndDequeue/segmentSize:_512KB_maxInMemSegments:6_message:8KB_queueCapacity:10000 23733 71632 ns/op 28699 B/op 41 allocs/op
-BenchmarkEnqueueAndDequeue/segmentSize:_256KB_maxInMemSegments:20_message:8KB_queueCapacity:10000 26286 64377 ns/op 28725 B/op 42 allocs/op
-BenchmarkEnqueueAndDequeue/segmentSize:_128KB_maxInMemSegments:10_message:16KB_queueCapacity:10000 10000 118004 ns/op 54978 B/op 43 allocs/op
-BenchmarkEnqueueAndDequeue/segmentSize:_128KB_maxInMemSegments:10_message:8KB_queueCapacity:100000 16489 64400 ns/op 28772 B/op 42 allocs/op
+BenchmarkEnqueueAndDequeue/segmentSize:_128KB_maxInMemSegments:18_message:8KB_queueCapacity:10000 21030 60728 ns/op 28774 B/op 42 allocs/op
+BenchmarkEnqueueAndDequeue/segmentSize:_256KB_maxInMemSegments:10_message:8KB_queueCapacity:10000 30327 41274 ns/op 28726 B/op 42 allocs/op
+BenchmarkEnqueueAndDequeue/segmentSize:_512KB_maxInMemSegments:6_message:8KB_queueCapacity:10000 32738 37923 ns/op 28700 B/op 42 allocs/op
+BenchmarkEnqueueAndDequeue/segmentSize:_256KB_maxInMemSegments:20_message:8KB_queueCapacity:10000 28209 41169 ns/op 28726 B/op 42 allocs/op
+BenchmarkEnqueueAndDequeue/segmentSize:_128KB_maxInMemSegments:10_message:16KB_queueCapacity:10000 14677 89637 ns/op 54981 B/op 43 allocs/op
+BenchmarkEnqueueAndDequeue/segmentSize:_128KB_maxInMemSegments:10_message:8KB_queueCapacity:100000 22228 54963 ns/op 28774 B/op 42 allocs/op
 PASS
 ```
diff --git a/docs/en/guides/contribution/How-to-release.md b/docs/en/guides/contribution/How-to-release.md
index af07674..ad5dbd8 100644
--- a/docs/en/guides/contribution/How-to-release.md
+++ b/docs/en/guides/contribution/How-to-release.md
@@ -5,7 +5,7 @@ This documentation guides the release manager to release the SkyWalking Satellit
 
 ## Prerequisites
 1. Close(if finished, or move to next milestone otherwise) all issues in the current milestone from [skywalking-satellite](https://github.com/apache/skywalking-satellite/milestones) and [skywalking](https://github.com/apache/skywalking/milestones), create a new milestone if needed.
-2. Update [CHANGES.md](../../../../CHANGES.md).
+2. Update [CHANGES.md](../CHANGES.md).
 
 ## Add your GPG public key to Apache svn
 
@@ -152,7 +152,7 @@ are in `https://dist.apache.org/repos/dist/dev/skywalking/satellite/$VERSION` wi
 1. `LICENSE` and `NOTICE` are in source codes and distribution package.
 1. Check `shasum -c skywalking-satellite-$VERSION-{src,bin}.tgz.sha512`.
 1. Check `gpg --verify skywalking-satellite-$VERSION-{src,bin}.tgz.asc skywalking-satellite-$VERSION-{src,bin}.tgz`.
-1. Build distribution from source code package by following this command, `make build`.
+1. Build distribution from source code package by following this [the build guide](#build-and-sign-the-source-code-package).
 1. Licenses check, `make license`.
 
 Vote result should follow these:
diff --git a/docs/en/guides/contribution/How-to-write-plugin.md b/docs/en/guides/contribution/How-to-write-plugin.md
index 555f046..b2f4a61 100644
--- a/docs/en/guides/contribution/How-to-write-plugin.md
+++ b/docs/en/guides/contribution/How-to-write-plugin.md
@@ -32,7 +32,7 @@ Let's use memory-queue as an example of how to write a plugin.
 event_buffer_size: 5000
 ```
 
-3. Add [unit test](../test/How-to-unit-test.md).
+3. Add [unit test](../test/test.md).
 4. Generate the plugin docs.
 ```shell script
 make check
diff --git a/go.mod b/go.mod
index 0c7e00c..d1d86c4 100644
--- a/go.mod
+++ b/go.mod
@@ -7,14 +7,14 @@ replace skywalking/network v1.0.0 => ./protocol/gen-codes/skywalking/network
 require (
 	github.com/Shopify/sarama v1.27.2
 	github.com/enriquebris/goconcurrentqueue v0.6.0
-	github.com/golang/protobuf v1.4.3
-	github.com/google/go-cmp v0.5.4
+	github.com/golang/protobuf v1.5.2
+	github.com/google/go-cmp v0.5.5
 	github.com/grandecola/mmap v0.6.0
 	github.com/prometheus/client_golang v1.9.0
 	github.com/sirupsen/logrus v1.7.0
 	github.com/spf13/viper v1.7.1
 	github.com/urfave/cli/v2 v2.3.0
-	google.golang.org/grpc v1.35.0
-	google.golang.org/protobuf v1.25.0
+	google.golang.org/grpc v1.36.1
+	google.golang.org/protobuf v1.26.0
 	skywalking/network v1.0.0
 )
diff --git a/go.sum b/go.sum
index 13be86f..5ec45ee 100644
--- a/go.sum
+++ b/go.sum
@@ -134,6 +134,9 @@ github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QD
 github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
 github.com/golang/protobuf v1.4.3 h1:JjCZWpVbqXDqFVmTfYWEVTMIYrL/NPdPSCHPJ0T/raM=
 github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
+github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
+github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw=
+github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
 github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db h1:woRePGFeVFfLKN/pOkfl+p/TAqKOfFu+7KPlMVpok/w=
 github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
 github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
@@ -147,8 +150,8 @@ github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw
 github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
 github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
 github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
-github.com/google/go-cmp v0.5.4 h1:L8R9j+yAqZuZjsqh/z+F1NCffTKKLShY6zXTItVIZ8M=
-github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
+github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
+github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
 github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
 github.com/google/martian v2.1.0+incompatible h1:/CP5g8u/VJHijgedC/Legn3BAbAaWPgecwXBIDzw5no=
 github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
@@ -589,8 +592,8 @@ google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQ
 google.golang.org/grpc v1.26.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
 google.golang.org/grpc v1.27.0 h1:rRYRFMVgRv6E0D70Skyfsr28tDXIuuPZyWGMPdMcnXg=
 google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
-google.golang.org/grpc v1.35.0 h1:TwIQcH3es+MojMVojxxfQ3l3OF2KzlRxML2xZq0kRo8=
-google.golang.org/grpc v1.35.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU=
+google.golang.org/grpc v1.36.1 h1:cmUfbeGKnz9+2DD/UYsMQXeqbHZqZDs4eQwW0sFOpBY=
+google.golang.org/grpc v1.36.1/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU=
 google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
 google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
 google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
@@ -601,6 +604,9 @@ google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2
 google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
 google.golang.org/protobuf v1.25.0 h1:Ejskq+SyPohKW+1uil0JJMtmHCgJPJ/qWTxr8qp+R4c=
 google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
+google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
+google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk=
+google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
 gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
 gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
diff --git a/internal/satellite/module/sender/create.go b/internal/satellite/module/sender/create.go
index 5da3cca..12906e2 100644
--- a/internal/satellite/module/sender/create.go
+++ b/internal/satellite/module/sender/create.go
@@ -36,8 +36,7 @@ func NewSender(cfg *api.SenderConfig, g gatherer.Gatherer) api.Sender {
 		runningFallbacker: fallbacker.GetFallbacker(cfg.FallbackerConfig),
 		runningClient: sharing.Manager[cfg.ClientName].(client.Client),
 		gatherer: g,
-		logicInput: nil,
-		physicalInput: make(chan *event.OutputEventContext),
+		input: make(chan *event.OutputEventContext),
 		listener: make(chan client.ClientStatus),
 		flushChannel: make(chan *buffer.BatchBuffer, 1),
 		buffer: buffer.NewBatchBuffer(cfg.MaxBufferSize),
diff --git a/internal/satellite/module/sender/sender.go b/internal/satellite/module/sender/sender.go
index 86eec6d..e75e24c 100644
--- a/internal/satellite/module/sender/sender.go
+++ b/internal/satellite/module/sender/sender.go
@@ -20,6 +20,7 @@ package sender
 import (
 	"context"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"github.com/sirupsen/logrus"
@@ -51,11 +52,11 @@ type Sender struct {
 	gatherer gatherer.Gatherer
 
 	// self components
-	logicInput chan *event.OutputEventContext // logic input channel
-	physicalInput chan *event.OutputEventContext // physical input channel
-	listener chan client.ClientStatus // client status listener
-	flushChannel chan *buffer.BatchBuffer // forwarder flush channel
-	buffer *buffer.BatchBuffer // cache the downstream input data
+	input chan *event.OutputEventContext // physical input channel
+	listener chan client.ClientStatus // client status listener
+	flushChannel chan *buffer.BatchBuffer // forwarder flush channel
+	buffer *buffer.BatchBuffer // cache the downstream input data
+	blocking int32 // the status of input channel
 
 	// metrics
 	sendCounter *telemetry.Counter
@@ -65,7 +66,6 @@ type Sender struct {
 func (s *Sender) Prepare() error {
 	log.Logger.WithField("pipe", s.config.PipeName).Info("sender module is preparing...")
 	s.runningClient.RegisterListener(s.listener)
-	s.logicInput = s.physicalInput
 	for _, runningForwarder := range s.runningForwarders {
 		err := runningForwarder.Prepare(s.runningClient.GetConnectedClient())
 		if err != nil {
@@ -80,64 +80,90 @@ func (s *Sender) Prepare() error {
 func (s *Sender) Boot(ctx context.Context) {
 	log.Logger.WithField("pipe", s.config.PipeName).Info("sender module is starting...")
 	var wg sync.WaitGroup
-	wg.Add(2)
-	// 1. keep fetching the downstream data when client connected, and put it into BatchBuffer.
-	// 2. When reaches the buffer limit or receives a timer flush signal, and put BatchBuffer into flushChannel.
-	go func() {
-		defer wg.Done()
-		childCtx, cancel := context.WithCancel(ctx)
-		timeTicker := time.NewTicker(time.Duration(s.config.FlushTime) * time.Millisecond)
-		for {
-			select {
-			case status := <-s.listener:
-				switch status {
-				case client.Connected:
-					log.Logger.WithField("pipe", s.config.PipeName).Info("the client connection of the sender module is connected")
-					s.logicInput = s.physicalInput
-				case client.Disconnect:
-					log.Logger.WithField("pipe", s.config.PipeName).Info("the client connection of the sender module is disconnected")
-					s.logicInput = nil
-				}
-			case <-timeTicker.C:
-				if s.buffer.Len() > s.config.MinFlushEvents {
-					s.flushChannel <- s.buffer
-					s.buffer = buffer.NewBatchBuffer(s.config.MaxBufferSize)
-				}
-			case e := <-s.logicInput:
-				s.buffer.Add(e)
-				if s.buffer.Len() == s.config.MaxBufferSize {
-					s.flushChannel <- s.buffer
-					s.buffer = buffer.NewBatchBuffer(s.config.MaxBufferSize)
-				}
-			case <-childCtx.Done():
-				cancel()
-				s.logicInput = nil
-				return
+	wg.Add(3)
+	go s.store(ctx, &wg)
+	go s.listen(ctx, &wg)
+	go s.flush(ctx, &wg)
+	wg.Wait()
+}
+
+// store data.
+// 1. keep fetching the downstream data when client connected, and put it into BatchBuffer.
+// 2. When reaches the buffer limit or receives a timer flush signal, and put BatchBuffer into flushChannel.
+func (s *Sender) store(ctx context.Context, wg *sync.WaitGroup) {
+	defer wg.Done()
+	defer log.Logger.WithField("pipe", s.config.PipeName).Infof("store routine closed")
+	childCtx, _ := context.WithCancel(ctx) // nolint
+	timeTicker := time.NewTicker(time.Duration(s.config.FlushTime) * time.Millisecond)
+	for {
+ if atomic.LoadInt32(&s.blocking) == 1 { + time.Sleep(100 * time.Millisecond) + continue + } + select { + case <-childCtx.Done(): + return + case <-timeTicker.C: + if s.buffer.Len() > s.config.MinFlushEvents { + s.flushChannel <- s.buffer + s.buffer = buffer.NewBatchBuffer(s.config.MaxBufferSize) + } + case e := <-s.input: + if e == nil { + continue + } + s.buffer.Add(e) + if s.buffer.Len() == s.config.MaxBufferSize { + s.flushChannel <- s.buffer + s.buffer = buffer.NewBatchBuffer(s.config.MaxBufferSize) } } - }() - // Keep fetching BatchBuffer to forward. - go func() { - defer wg.Done() - childCtx, cancel := context.WithCancel(ctx) - for { - select { - case b := <-s.flushChannel: - s.consume(b) - case <-childCtx.Done(): - cancel() - s.Shutdown() - return + } +} + +// Listen the client status. +func (s *Sender) listen(ctx context.Context, wg *sync.WaitGroup) { + defer wg.Done() + defer log.Logger.WithField("pipe", s.config.PipeName).Infof("listen routine closed") + childCtx, _ := context.WithCancel(ctx) // nolint + for { + select { + case <-childCtx.Done(): + return + case status := <-s.listener: + switch status { + case client.Connected: + log.Logger.WithField("pipe", s.config.PipeName).Info("the client connection of the sender module connected") + atomic.StoreInt32(&s.blocking, 0) + case client.Disconnect: + log.Logger.WithField("pipe", s.config.PipeName).Info("the client connection of the sender module disconnected") + atomic.StoreInt32(&s.blocking, 1) } } - }() - wg.Wait() + } +} + +// Keep fetching BatchBuffer to forward. +func (s *Sender) flush(ctx context.Context, wg *sync.WaitGroup) { + defer wg.Done() + defer log.Logger.WithField("pipe", s.config.PipeName).Infof("flush routine closed") + childCtx, _ := context.WithCancel(ctx) // nolint + for { + select { + case <-childCtx.Done(): + s.Shutdown() + return + case b := <-s.flushChannel: + s.consume(b) + } + } } // Shutdown closes the channels and tries to force forward the events in the buffer. 
 func (s *Sender) Shutdown() {
 	log.Logger.WithField("pipe", s.config.PipeName).Info("sender module is closing")
-	close(s.physicalInput)
+	close(s.input)
 	ticker := time.NewTicker(module.ShutdownHookTime)
 	for {
 		select {
@@ -187,5 +213,5 @@ func (s *Sender) consume(batch *buffer.BatchBuffer) {
 }
 
 func (s *Sender) InputDataChannel() chan<- *event.OutputEventContext {
-	return s.logicInput
+	return s.input
 }
diff --git a/plugins/queue/mmap/meta/meta.go b/plugins/queue/mmap/meta/meta.go
index 11f75fa..36a5faa 100644
--- a/plugins/queue/mmap/meta/meta.go
+++ b/plugins/queue/mmap/meta/meta.go
@@ -22,6 +22,7 @@ package meta
 import (
 	"fmt"
 	"path/filepath"
+	"sync"
 	"syscall"
 
 	"github.com/grandecola/mmap"
@@ -60,6 +61,7 @@ type Metadata struct {
 	name string
 	size int
 	capacity int
+	lock sync.RWMutex
 }
 
 // NewMetaData read or create a Metadata with supported metaVersion
@@ -92,76 +94,104 @@ func NewMetaData(metaDir string, capacity int) (*Metadata, error) {
 
 // GetVersion returns the meta version.
 func (m *Metadata) GetVersion() int {
+	m.lock.RLock()
+	defer m.lock.RUnlock()
 	return int(m.metaFile.ReadUint64At(versionPos))
 }
 
 // PutVersion put the version into the memory mapped file.
 func (m *Metadata) PutVersion(version int64) {
+	m.lock.Lock()
+	defer m.lock.Unlock()
 	m.metaFile.WriteUint64At(uint64(version), versionPos)
 }
 
 // GetWritingOffset returns the writing offset, which contains the segment ID and the offset of the segment.
 func (m *Metadata) GetWritingOffset() (segmentID, offset int64) {
+	m.lock.RLock()
+	defer m.lock.RUnlock()
 	return int64(m.metaFile.ReadUint64At(widPos)), int64(m.metaFile.ReadUint64At(woffsetPos))
 }
 
 // PutWritingOffset put the segment ID and the offset of the segment into the writing offset.
 func (m *Metadata) PutWritingOffset(segmentID, offset int64) {
+	m.lock.Lock()
+	defer m.lock.Unlock()
 	m.metaFile.WriteUint64At(uint64(segmentID), widPos)
 	m.metaFile.WriteUint64At(uint64(offset), woffsetPos)
 }
 
 // GetWatermarkOffset returns the watermark offset, which contains the segment ID and the offset of the segment.
 func (m *Metadata) GetWatermarkOffset() (segmentID, offset int64) {
+	m.lock.RLock()
+	defer m.lock.RUnlock()
 	return int64(m.metaFile.ReadUint64At(wmidPos)), int64(m.metaFile.ReadUint64At(wmoffsetPos))
 }
 
 // PutWatermarkOffset put the segment ID and the offset of the segment into the watermark offset.
 func (m *Metadata) PutWatermarkOffset(segmentID, offset int64) {
+	m.lock.Lock()
+	defer m.lock.Unlock()
 	m.metaFile.WriteUint64At(uint64(segmentID), wmidPos)
 	m.metaFile.WriteUint64At(uint64(offset), wmoffsetPos)
 }
 
 // GetCommittedOffset returns the committed offset, which contains the segment ID and the offset of the segment.
 func (m *Metadata) GetCommittedOffset() (segmentID, offset int64) {
+	m.lock.RLock()
+	defer m.lock.RUnlock()
 	return int64(m.metaFile.ReadUint64At(cidPos)), int64(m.metaFile.ReadUint64At(coffsetPos))
 }
 
 // PutCommittedOffset put the segment ID and the offset of the segment into the committed offset.
 func (m *Metadata) PutCommittedOffset(segmentID, offset int64) {
+	m.lock.Lock()
+	defer m.lock.Unlock()
 	m.metaFile.WriteUint64At(uint64(segmentID), cidPos)
 	m.metaFile.WriteUint64At(uint64(offset), coffsetPos)
 }
 
 // GetReadingOffset returns the reading offset, which contains the segment ID and the offset of the segment.
 func (m *Metadata) GetReadingOffset() (segmentID, offset int64) {
+	m.lock.RLock()
+	defer m.lock.RUnlock()
 	return int64(m.metaFile.ReadUint64At(ridPos)), int64(m.metaFile.ReadUint64At(roffsetPos))
 }
 
 // PutReadingOffset put the segment ID and the offset of the segment into the reading offset.
 func (m *Metadata) PutReadingOffset(segmentID, offset int64) {
+	m.lock.Lock()
+	defer m.lock.Unlock()
 	m.metaFile.WriteUint64At(uint64(segmentID), ridPos)
 	m.metaFile.WriteUint64At(uint64(offset), roffsetPos)
 }
 
 // GetCapacity returns the capacity of the queue.
 func (m *Metadata) GetCapacity() int {
+	m.lock.RLock()
+	defer m.lock.RUnlock()
 	return int(m.metaFile.ReadUint64At(capacityPos))
 }
 
 // PutCapacity put the capacity into the memory mapped file.
 func (m *Metadata) PutCapacity(version int64) {
+	m.lock.Lock()
+	defer m.lock.Unlock()
 	m.metaFile.WriteUint64At(uint64(version), capacityPos)
 }
 
 // Flush the memory mapped file to the disk.
 func (m *Metadata) Flush() error {
+	m.lock.Lock()
+	defer m.lock.Unlock()
 	return m.metaFile.Flush(syscall.MS_SYNC)
 }
 
 // Close do Flush operation and unmap the memory mapped file.
 func (m *Metadata) Close() error {
-	if err := m.Flush(); err != nil {
+	m.lock.Lock()
+	defer m.lock.Unlock()
+	if err := m.metaFile.Flush(syscall.MS_SYNC); err != nil {
 		return err
 	}
 	return m.metaFile.Unmap()
diff --git a/plugins/queue/mmap/queue.go b/plugins/queue/mmap/queue.go
index 44d20ee..539c59f 100644
--- a/plugins/queue/mmap/queue.go
+++ b/plugins/queue/mmap/queue.go
@@ -61,7 +61,6 @@ type Queue struct {
 	MaxEventSize int `mapstructure:"max_event_size"` // The max size of the input event.
 
 	// running components
-	lock sync.Mutex
 	queueName string // The queue name.
 	meta *meta.Metadata // The metadata file.
 	segments []*mmap.File // The data files.
@@ -72,6 +71,7 @@ type Queue struct {
 	sufficientMemChannel chan struct{} // Notify when memory is sufficient
 	markReadChannel chan int64 // Transfer the read segmentID to do ummap operation.
 	ready bool // The status of the queue.
+	locker []int32 // locker
 
 	// control components
 	ctx context.Context // Parent ctx
@@ -145,6 +145,7 @@ func (q *Queue) Initialize() error {
 	q.ctx, q.cancel = context.WithCancel(context.Background())
 	// async supported processes.
 	q.showDownWg.Add(2)
+	q.locker = make([]int32, q.QueueCapacitySegments)
 	go q.segmentSwapper()
 	go q.flush()
 	q.ready = true
@@ -187,8 +188,7 @@ func (q *Queue) Dequeue() (*api.SequenceEvent, error) {
 }
 
 func (q *Queue) Close() error {
-	q.lock.Lock()
-	defer q.lock.Unlock()
+	q.ready = false
 	q.cancel()
 	q.showDownWg.Wait()
 	for i, segment := range q.segments {
@@ -204,7 +204,6 @@ func (q *Queue) Close() error {
 	if err := q.meta.Close(); err != nil {
 		log.Logger.Errorf("cannot unmap the metadata: %v", err)
 	}
-	q.ready = false
 	return nil
 }
 
@@ -226,8 +225,7 @@ func (q *Queue) Ack(lastOffset event.Offset) {
 // flush control the flush operation by timer or counter.
 func (q *Queue) flush() {
 	defer q.showDownWg.Done()
-	ctx, cancel := context.WithCancel(q.ctx)
-	defer cancel()
+	ctx, _ := context.WithCancel(q.ctx) // nolint
 	for {
 		timer := time.NewTimer(time.Duration(q.FlushPeriod) * time.Millisecond)
 		select {
@@ -243,13 +243,16 @@ func (q *Queue) flush() {
 
 // doFlush flush the segment and meta files to the disk.
 func (q *Queue) doFlush() {
-	for _, segment := range q.segments {
-		if segment == nil {
+	for i := range q.segments {
+		q.lockByIndex(i)
+		if q.segments[i] == nil {
+			q.unlockByIndex(i)
 			continue
 		}
-		if err := segment.Flush(syscall.MS_SYNC); err != nil {
+		if err := q.segments[i].Flush(syscall.MS_SYNC); err != nil {
 			log.Logger.Errorf("cannot flush segment file: %v", err)
 		}
+		q.unlockByIndex(i)
 	}
 	wid, woffset := q.meta.GetWritingOffset()
 	q.meta.PutWatermarkOffset(wid, woffset)
diff --git a/plugins/queue/mmap/queue_lock.go b/plugins/queue/mmap/queue_lock.go
new file mode 100644
index 0000000..7e7c06b
--- /dev/null
+++ b/plugins/queue/mmap/queue_lock.go
@@ -0,0 +1,54 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package mmap
+
+import "sync/atomic"
+
+func (q *Queue) lock(segmentID int64) {
+	index := q.GetIndex(segmentID)
+	for {
+		if atomic.CompareAndSwapInt32(&q.locker[index], 0, 1) {
+			return
+		}
+	}
+}
+
+func (q *Queue) unlock(segmentID int64) {
+	index := q.GetIndex(segmentID)
+	for {
+		if atomic.CompareAndSwapInt32(&q.locker[index], 1, 0) {
+			return
+		}
+	}
+}
+
+func (q *Queue) lockByIndex(index int) {
+	for {
+		if atomic.CompareAndSwapInt32(&q.locker[index], 0, 1) {
+			return
+		}
+	}
+}
+
+func (q *Queue) unlockByIndex(index int) {
+	for {
+		if atomic.CompareAndSwapInt32(&q.locker[index], 1, 0) {
+			return
+		}
+	}
+}
diff --git a/plugins/queue/mmap/queue_operation.go b/plugins/queue/mmap/queue_operation.go
index 9ee6e74..d00df14 100644
--- a/plugins/queue/mmap/queue_operation.go
+++ b/plugins/queue/mmap/queue_operation.go
@@ -43,8 +43,6 @@ const uInt64Size = 8
 // enqueue writes the data into the file system. It first writes the length of the data,
 // then the data itself. It means the whole data may not exist in the one segments.
 func (q *Queue) enqueue(bytes []byte) error {
-	q.lock.Lock()
-	defer q.lock.Unlock()
 	if q.isFull() {
 		return api.ErrFull
 	}
@@ -69,8 +67,6 @@ func (q *Queue) enqueue(bytes []byte) error {
 // dequeue reads the data from the file system. It first reads the length of the data,
 // then the data itself. It means the whole data may not exist in the one segments.
 func (q *Queue) dequeue() (data []byte, rid, roffset int64, err error) {
-	q.lock.Lock()
-	defer q.lock.Unlock()
 	if q.isEmpty() {
 		return nil, 0, 0, api.ErrEmpty
 	}
@@ -95,11 +91,13 @@ func (q *Queue) readBytes(id, offset int64, length int) (data []byte, newID, new
 	counter := 0
 	res := make([]byte, length)
 	for {
+		q.lock(id)
 		segment, err := q.GetSegment(id)
 		if err != nil {
 			return nil, 0, 0, err
 		}
 		readBytes, err := segment.ReadAt(res[counter:], offset)
+		q.unlock(id)
 		if err != nil {
 			return nil, 0, 0, err
 		}
@@ -120,11 +118,13 @@ func (q *Queue) readLength(id, offset int64) (newID, newOffset int64, length int
 	if offset+uInt64Size > int64(q.SegmentSize) {
 		id, offset = id+1, 0
 	}
+	q.lock(id)
 	segment, err := q.GetSegment(id)
 	if err != nil {
 		return 0, 0, 0, err
 	}
 	num := segment.ReadUint64At(offset)
+	q.unlock(id)
 	offset += uInt64Size
 	if offset == int64(q.SegmentSize) {
 		id, offset = id+1, 0
@@ -137,11 +137,13 @@ func (q *Queue) writeLength(length int, id, offset int64) (newID, newOffset int6
 	if offset+uInt64Size > int64(q.SegmentSize) {
 		id, offset = id+1, 0
 	}
+	q.lock(id)
 	segment, err := q.GetSegment(id)
 	if err != nil {
 		return 0, 0, err
 	}
 	segment.WriteUint64At(uint64(length), offset)
+	q.unlock(id)
 	offset += uInt64Size
 	if offset == int64(q.SegmentSize) {
 		id, offset = id+1, 0
@@ -153,13 +155,14 @@
 func (q *Queue) writeBytes(bytes []byte, id, offset int64) (newID, newOffset int64, err error) {
 	counter := 0
 	length := len(bytes)
-
 	for {
+		q.lock(id)
 		segment, err := q.GetSegment(id)
 		if err != nil {
 			return 0, 0, err
 		}
 		writtenBytes, err := segment.WriteAt(bytes[counter:], offset)
+		q.unlock(id)
 		if err != nil {
 			return 0, 0, err
 		}
diff --git a/plugins/queue/mmap/segment_operation.go b/plugins/queue/mmap/segment_operation.go
index 42d54b2..aa2f421 100644
--- a/plugins/queue/mmap/segment_operation.go
+++ b/plugins/queue/mmap/segment_operation.go
@@ -35,7 +35,7 @@ import (
 
 // GetSegment returns a memory mapped file at the segmentID position.
 func (q *Queue) GetSegment(segmentID int64) (*mmap.File, error) {
-	if q.mmapCount >= q.MaxInMemSegments {
+	if atomic.LoadInt32(&q.mmapCount) >= q.MaxInMemSegments {
 		q.insufficientMemChannel <- struct{}{}
 		<-q.sufficientMemChannel
 	}
@@ -85,14 +85,15 @@ func (q *Queue) unmapSegment(segmentID int64) error {
 // segmentSwapper run with a go routine to ensure the memory cost.
 func (q *Queue) segmentSwapper() {
 	defer q.showDownWg.Done()
-	ctx, cancel := context.WithCancel(q.ctx)
-	defer cancel()
+	ctx, _ := context.WithCancel(q.ctx) // nolint
 	for {
 		select {
 		case id := <-q.markReadChannel:
+			q.lock(id)
 			if q.unmapSegment(id) != nil {
 				log.Logger.Errorf("cannot unmap the markread segment: %d", id)
 			}
+			q.unlock(id)
 		case <-q.insufficientMemChannel:
 			if q.mmapCount >= q.MaxInMemSegments {
 				if q.doSwap() != nil {
diff --git a/protocol/gen-codes/skywalking/network/go.mod b/protocol/gen-codes/skywalking/network/go.mod
index 0797ac0..6f56c75 100644
--- a/protocol/gen-codes/skywalking/network/go.mod
+++ b/protocol/gen-codes/skywalking/network/go.mod
@@ -3,7 +3,7 @@ module skywalking/network
 go 1.15
 
 require (
-	github.com/golang/protobuf v1.4.3
-	google.golang.org/grpc v1.35.0
-	google.golang.org/protobuf v1.25.0
+	github.com/golang/protobuf v1.5.2
+	google.golang.org/grpc v1.36.1
+	google.golang.org/protobuf v1.26.0
 )
diff --git a/protocol/gen-codes/skywalking/network/go.sum b/protocol/gen-codes/skywalking/network/go.sum
index 8f0da6e..76f5239 100644
--- a/protocol/gen-codes/skywalking/network/go.sum
+++ b/protocol/gen-codes/skywalking/network/go.sum
@@ -19,14 +19,16 @@ github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:W
 github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0=
 github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8=
 github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
-github.com/golang/protobuf v1.4.3 h1:JjCZWpVbqXDqFVmTfYWEVTMIYrL/NPdPSCHPJ0T/raM=
-github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
+github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
+github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw=
+github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
 github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
 github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
 github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
 github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
-github.com/google/go-cmp v0.5.0 h1:/QaMHBdZ26BB3SSst0Iwl10Epc+xhTquomWX0oZEB6w=
 github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
+github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
+github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
 github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
 github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
@@ -67,8 +69,8 @@ google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZi
 google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
 google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY=
 google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
-google.golang.org/grpc v1.35.0 h1:TwIQcH3es+MojMVojxxfQ3l3OF2KzlRxML2xZq0kRo8=
-google.golang.org/grpc v1.35.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU=
+google.golang.org/grpc v1.36.1 h1:cmUfbeGKnz9+2DD/UYsMQXeqbHZqZDs4eQwW0sFOpBY=
+google.golang.org/grpc v1.36.1/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU=
 google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
 google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
 google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
@@ -77,8 +79,10 @@ google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzi
 google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
 google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
 google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
-google.golang.org/protobuf v1.25.0 h1:Ejskq+SyPohKW+1uil0JJMtmHCgJPJ/qWTxr8qp+R4c=
 google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
+google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
+google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk=
+google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
 gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
 honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
