This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch publish
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit cc4a399c7605b6cb92e56e88b99f73bfac5d5a7d
Author: Gao Hongtao <[email protected]>
AuthorDate: Mon Apr 15 03:51:10 2024 +0000

    Add batch publish test
    
    Signed-off-by: Gao Hongtao <[email protected]>
---
 banyand/liaison/grpc/measure.go     |  3 ++-
 banyand/liaison/grpc/server.go      |  2 ++
 banyand/liaison/grpc/stream.go      |  3 ++-
 banyand/measure/topn.go             |  5 +++--
 banyand/queue/local.go              |  4 +++-
 banyand/queue/pub/pub.go            |  6 ++++--
 banyand/queue/pub/pub_suite_test.go | 10 +++++++++-
 banyand/queue/pub/pub_test.go       | 39 ++++++++++++++++++++++++++++++++++---
 banyand/queue/queue.go              |  3 ++-
 9 files changed, 63 insertions(+), 12 deletions(-)

diff --git a/banyand/liaison/grpc/measure.go b/banyand/liaison/grpc/measure.go
index 930cec03..d149aef5 100644
--- a/banyand/liaison/grpc/measure.go
+++ b/banyand/liaison/grpc/measure.go
@@ -46,6 +46,7 @@ type measureService struct {
        ingestionAccessLog accesslog.Log
        pipeline           queue.Client
        broadcaster        queue.Client
+       writeTimeout       time.Duration
 }
 
 func (ms *measureService) setLogger(log *logger.Logger) {
@@ -67,7 +68,7 @@ func (ms *measureService) Write(measure measurev1.MeasureService_WriteServer) er
                }
        }
        ctx := measure.Context()
-       publisher := ms.pipeline.NewBatchPublisher()
+       publisher := ms.pipeline.NewBatchPublisher(ms.writeTimeout)
        defer publisher.Close()
        for {
                select {
diff --git a/banyand/liaison/grpc/server.go b/banyand/liaison/grpc/server.go
index 102bd170..a4d48bb5 100644
--- a/banyand/liaison/grpc/server.go
+++ b/banyand/liaison/grpc/server.go
@@ -178,6 +178,8 @@ func (s *server) FlagSet() *run.FlagSet {
        fs.Uint32Var(&s.port, "grpc-port", 17912, "the port of banyand listens")
        fs.BoolVar(&s.enableIngestionAccessLog, "enable-ingestion-access-log", false, "enable ingestion access log")
        fs.StringVar(&s.accessLogRootPath, "access-log-root-path", "", "access log root path")
+       fs.DurationVar(&s.streamSVC.writeTimeout, "stream-write-timeout", 15*time.Second, "stream write timeout")
+       fs.DurationVar(&s.measureSVC.writeTimeout, "measure-write-timeout", 15*time.Second, "measure write timeout")
        return fs
 }
 
diff --git a/banyand/liaison/grpc/stream.go b/banyand/liaison/grpc/stream.go
index 3e530ff6..37635a4b 100644
--- a/banyand/liaison/grpc/stream.go
+++ b/banyand/liaison/grpc/stream.go
@@ -46,6 +46,7 @@ type streamService struct {
        ingestionAccessLog accesslog.Log
        pipeline           queue.Client
        broadcaster        queue.Client
+       writeTimeout       time.Duration
 }
 
 func (s *streamService) setLogger(log *logger.Logger) {
@@ -66,7 +67,7 @@ func (s *streamService) Write(stream streamv1.StreamService_WriteServer) error {
                        logger.Err(errResp).Msg("failed to send response")
                }
        }
-       publisher := s.pipeline.NewBatchPublisher()
+       publisher := s.pipeline.NewBatchPublisher(s.writeTimeout)
        defer publisher.Close()
        ctx := stream.Context()
        for {
diff --git a/banyand/measure/topn.go b/banyand/measure/topn.go
index 90b2d473..8f657175 100644
--- a/banyand/measure/topn.go
+++ b/banyand/measure/topn.go
@@ -50,7 +50,8 @@ import (
 )
 
 const (
-       timeBucketFormat = "200601021504"
+       timeBucketFormat         = "200601021504"
+       resultPersistencyTimeout = 10 * time.Second
 )
 
 var (
@@ -133,7 +134,7 @@ func (t *topNStreamingProcessor) writeStreamRecord(record flow.StreamRecord) err
        eventTime := t.downSampleTimeBucket(record.TimestampMillis())
        timeBucket := eventTime.Format(timeBucketFormat)
        var err error
-       publisher := t.pipeline.NewBatchPublisher()
+       publisher := t.pipeline.NewBatchPublisher(resultPersistencyTimeout)
        defer publisher.Close()
        for group, tuples := range tuplesGroups {
                if e := t.l.Debug(); e.Enabled() {
diff --git a/banyand/queue/local.go b/banyand/queue/local.go
index 9609ac24..58c67781 100644
--- a/banyand/queue/local.go
+++ b/banyand/queue/local.go
@@ -19,6 +19,8 @@
 package queue
 
 import (
+       "time"
+
        "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
        "github.com/apache/skywalking-banyandb/pkg/bus"
        "github.com/apache/skywalking-banyandb/pkg/run"
@@ -75,7 +77,7 @@ func (l local) Name() string {
        return "local-pipeline"
 }
 
-func (l local) NewBatchPublisher() BatchPublisher {
+func (l local) NewBatchPublisher(_ time.Duration) BatchPublisher {
        return &localBatchPublisher{
                local: l.local,
        }
diff --git a/banyand/queue/pub/pub.go b/banyand/queue/pub/pub.go
index cbe1cd50..200b2b9d 100644
--- a/banyand/queue/pub/pub.go
+++ b/banyand/queue/pub/pub.go
@@ -159,10 +159,11 @@ func (p *pub) Publish(topic bus.Topic, messages ...bus.Message) (bus.Future, err
 }
 
 // NewBatchPublisher returns a new batch publisher.
-func (p *pub) NewBatchPublisher() queue.BatchPublisher {
+func (p *pub) NewBatchPublisher(timeout time.Duration) queue.BatchPublisher {
        return &batchPublisher{
                pub:     p,
                streams: make(map[string]writeStream),
+               timeout: timeout,
                f:       batchFuture{errNodes: make(map[string]struct{}), l: p.log},
        }
 }
@@ -196,6 +197,7 @@ type batchPublisher struct {
        pub     *pub
        streams map[string]writeStream
        f       batchFuture
+       timeout time.Duration
 }
 
 func (bp *batchPublisher) Close() (err error) {
@@ -257,7 +259,7 @@ func (bp *batchPublisher) Publish(topic bus.Topic, messages ...bus.Message) (bus
                        err = multierr.Append(err, fmt.Errorf("failed to get client for node %s", node))
                        continue
                }
-               ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
+               ctx, cancel := context.WithTimeout(context.Background(), bp.timeout)
                // this assignment is for getting around the go vet lint
                deferFn := cancel
                stream, errCreateStream := client.client.Send(ctx)
diff --git a/banyand/queue/pub/pub_suite_test.go b/banyand/queue/pub/pub_suite_test.go
index 96bee0ac..9a1c65aa 100644
--- a/banyand/queue/pub/pub_suite_test.go
+++ b/banyand/queue/pub/pub_suite_test.go
@@ -39,6 +39,7 @@ import (
        "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/test"
+       "github.com/apache/skywalking-banyandb/pkg/test/flags"
 )
 
 func TestPub(t *testing.T) {
@@ -46,6 +47,13 @@ func TestPub(t *testing.T) {
        ginkgo.RunSpecs(t, "Publish Suite")
 }
 
+var _ = ginkgo.BeforeSuite(func() {
+       gomega.Expect(logger.Init(logger.Logging{
+               Env:   "dev",
+               Level: flags.LogLevel,
+       })).To(gomega.Succeed())
+})
+
 type mockServer struct {
        clusterv1.UnimplementedServiceServer
        healthServer *health.Server
@@ -79,7 +87,7 @@ func (s *mockServer) Send(stream clusterv1.Service_SendServer) error {
                }
 
                if err := stream.Send(res); err != nil {
-                       panic(err)
+                       return err
                }
        }
 }
diff --git a/banyand/queue/pub/pub_test.go b/banyand/queue/pub/pub_test.go
index b68d19cc..bf20bfc0 100644
--- a/banyand/queue/pub/pub_test.go
+++ b/banyand/queue/pub/pub_test.go
@@ -56,7 +56,7 @@ var _ = ginkgo.Describe("publish clients register/unregister", func() {
                        node2 := getDataNode("node2", addr2)
                        p.OnAddOrUpdate(node2)
 
-                       bp := p.NewBatchPublisher()
+                       bp := p.NewBatchPublisher(3 * time.Second)
                        defer bp.Close()
                        t := bus.UniTopic("test")
                        for i := 0; i < 10; i++ {
@@ -68,7 +68,7 @@ var _ = ginkgo.Describe("publish clients register/unregister", func() {
                        }
                })
 
-               ginkgo.FIt("should go to evict queue when node is unavailable", func() {
+               ginkgo.It("should go to evict queue when node is unavailable", func() {
                        addr1 := getAddress()
                        addr2 := getAddress()
                        closeFn1 := setup(addr1, codes.OK, 200*time.Millisecond)
@@ -84,7 +84,7 @@ var _ = ginkgo.Describe("publish clients register/unregister", func() {
                        node2 := getDataNode("node2", addr2)
                        p.OnAddOrUpdate(node2)
 
-                       bp := p.NewBatchPublisher()
+                       bp := p.NewBatchPublisher(3 * time.Second)
                        t := bus.UniTopic("test")
                        for i := 0; i < 10; i++ {
                                _, err := bp.Publish(t,
@@ -107,5 +107,38 @@ var _ = ginkgo.Describe("publish clients register/unregister", func() {
                                gomega.Expect(p.clients).Should(gomega.HaveKey("node1"))
                        }()
                })
+
+               ginkgo.It("should stay in active queue when operation takes a long time", func() {
+                       addr1 := getAddress()
+                       addr2 := getAddress()
+                       closeFn1 := setup(addr1, codes.OK, 0)
+                       closeFn2 := setup(addr2, codes.OK, 5*time.Second)
+                       p := newPub()
+                       defer func() {
+                               p.GracefulStop()
+                               closeFn1()
+                               closeFn2()
+                       }()
+                       node1 := getDataNode("node1", addr1)
+                       p.OnAddOrUpdate(node1)
+                       node2 := getDataNode("node2", addr2)
+                       p.OnAddOrUpdate(node2)
+
+                       bp := p.NewBatchPublisher(3 * time.Second)
+                       t := bus.UniTopic("test")
+                       for i := 0; i < 10; i++ {
+                               _, err := bp.Publish(t,
+                                       bus.NewBatchMessageWithNode(bus.MessageID(i), "node1", &streamv1.InternalWriteRequest{}),
+                                       bus.NewBatchMessageWithNode(bus.MessageID(i), "node2", &streamv1.InternalWriteRequest{}),
+                               )
+                               gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
+                       }
+                       gomega.Expect(bp.Close()).ShouldNot(gomega.HaveOccurred())
+                       gomega.Consistently(func() int {
+                               p.mu.RLock()
+                               defer p.mu.RUnlock()
+                               return len(p.clients)
+                       }, "1s").Should(gomega.Equal(2))
+               })
        })
 })
diff --git a/banyand/queue/queue.go b/banyand/queue/queue.go
index 1f4be29c..aad9141a 100644
--- a/banyand/queue/queue.go
+++ b/banyand/queue/queue.go
@@ -19,6 +19,7 @@ package queue
 
 import (
        "io"
+       "time"
 
        "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
        "github.com/apache/skywalking-banyandb/pkg/bus"
@@ -41,7 +42,7 @@ type Client interface {
        run.Unit
        bus.Publisher
        bus.Broadcaster
-       NewBatchPublisher() BatchPublisher
+       NewBatchPublisher(timeout time.Duration) BatchPublisher
        Register(schema.EventHandler)
 }
 

Reply via email to