This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new 256e509a Remove channel in local pipeline (#483)
256e509a is described below
commit 256e509a2241be63e432ad111adf5054fef2a33f
Author: Gao Hongtao <[email protected]>
AuthorDate: Thu Jul 4 11:34:29 2024 +0800
Remove channel in local pipeline (#483)
Signed-off-by: Gao Hongtao <[email protected]>
Co-authored-by: 吴晟 Wu Sheng <[email protected]>
---
CHANGES.md | 1 +
pkg/bus/bus.go | 99 +++++++++++-----------------------------------------------
2 files changed, 20 insertions(+), 80 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 408a8b29..8071b939 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -10,6 +10,7 @@ Release Notes.
- Improve sorting performance of stream.
- Add the measure query trace.
- Assign a separate lookup table to each group in the maglev selector.
+- Convert the async local pipeline to a sync pipeline.
### Bugs
diff --git a/pkg/bus/bus.go b/pkg/bus/bus.go
index 425bc60f..b4bb5efd 100644
--- a/pkg/bus/bus.go
+++ b/pkg/bus/bus.go
@@ -24,10 +24,6 @@ import (
"io"
"sync"
"time"
-
- "go.uber.org/multierr"
-
- "github.com/apache/skywalking-banyandb/pkg/run"
)
type (
@@ -106,8 +102,6 @@ type Broadcaster interface {
Broadcast(timeout time.Duration, topic Topic, message Message)
([]Future, error)
}
-type channel chan event
-
type chType int
var (
@@ -138,16 +132,14 @@ func (t Topic) String() string {
// The Bus allows publish-subscribe-style communication between components.
type Bus struct {
- topics map[Topic][]channel
- closer *run.Closer
+ topics map[Topic][]MessageListener
mutex sync.RWMutex
}
// NewBus returns a Bus.
func NewBus() *Bus {
b := new(Bus)
- b.topics = make(map[Topic][]channel)
- b.closer = run.NewCloser(0)
+ b.topics = make(map[Topic][]MessageListener)
return b
}
@@ -171,41 +163,20 @@ func (e *emptyFuture) GetAll() ([]Message, error) {
}
type localFuture struct {
- retCh chan Message
- retCount int
+ messages []Message
}
func (l *localFuture) Get() (Message, error) {
- if l.retCount < 1 {
+ if len(l.messages) == 0 {
return Message{}, io.EOF
}
- m, ok := <-l.retCh
- if ok {
- l.retCount--
- return m, nil
- }
- return Message{}, io.EOF
+ m := l.messages[0]
+ l.messages = l.messages[1:]
+ return m, nil
}
func (l *localFuture) GetAll() ([]Message, error) {
- var globalErr error
- ret := make([]Message, 0, l.retCount)
- for {
- m, err := l.Get()
- if errors.Is(err, io.EOF) {
- return ret, globalErr
- }
- if err != nil {
- globalErr = multierr.Append(globalErr, err)
- continue
- }
- ret = append(ret, m)
- }
-}
-
-type event struct {
- f Future
- m Message
+ return l.messages, nil
}
// Publish sends Messages to a Topic.
@@ -215,33 +186,24 @@ func (b *Bus) Publish(topic Topic, message ...Message)
(Future, error) {
}
b.mutex.RLock()
defer b.mutex.RUnlock()
- cc, exit := b.topics[topic]
+ mll, exit := b.topics[topic]
if !exit {
return nil, ErrTopicNotExist
}
- var f Future
+ var f *localFuture
switch topic.typ {
case chTypeUnidirectional:
f = nil
case chTypeBidirectional:
- f = &localFuture{retCount: len(message), retCh: make(chan
Message)}
+ f = &localFuture{messages: make([]Message, 0, len(message))}
}
- for _, each := range cc {
+ for _, ml := range mll {
for _, m := range message {
- go func(ch channel, message Message) {
- if !b.closer.AddRunning() {
- return
- }
- defer b.closer.Done()
- select {
- case <-b.closer.CloseNotify():
- return
- case ch <- event{
- m: message,
- f: f,
- }:
- }
- }(each, m)
+ if f != nil {
+ f.messages = append(f.messages, ml.Rev(m))
+ } else {
+ ml.Rev(m)
+ }
}
}
if f == nil {
@@ -261,37 +223,14 @@ func (b *Bus) Subscribe(topic Topic, listener
MessageListener) error {
b.mutex.Lock()
defer b.mutex.Unlock()
if _, exist := b.topics[topic]; !exist {
- b.topics[topic] = make([]channel, 0)
+ b.topics[topic] = make([]MessageListener, 0)
}
- ch := make(channel)
list := b.topics[topic]
- list = append(list, ch)
+ list = append(list, listener)
b.topics[topic] = list
- go func(listener MessageListener, ch channel) {
- for {
- c, ok := <-ch
- if ok {
- ret := listener.Rev(c.m)
- if c.f == nil {
- continue
- }
- if lf, ok := c.f.(*localFuture); ok {
- lf.retCh <- ret
- }
- } else {
- break
- }
- }
- }(listener, ch)
return nil
}
// Close a Bus until all Messages are sent to Subscribers.
func (b *Bus) Close() {
- b.closer.CloseThenWait()
- for _, chs := range b.topics {
- for _, ch := range chs {
- close(ch)
- }
- }
}