This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new 256e509a Remove channel in local pipeline (#483)
256e509a is described below

commit 256e509a2241be63e432ad111adf5054fef2a33f
Author: Gao Hongtao <[email protected]>
AuthorDate: Thu Jul 4 11:34:29 2024 +0800

    Remove channel in local pipeline (#483)
    
    Signed-off-by: Gao Hongtao <[email protected]>
    Co-authored-by: 吴晟 Wu Sheng <[email protected]>
---
 CHANGES.md     |  1 +
 pkg/bus/bus.go | 99 +++++++++++-----------------------------------------------
 2 files changed, 20 insertions(+), 80 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 408a8b29..8071b939 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -10,6 +10,7 @@ Release Notes.
 - Improve sorting performance of stream.
 - Add the measure query trace.
 - Assign a separate lookup table to each group in the maglev selector.
+- Convert the async local pipeline to a sync pipeline.
 
 ### Bugs
 
diff --git a/pkg/bus/bus.go b/pkg/bus/bus.go
index 425bc60f..b4bb5efd 100644
--- a/pkg/bus/bus.go
+++ b/pkg/bus/bus.go
@@ -24,10 +24,6 @@ import (
        "io"
        "sync"
        "time"
-
-       "go.uber.org/multierr"
-
-       "github.com/apache/skywalking-banyandb/pkg/run"
 )
 
 type (
@@ -106,8 +102,6 @@ type Broadcaster interface {
        Broadcast(timeout time.Duration, topic Topic, message Message) 
([]Future, error)
 }
 
-type channel chan event
-
 type chType int
 
 var (
@@ -138,16 +132,14 @@ func (t Topic) String() string {
 
 // The Bus allows publish-subscribe-style communication between components.
 type Bus struct {
-       topics map[Topic][]channel
-       closer *run.Closer
+       topics map[Topic][]MessageListener
        mutex  sync.RWMutex
 }
 
 // NewBus returns a Bus.
 func NewBus() *Bus {
        b := new(Bus)
-       b.topics = make(map[Topic][]channel)
-       b.closer = run.NewCloser(0)
+       b.topics = make(map[Topic][]MessageListener)
        return b
 }
 
@@ -171,41 +163,20 @@ func (e *emptyFuture) GetAll() ([]Message, error) {
 }
 
 type localFuture struct {
-       retCh    chan Message
-       retCount int
+       messages []Message
 }
 
 func (l *localFuture) Get() (Message, error) {
-       if l.retCount < 1 {
+       if len(l.messages) == 0 {
                return Message{}, io.EOF
        }
-       m, ok := <-l.retCh
-       if ok {
-               l.retCount--
-               return m, nil
-       }
-       return Message{}, io.EOF
+       m := l.messages[0]
+       l.messages = l.messages[1:]
+       return m, nil
 }
 
 func (l *localFuture) GetAll() ([]Message, error) {
-       var globalErr error
-       ret := make([]Message, 0, l.retCount)
-       for {
-               m, err := l.Get()
-               if errors.Is(err, io.EOF) {
-                       return ret, globalErr
-               }
-               if err != nil {
-                       globalErr = multierr.Append(globalErr, err)
-                       continue
-               }
-               ret = append(ret, m)
-       }
-}
-
-type event struct {
-       f Future
-       m Message
+       return l.messages, nil
 }
 
 // Publish sends Messages to a Topic.
@@ -215,33 +186,24 @@ func (b *Bus) Publish(topic Topic, message ...Message) 
(Future, error) {
        }
        b.mutex.RLock()
        defer b.mutex.RUnlock()
-       cc, exit := b.topics[topic]
+       mll, exit := b.topics[topic]
        if !exit {
                return nil, ErrTopicNotExist
        }
-       var f Future
+       var f *localFuture
        switch topic.typ {
        case chTypeUnidirectional:
                f = nil
        case chTypeBidirectional:
-               f = &localFuture{retCount: len(message), retCh: make(chan 
Message)}
+               f = &localFuture{messages: make([]Message, 0, len(message))}
        }
-       for _, each := range cc {
+       for _, ml := range mll {
                for _, m := range message {
-                       go func(ch channel, message Message) {
-                               if !b.closer.AddRunning() {
-                                       return
-                               }
-                               defer b.closer.Done()
-                               select {
-                               case <-b.closer.CloseNotify():
-                                       return
-                               case ch <- event{
-                                       m: message,
-                                       f: f,
-                               }:
-                               }
-                       }(each, m)
+                       if f != nil {
+                               f.messages = append(f.messages, ml.Rev(m))
+                       } else {
+                               ml.Rev(m)
+                       }
                }
        }
        if f == nil {
@@ -261,37 +223,14 @@ func (b *Bus) Subscribe(topic Topic, listener 
MessageListener) error {
        b.mutex.Lock()
        defer b.mutex.Unlock()
        if _, exist := b.topics[topic]; !exist {
-               b.topics[topic] = make([]channel, 0)
+               b.topics[topic] = make([]MessageListener, 0)
        }
-       ch := make(channel)
        list := b.topics[topic]
-       list = append(list, ch)
+       list = append(list, listener)
        b.topics[topic] = list
-       go func(listener MessageListener, ch channel) {
-               for {
-                       c, ok := <-ch
-                       if ok {
-                               ret := listener.Rev(c.m)
-                               if c.f == nil {
-                                       continue
-                               }
-                               if lf, ok := c.f.(*localFuture); ok {
-                                       lf.retCh <- ret
-                               }
-                       } else {
-                               break
-                       }
-               }
-       }(listener, ch)
        return nil
 }
 
 // Close a Bus until all Messages are sent to Subscribers.
 func (b *Bus) Close() {
-       b.closer.CloseThenWait()
-       for _, chs := range b.topics {
-               for _, ch := range chs {
-                       close(ch)
-               }
-       }
 }

Reply via email to