This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch publish in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit cc4a399c7605b6cb92e56e88b99f73bfac5d5a7d Author: Gao Hongtao <[email protected]> AuthorDate: Mon Apr 15 03:51:10 2024 +0000 Add batch publish test Signed-off-by: Gao Hongtao <[email protected]> --- banyand/liaison/grpc/measure.go | 3 ++- banyand/liaison/grpc/server.go | 2 ++ banyand/liaison/grpc/stream.go | 3 ++- banyand/measure/topn.go | 5 +++-- banyand/queue/local.go | 4 +++- banyand/queue/pub/pub.go | 6 ++++-- banyand/queue/pub/pub_suite_test.go | 10 +++++++++- banyand/queue/pub/pub_test.go | 39 ++++++++++++++++++++++++++++++++++--- banyand/queue/queue.go | 3 ++- 9 files changed, 63 insertions(+), 12 deletions(-) diff --git a/banyand/liaison/grpc/measure.go b/banyand/liaison/grpc/measure.go index 930cec03..d149aef5 100644 --- a/banyand/liaison/grpc/measure.go +++ b/banyand/liaison/grpc/measure.go @@ -46,6 +46,7 @@ type measureService struct { ingestionAccessLog accesslog.Log pipeline queue.Client broadcaster queue.Client + writeTimeout time.Duration } func (ms *measureService) setLogger(log *logger.Logger) { @@ -67,7 +68,7 @@ func (ms *measureService) Write(measure measurev1.MeasureService_WriteServer) er } } ctx := measure.Context() - publisher := ms.pipeline.NewBatchPublisher() + publisher := ms.pipeline.NewBatchPublisher(ms.writeTimeout) defer publisher.Close() for { select { diff --git a/banyand/liaison/grpc/server.go b/banyand/liaison/grpc/server.go index 102bd170..a4d48bb5 100644 --- a/banyand/liaison/grpc/server.go +++ b/banyand/liaison/grpc/server.go @@ -178,6 +178,8 @@ func (s *server) FlagSet() *run.FlagSet { fs.Uint32Var(&s.port, "grpc-port", 17912, "the port of banyand listens") fs.BoolVar(&s.enableIngestionAccessLog, "enable-ingestion-access-log", false, "enable ingestion access log") fs.StringVar(&s.accessLogRootPath, "access-log-root-path", "", "access log root path") + fs.DurationVar(&s.streamSVC.writeTimeout, "stream-write-timeout", 15*time.Second, "stream write timeout") + fs.DurationVar(&s.measureSVC.writeTimeout, "measure-write-timeout", 15*time.Second, "measure write timeout") return fs } diff --git a/banyand/liaison/grpc/stream.go b/banyand/liaison/grpc/stream.go index 3e530ff6..37635a4b 100644 --- a/banyand/liaison/grpc/stream.go +++ b/banyand/liaison/grpc/stream.go @@ -46,6 +46,7 @@ type streamService struct { ingestionAccessLog accesslog.Log pipeline queue.Client broadcaster queue.Client + writeTimeout time.Duration } func (s *streamService) setLogger(log *logger.Logger) { @@ -66,7 +67,7 @@ func (s *streamService) Write(stream streamv1.StreamService_WriteServer) error { logger.Err(errResp).Msg("failed to send response") } } - publisher := s.pipeline.NewBatchPublisher() + publisher := s.pipeline.NewBatchPublisher(s.writeTimeout) defer publisher.Close() ctx := stream.Context() for { diff --git a/banyand/measure/topn.go b/banyand/measure/topn.go index 90b2d473..8f657175 100644 --- a/banyand/measure/topn.go +++ b/banyand/measure/topn.go @@ -50,7 +50,8 @@ import ( ) const ( - timeBucketFormat = "200601021504" + timeBucketFormat = "200601021504" + resultPersistencyTimeout = 10 * time.Second ) var ( @@ -133,7 +134,7 @@ func (t *topNStreamingProcessor) writeStreamRecord(record flow.StreamRecord) err eventTime := t.downSampleTimeBucket(record.TimestampMillis()) timeBucket := eventTime.Format(timeBucketFormat) var err error - publisher := t.pipeline.NewBatchPublisher() + publisher := t.pipeline.NewBatchPublisher(resultPersistencyTimeout) defer publisher.Close() for group, tuples := range tuplesGroups { if e := t.l.Debug(); e.Enabled() { diff --git a/banyand/queue/local.go b/banyand/queue/local.go index 9609ac24..58c67781 100644 --- a/banyand/queue/local.go +++ b/banyand/queue/local.go @@ -19,6 +19,8 @@ package queue import ( + "time" + "github.com/apache/skywalking-banyandb/banyand/metadata/schema" "github.com/apache/skywalking-banyandb/pkg/bus" "github.com/apache/skywalking-banyandb/pkg/run" @@ -75,7 +77,7 @@ func (l local) Name() string { return "local-pipeline" } -func (l local) NewBatchPublisher() BatchPublisher { +func (l local) NewBatchPublisher(_ time.Duration) BatchPublisher { return &localBatchPublisher{ local: l.local, } diff --git a/banyand/queue/pub/pub.go b/banyand/queue/pub/pub.go index cbe1cd50..200b2b9d 100644 --- a/banyand/queue/pub/pub.go +++ b/banyand/queue/pub/pub.go @@ -159,10 +159,11 @@ func (p *pub) Publish(topic bus.Topic, messages ...bus.Message) (bus.Future, err } // NewBatchPublisher returns a new batch publisher. -func (p *pub) NewBatchPublisher() queue.BatchPublisher { +func (p *pub) NewBatchPublisher(timeout time.Duration) queue.BatchPublisher { return &batchPublisher{ pub: p, streams: make(map[string]writeStream), + timeout: timeout, f: batchFuture{errNodes: make(map[string]struct{}), l: p.log}, } } @@ -196,6 +197,7 @@ type batchPublisher struct { pub *pub streams map[string]writeStream f batchFuture + timeout time.Duration } func (bp *batchPublisher) Close() (err error) { @@ -257,7 +259,7 @@ func (bp *batchPublisher) Publish(topic bus.Topic, messages ...bus.Message) (bus err = multierr.Append(err, fmt.Errorf("failed to get client for node %s", node)) continue } - ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), bp.timeout) // this assignment is for getting around the go vet lint deferFn := cancel stream, errCreateStream := client.client.Send(ctx) diff --git a/banyand/queue/pub/pub_suite_test.go b/banyand/queue/pub/pub_suite_test.go index 96bee0ac..9a1c65aa 100644 --- a/banyand/queue/pub/pub_suite_test.go +++ b/banyand/queue/pub/pub_suite_test.go @@ -39,6 +39,7 @@ import ( "github.com/apache/skywalking-banyandb/banyand/metadata/schema" "github.com/apache/skywalking-banyandb/pkg/logger" "github.com/apache/skywalking-banyandb/pkg/test" + "github.com/apache/skywalking-banyandb/pkg/test/flags" ) func TestPub(t *testing.T) { @@ -46,6 +47,13 @@ func TestPub(t *testing.T) { ginkgo.RunSpecs(t, "Publish Suite") } +var _ = ginkgo.BeforeSuite(func() { + gomega.Expect(logger.Init(logger.Logging{ + Env: "dev", + Level: flags.LogLevel, + })).To(gomega.Succeed()) +}) + type mockServer struct { clusterv1.UnimplementedServiceServer healthServer *health.Server @@ -79,7 +87,7 @@ func (s *mockServer) Send(stream clusterv1.Service_SendServer) error { } if err := stream.Send(res); err != nil { - panic(err) + return err } } } diff --git a/banyand/queue/pub/pub_test.go b/banyand/queue/pub/pub_test.go index b68d19cc..bf20bfc0 100644 --- a/banyand/queue/pub/pub_test.go +++ b/banyand/queue/pub/pub_test.go @@ -56,7 +56,7 @@ var _ = ginkgo.Describe("publish clients register/unregister", func() { node2 := getDataNode("node2", addr2) p.OnAddOrUpdate(node2) - bp := p.NewBatchPublisher() + bp := p.NewBatchPublisher(3 * time.Second) defer bp.Close() t := bus.UniTopic("test") for i := 0; i < 10; i++ { @@ -68,7 +68,7 @@ var _ = ginkgo.Describe("publish clients register/unregister", func() { } }) - ginkgo.FIt("should go to evict queue when node is unavailable", func() { + ginkgo.It("should go to evict queue when node is unavailable", func() { addr1 := getAddress() addr2 := getAddress() closeFn1 := setup(addr1, codes.OK, 200*time.Millisecond) @@ -84,7 +84,7 @@ var _ = ginkgo.Describe("publish clients register/unregister", func() { node2 := getDataNode("node2", addr2) p.OnAddOrUpdate(node2) - bp := p.NewBatchPublisher() + bp := p.NewBatchPublisher(3 * time.Second) t := bus.UniTopic("test") for i := 0; i < 10; i++ { _, err := bp.Publish(t, @@ -107,5 +107,38 @@ var _ = ginkgo.Describe("publish clients register/unregister", func() { gomega.Expect(p.clients).Should(gomega.HaveKey("node1")) }() }) + + ginkgo.It("should stay in active queue when operation takes a long time", func() { + addr1 := getAddress() + addr2 := getAddress() + closeFn1 := setup(addr1, codes.OK, 0) + closeFn2 := setup(addr2, codes.OK, 5*time.Second) + p := newPub() + defer func() { + p.GracefulStop() + closeFn1() + closeFn2() + }() + node1 := getDataNode("node1", addr1) + p.OnAddOrUpdate(node1) + node2 := getDataNode("node2", addr2) + p.OnAddOrUpdate(node2) + + bp := p.NewBatchPublisher(3 * time.Second) + t := bus.UniTopic("test") + for i := 0; i < 10; i++ { + _, err := bp.Publish(t, + bus.NewBatchMessageWithNode(bus.MessageID(i), "node1", &streamv1.InternalWriteRequest{}), + bus.NewBatchMessageWithNode(bus.MessageID(i), "node2", &streamv1.InternalWriteRequest{}), + ) + gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) + } + gomega.Expect(bp.Close()).ShouldNot(gomega.HaveOccurred()) + gomega.Consistently(func() int { + p.mu.RLock() + defer p.mu.RUnlock() + return len(p.clients) + }, "1s").Should(gomega.Equal(2)) + }) }) }) diff --git a/banyand/queue/queue.go b/banyand/queue/queue.go index 1f4be29c..aad9141a 100644 --- a/banyand/queue/queue.go +++ b/banyand/queue/queue.go @@ -19,6 +19,7 @@ package queue import ( "io" + "time" "github.com/apache/skywalking-banyandb/banyand/metadata/schema" "github.com/apache/skywalking-banyandb/pkg/bus" @@ -41,7 +42,7 @@ type Client interface { run.Unit bus.Publisher bus.Broadcaster - NewBatchPublisher() BatchPublisher + NewBatchPublisher(timeout time.Duration) BatchPublisher Register(schema.EventHandler) }
