This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch local in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit e1f88f41620c2331942536e8051aff8944a12cbe Author: Gao Hongtao <[email protected]> AuthorDate: Thu Jul 4 06:22:17 2024 +0800 Remove channel in local pipeline Signed-off-by: Gao Hongtao <[email protected]> --- CHANGES.md | 1 + pkg/bus/bus.go | 99 +++++++++++----------------------------------------------- 2 files changed, 20 insertions(+), 80 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 408a8b29..8071b939 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -10,6 +10,7 @@ Release Notes. - Improve sorting performance of stream. - Add the measure query trace. - Assign a separate lookup table to each group in the maglev selector. +- Convert the async local pipeline to a sync pipeline. ### Bugs diff --git a/pkg/bus/bus.go b/pkg/bus/bus.go index 425bc60f..b4bb5efd 100644 --- a/pkg/bus/bus.go +++ b/pkg/bus/bus.go @@ -24,10 +24,6 @@ import ( "io" "sync" "time" - - "go.uber.org/multierr" - - "github.com/apache/skywalking-banyandb/pkg/run" ) type ( @@ -106,8 +102,6 @@ type Broadcaster interface { Broadcast(timeout time.Duration, topic Topic, message Message) ([]Future, error) } -type channel chan event - type chType int var ( @@ -138,16 +132,14 @@ func (t Topic) String() string { // The Bus allows publish-subscribe-style communication between components. type Bus struct { - topics map[Topic][]channel - closer *run.Closer + topics map[Topic][]MessageListener mutex sync.RWMutex } // NewBus returns a Bus. func NewBus() *Bus { b := new(Bus) - b.topics = make(map[Topic][]channel) - b.closer = run.NewCloser(0) + b.topics = make(map[Topic][]MessageListener) return b } @@ -171,41 +163,20 @@ func (e *emptyFuture) GetAll() ([]Message, error) { } type localFuture struct { - retCh chan Message - retCount int + messages []Message } func (l *localFuture) Get() (Message, error) { - if l.retCount < 1 { + if len(l.messages) == 0 { return Message{}, io.EOF } - m, ok := <-l.retCh - if ok { - l.retCount-- - return m, nil - } - return Message{}, io.EOF + m := l.messages[0] + l.messages = l.messages[1:] + return m, nil } func (l *localFuture) GetAll() ([]Message, error) { - var globalErr error - ret := make([]Message, 0, l.retCount) - for { - m, err := l.Get() - if errors.Is(err, io.EOF) { - return ret, globalErr - } - if err != nil { - globalErr = multierr.Append(globalErr, err) - continue - } - ret = append(ret, m) - } -} - -type event struct { - f Future - m Message + return l.messages, nil } // Publish sends Messages to a Topic. @@ -215,33 +186,24 @@ func (b *Bus) Publish(topic Topic, message ...Message) (Future, error) { } b.mutex.RLock() defer b.mutex.RUnlock() - cc, exit := b.topics[topic] + mll, exit := b.topics[topic] if !exit { return nil, ErrTopicNotExist } - var f Future + var f *localFuture switch topic.typ { case chTypeUnidirectional: f = nil case chTypeBidirectional: - f = &localFuture{retCount: len(message), retCh: make(chan Message)} + f = &localFuture{messages: make([]Message, 0, len(message))} } - for _, each := range cc { + for _, ml := range mll { for _, m := range message { - go func(ch channel, message Message) { - if !b.closer.AddRunning() { - return - } - defer b.closer.Done() - select { - case <-b.closer.CloseNotify(): - return - case ch <- event{ - m: message, - f: f, - }: - } - }(each, m) + if f != nil { + f.messages = append(f.messages, ml.Rev(m)) + } else { + ml.Rev(m) + } } } if f == nil { @@ -261,37 +223,14 @@ func (b *Bus) Subscribe(topic Topic, listener MessageListener) error { b.mutex.Lock() defer b.mutex.Unlock() if _, exist := b.topics[topic]; !exist { - b.topics[topic] = make([]channel, 0) + b.topics[topic] = make([]MessageListener, 0) } - ch := make(channel) list := b.topics[topic] - list = append(list, ch) + list = append(list, listener) b.topics[topic] = list - go func(listener MessageListener, ch channel) { - for { - c, ok := <-ch - if ok { - ret := listener.Rev(c.m) - if c.f == nil { - continue - } - if lf, ok := c.f.(*localFuture); ok { - lf.retCh <- ret - } - } else { - break - } - } - }(listener, ch) return nil } // Close a Bus until all Messages are sent to Subscribers. func (b *Bus) Close() { - b.closer.CloseThenWait() - for _, chs := range b.topics { - for _, ch := range chs { - close(ch) - } - } }
