This is an automated email from the ASF dual-hosted git repository. wusheng pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push: new 6ecbd14f Add failover process between liaison and data nodes (#433) 6ecbd14f is described below commit 6ecbd14f8fdfaeb08119113ee63bfee3a0a3f2b3 Author: Gao Hongtao <hanahm...@gmail.com> AuthorDate: Tue Apr 16 13:20:03 2024 +0800 Add failover process between liaison and data nodes (#433) --- .github/workflows/ci.yml | 1 + banyand/dquery/topn.go | 10 +- banyand/internal/storage/index_test.go | 15 +- banyand/liaison/grpc/measure.go | 3 +- banyand/liaison/grpc/server.go | 2 + banyand/liaison/grpc/stream.go | 3 +- banyand/measure/topn.go | 5 +- banyand/queue/local.go | 6 +- banyand/queue/pub/client.go | 167 +++++++++++++- banyand/queue/pub/client_test.go | 105 +++++++++ banyand/queue/pub/pub.go | 162 +++++++++++--- banyand/queue/pub/pub_suite_test.go | 183 ++++++++++++++++ banyand/queue/pub/pub_test.go | 239 +++++++++++++++++++++ banyand/queue/queue.go | 3 +- docs/concept/clustering.md | 14 ++ pkg/bus/bus.go | 3 +- pkg/grpchelper/client.go | 6 +- .../logical/measure/measure_plan_distributed.go | 7 +- .../logical/stream/stream_plan_distributed.go | 7 +- test/stress/istio/istio_suite_test.go | 6 +- 20 files changed, 882 insertions(+), 65 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 8663ca24..5a7e943b 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -120,6 +120,7 @@ jobs: uses: ./.github/workflows/test.yml with: options: --fail-fast --label-filter \\!slow + timeout-minutes: 30 result: name: Continuous Integration diff --git a/banyand/dquery/topn.go b/banyand/dquery/topn.go index f5d7cf1f..ad666a45 100644 --- a/banyand/dquery/topn.go +++ b/banyand/dquery/topn.go @@ -18,6 +18,8 @@ package dquery import ( + "time" + "go.uber.org/multierr" "github.com/apache/skywalking-banyandb/api/common" @@ -31,6 +33,8 @@ import ( pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" ) +const defaultTopNQueryTimeout = 10 * time.Second + type 
topNQueryProcessor struct { broadcaster bus.Broadcaster *queryService @@ -52,7 +56,7 @@ func (t *topNQueryProcessor) Rev(message bus.Message) (resp bus.Message) { agg := request.Agg request.Agg = modelv1.AggregationFunction_AGGREGATION_FUNCTION_UNSPECIFIED now := bus.MessageID(request.TimeRange.Begin.Nanos) - ff, err := t.broadcaster.Broadcast(data.TopicTopNQuery, bus.NewMessage(now, request)) + ff, err := t.broadcaster.Broadcast(defaultTopNQueryTimeout, data.TopicTopNQuery, bus.NewMessage(now, request)) if err != nil { resp = bus.NewMessage(now, common.NewError("execute the query %s: %v", request.Metadata.GetName(), err)) return @@ -87,6 +91,10 @@ func (t *topNQueryProcessor) Rev(message bus.Message) (resp bus.Message) { } } } + if allErr != nil { + resp = bus.NewMessage(now, common.NewError("execute the query %s: %v", request.Metadata.GetName(), allErr)) + return + } if tags == nil { resp = bus.NewMessage(now, &measurev1.TopNResponse{}) return diff --git a/banyand/internal/storage/index_test.go b/banyand/internal/storage/index_test.go index 253a64d3..8e59b4e1 100644 --- a/banyand/internal/storage/index_test.go +++ b/banyand/internal/storage/index_test.go @@ -209,16 +209,23 @@ func TestSeriesIndexController(t *testing.T) { require.NoError(t, err) defer sic.Close() require.NoError(t, sic.run(time.Now().Add(-time.Hour*23+10*time.Minute))) - require.NotNil(t, sic.standby) + sic.RLock() + standby := sic.standby + sic.RUnlock() + require.NotNil(t, standby) idxNames := make([]string, 0) walkDir(tmpDir, "idx-", func(suffix string) error { idxNames = append(idxNames, suffix) return nil }) assert.Equal(t, 2, len(idxNames)) - nextTime := sic.standby.startTime + nextTime := standby.startTime require.NoError(t, sic.run(time.Now().Add(time.Hour))) - require.Nil(t, sic.standby) - assert.Equal(t, nextTime, sic.hot.startTime) + sic.RLock() + standby = sic.standby + hot := sic.hot + sic.RUnlock() + require.Nil(t, standby) + assert.Equal(t, nextTime, hot.startTime) }) } diff --git 
a/banyand/liaison/grpc/measure.go b/banyand/liaison/grpc/measure.go index 930cec03..d149aef5 100644 --- a/banyand/liaison/grpc/measure.go +++ b/banyand/liaison/grpc/measure.go @@ -46,6 +46,7 @@ type measureService struct { ingestionAccessLog accesslog.Log pipeline queue.Client broadcaster queue.Client + writeTimeout time.Duration } func (ms *measureService) setLogger(log *logger.Logger) { @@ -67,7 +68,7 @@ func (ms *measureService) Write(measure measurev1.MeasureService_WriteServer) er } } ctx := measure.Context() - publisher := ms.pipeline.NewBatchPublisher() + publisher := ms.pipeline.NewBatchPublisher(ms.writeTimeout) defer publisher.Close() for { select { diff --git a/banyand/liaison/grpc/server.go b/banyand/liaison/grpc/server.go index 102bd170..a4d48bb5 100644 --- a/banyand/liaison/grpc/server.go +++ b/banyand/liaison/grpc/server.go @@ -178,6 +178,8 @@ func (s *server) FlagSet() *run.FlagSet { fs.Uint32Var(&s.port, "grpc-port", 17912, "the port of banyand listens") fs.BoolVar(&s.enableIngestionAccessLog, "enable-ingestion-access-log", false, "enable ingestion access log") fs.StringVar(&s.accessLogRootPath, "access-log-root-path", "", "access log root path") + fs.DurationVar(&s.streamSVC.writeTimeout, "stream-write-timeout", 15*time.Second, "stream write timeout") + fs.DurationVar(&s.measureSVC.writeTimeout, "measure-write-timeout", 15*time.Second, "measure write timeout") return fs } diff --git a/banyand/liaison/grpc/stream.go b/banyand/liaison/grpc/stream.go index 3e530ff6..37635a4b 100644 --- a/banyand/liaison/grpc/stream.go +++ b/banyand/liaison/grpc/stream.go @@ -46,6 +46,7 @@ type streamService struct { ingestionAccessLog accesslog.Log pipeline queue.Client broadcaster queue.Client + writeTimeout time.Duration } func (s *streamService) setLogger(log *logger.Logger) { @@ -66,7 +67,7 @@ func (s *streamService) Write(stream streamv1.StreamService_WriteServer) error { logger.Err(errResp).Msg("failed to send response") } } - publisher := 
s.pipeline.NewBatchPublisher() + publisher := s.pipeline.NewBatchPublisher(s.writeTimeout) defer publisher.Close() ctx := stream.Context() for { diff --git a/banyand/measure/topn.go b/banyand/measure/topn.go index 90b2d473..8f657175 100644 --- a/banyand/measure/topn.go +++ b/banyand/measure/topn.go @@ -50,7 +50,8 @@ import ( ) const ( - timeBucketFormat = "200601021504" + timeBucketFormat = "200601021504" + resultPersistencyTimeout = 10 * time.Second ) var ( @@ -133,7 +134,7 @@ func (t *topNStreamingProcessor) writeStreamRecord(record flow.StreamRecord) err eventTime := t.downSampleTimeBucket(record.TimestampMillis()) timeBucket := eventTime.Format(timeBucketFormat) var err error - publisher := t.pipeline.NewBatchPublisher() + publisher := t.pipeline.NewBatchPublisher(resultPersistencyTimeout) defer publisher.Close() for group, tuples := range tuplesGroups { if e := t.l.Debug(); e.Enabled() { diff --git a/banyand/queue/local.go b/banyand/queue/local.go index 9609ac24..75cfb47d 100644 --- a/banyand/queue/local.go +++ b/banyand/queue/local.go @@ -19,6 +19,8 @@ package queue import ( + "time" + "github.com/apache/skywalking-banyandb/banyand/metadata/schema" "github.com/apache/skywalking-banyandb/pkg/bus" "github.com/apache/skywalking-banyandb/pkg/run" @@ -63,7 +65,7 @@ func (l *local) Publish(topic bus.Topic, message ...bus.Message) (bus.Future, er return l.local.Publish(topic, message...) 
} -func (l *local) Broadcast(topic bus.Topic, message bus.Message) ([]bus.Future, error) { +func (l *local) Broadcast(_ time.Duration, topic bus.Topic, message bus.Message) ([]bus.Future, error) { f, err := l.Publish(topic, message) if err != nil { return nil, err @@ -75,7 +77,7 @@ func (l local) Name() string { return "local-pipeline" } -func (l local) NewBatchPublisher() BatchPublisher { +func (l local) NewBatchPublisher(_ time.Duration) BatchPublisher { return &localBatchPublisher{ local: l.local, } diff --git a/banyand/queue/pub/client.go b/banyand/queue/pub/client.go index 72014607..b82dbc67 100644 --- a/banyand/queue/pub/client.go +++ b/banyand/queue/pub/client.go @@ -18,17 +18,29 @@ package pub import ( + "context" "fmt" + "time" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/health/grpc_health_v1" clusterv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/cluster/v1" databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" "github.com/apache/skywalking-banyandb/banyand/metadata/schema" + "github.com/apache/skywalking-banyandb/pkg/grpchelper" + "github.com/apache/skywalking-banyandb/pkg/logger" ) +const rpcTimeout = 2 * time.Second + var ( + // Retry policy for health check. + initBackoff = time.Second + maxBackoff = 20 * time.Second + backoffMultiplier = 2.0 + serviceName = clusterv1.Service_ServiceDesc.ServiceName // The timeout is set by each RPC. 
@@ -49,6 +61,7 @@ var ( type client struct { client clusterv1.ServiceClient conn *grpc.ClientConn + md schema.Metadata } func (p *pub) OnAddOrUpdate(md schema.Metadata) { @@ -84,8 +97,13 @@ func (p *pub) OnAddOrUpdate(md schema.Metadata) { p.mu.Lock() defer p.mu.Unlock() + p.registered[name] = struct{}{} + // If the client already exists, just return - if _, ok := p.clients[name]; ok { + if _, ok := p.active[name]; ok { + return + } + if _, ok := p.evictable[name]; ok { return } conn, err := grpc.Dial(address, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(retryPolicy)) @@ -93,11 +111,18 @@ func (p *pub) OnAddOrUpdate(md schema.Metadata) { p.log.Error().Err(err).Msg("failed to connect to grpc server") return } + + if !p.checkClient(conn, md) { + p.log.Info().Str("status", p.dump()).Stringer("node", node).Msg("node is unhealthy, move it to evict queue") + return + } + c := clusterv1.NewServiceClient(conn) - p.clients[name] = &client{conn: conn, client: c} + p.active[name] = &client{conn: conn, client: c, md: md} if p.handler != nil { p.handler.OnAddOrUpdate(md) } + p.log.Info().Str("status", p.dump()).Stringer("node", node).Msg("new node is healthy, add it to active queue") } func (p *pub) OnDelete(md schema.Metadata) { @@ -116,14 +141,142 @@ func (p *pub) OnDelete(md schema.Metadata) { } p.mu.Lock() defer p.mu.Unlock() + delete(p.registered, name) + if en, ok := p.evictable[name]; ok { + close(en.c) + delete(p.evictable, name) + p.log.Info().Str("status", p.dump()).Stringer("node", node).Msg("node is removed from evict queue by delete event") + return + } - if client, ok := p.clients[name]; ok { - if client.conn != nil { - client.conn.Close() // Close the client connection - } - delete(p.clients, name) + if client, ok := p.active[name]; ok && !p.healthCheck(node, client.conn) { + _ = client.conn.Close() + delete(p.active, name) if p.handler != nil { p.handler.OnDelete(md) } + p.log.Info().Str("status", 
p.dump()).Stringer("node", node).Msg("node is removed from active queue by delete event") + } +} + +func (p *pub) checkClient(conn *grpc.ClientConn, md schema.Metadata) bool { + node, ok := md.Spec.(*databasev1.Node) + if !ok { + logger.Panicf("failed to cast node spec") + return false + } + if p.healthCheck(node, conn) { + return true + } + _ = conn.Close() + if !p.closer.AddRunning() { + return false + } + name := node.Metadata.Name + p.evictable[name] = evictNode{n: node, c: make(chan struct{})} + if p.handler != nil { + p.handler.OnDelete(md) + } + go func(p *pub, name string, en evictNode, md schema.Metadata) { + defer p.closer.Done() + backoff := initBackoff + for { + select { + case <-time.After(backoff): + connEvict, errEvict := grpc.Dial(node.GrpcAddress, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(retryPolicy)) + if errEvict == nil && p.healthCheck(en.n, connEvict) { + p.mu.Lock() + defer p.mu.Unlock() + if _, ok := p.evictable[name]; !ok { + // The client has been removed from evict clients map, just return + return + } + c := clusterv1.NewServiceClient(connEvict) + p.active[name] = &client{conn: connEvict, client: c, md: md} + if p.handler != nil { + p.handler.OnAddOrUpdate(md) + } + delete(p.evictable, name) + p.log.Info().Stringer("node", en.n).Msg("node is healthy, move it back to active queue") + return + } + if errEvict != nil { + _ = connEvict.Close() + } + if _, ok := p.registered[name]; !ok { + return + } + p.log.Error().Err(errEvict).Msgf("failed to re-connect to grpc server after waiting for %s", backoff) + case <-en.c: + return + } + if backoff < maxBackoff { + backoff *= time.Duration(backoffMultiplier) + } else { + backoff = maxBackoff + } + } + }(p, name, p.evictable[name], md) + return false +} + +func (p *pub) healthCheck(node fmt.Stringer, conn *grpc.ClientConn) bool { + var resp *grpc_health_v1.HealthCheckResponse + if err := grpchelper.Request(context.Background(), rpcTimeout, func(rpcCtx 
context.Context) (err error) { + resp, err = grpc_health_v1.NewHealthClient(conn).Check(rpcCtx, + &grpc_health_v1.HealthCheckRequest{ + Service: "", + }) + return err + }); err != nil { + if e := p.log.Debug(); e.Enabled() { + e.Err(err).Stringer("node", node).Msg("service unhealthy") + } + return false + } + if resp.GetStatus() == grpc_health_v1.HealthCheckResponse_SERVING { + return true + } + return false +} + +func (p *pub) failover(node string) { + p.mu.Lock() + defer p.mu.Unlock() + if en, ok := p.evictable[node]; ok { + close(en.c) + delete(p.evictable, node) + p.log.Info().Str("node", node).Str("status", p.dump()).Msg("node is removed from evict queue by wire event") + return + } + + if client, ok := p.active[node]; ok && !p.checkClient(client.conn, client.md) { + _ = client.conn.Close() + delete(p.active, node) + if p.handler != nil { + p.handler.OnDelete(client.md) + } + p.log.Info().Str("status", p.dump()).Str("node", node).Msg("node is unhealthy, move it to evict queue") } } + +func (p *pub) dump() string { + keysRegistered := make([]string, 0, len(p.registered)) + for k := range p.registered { + keysRegistered = append(keysRegistered, k) + } + keysActive := make([]string, 0, len(p.active)) + for k := range p.active { + keysActive = append(keysActive, k) + } + keysEvictable := make([]string, 0, len(p.evictable)) + for k := range p.evictable { + keysEvictable = append(keysEvictable, k) + } + return fmt.Sprintf("registered: %v, active :%v, evictable :%v", keysRegistered, keysActive, keysEvictable) +} + +type evictNode struct { + n *databasev1.Node + c chan struct{} +} diff --git a/banyand/queue/pub/client_test.go b/banyand/queue/pub/client_test.go new file mode 100644 index 00000000..2ced4841 --- /dev/null +++ b/banyand/queue/pub/client_test.go @@ -0,0 +1,105 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. 
See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package pub + +import ( + "time" + + "github.com/onsi/ginkgo/v2" + "github.com/onsi/gomega" + "github.com/onsi/gomega/gleak" + "google.golang.org/grpc/codes" + + "github.com/apache/skywalking-banyandb/pkg/test/flags" +) + +var _ = ginkgo.Describe("publish clients register/unregister", func() { + var goods []gleak.Goroutine + ginkgo.BeforeEach(func() { + goods = gleak.Goroutines() + }) + ginkgo.AfterEach(func() { + gomega.Eventually(gleak.Goroutines, flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods)) + }) + ginkgo.It("should register and unregister clients", func() { + addr1 := getAddress() + closeFn := setup(addr1, codes.OK, 200*time.Millisecond) + p := newPub() + defer func() { + p.GracefulStop() + closeFn() + }() + node1 := getDataNode("node1", addr1) + p.OnAddOrUpdate(node1) + verifyClients(p, 1, 0, 1, 0) + addr2 := getAddress() + node2 := getDataNode("node2", addr2) + p.OnAddOrUpdate(node2) + verifyClients(p, 1, 1, 1, 1) + + p.OnDelete(node1) + verifyClients(p, 1, 1, 1, 1) + p.OnDelete(node2) + verifyClients(p, 1, 0, 1, 1) + closeFn() + p.OnDelete(node1) + verifyClients(p, 0, 0, 1, 2) + }) + + ginkgo.It("should move back to active queue", func() { + addr1 := getAddress() + node1 := getDataNode("node1", addr1) + p := newPub() + defer 
p.GracefulStop() + p.OnAddOrUpdate(node1) + verifyClients(p, 0, 1, 0, 1) + closeFn := setup(addr1, codes.OK, 200*time.Millisecond) + defer closeFn() + gomega.Eventually(func() int { + p.mu.RLock() + defer p.mu.RUnlock() + return len(p.active) + }, flags.EventuallyTimeout).Should(gomega.Equal(1)) + verifyClients(p, 1, 0, 1, 1) + }) + + ginkgo.It("should be removed", func() { + addr1 := getAddress() + node1 := getDataNode("node1", addr1) + p := newPub() + defer p.GracefulStop() + closeFn := setup(addr1, codes.OK, 200*time.Millisecond) + p.OnAddOrUpdate(node1) + verifyClients(p, 1, 0, 1, 0) + closeFn() + p.failover("node1") + verifyClients(p, 0, 1, 1, 2) + p.OnDelete(node1) + verifyClients(p, 0, 0, 1, 2) + }) +}) + +func verifyClients(p *pub, active, evict, onAdd, onDelete int) { + p.mu.RLock() + defer p.mu.RUnlock() + gomega.Expect(p.active).Should(gomega.HaveLen(active)) + gomega.Expect(p.evictable).Should(gomega.HaveLen(evict)) + h := p.handler.(*mockHandler) + gomega.Expect(h.addOrUpdateCount).Should(gomega.Equal(onAdd)) + gomega.Expect(h.deleteCount).Should(gomega.Equal(onDelete)) +} diff --git a/banyand/queue/pub/pub.go b/banyand/queue/pub/pub.go index d2576e51..2acaf6ff 100644 --- a/banyand/queue/pub/pub.go +++ b/banyand/queue/pub/pub.go @@ -27,6 +27,8 @@ import ( "github.com/pkg/errors" "go.uber.org/multierr" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/anypb" @@ -47,12 +49,14 @@ var ( type pub struct { schema.UnimplementedOnInitHandler - metadata metadata.Repo - handler schema.EventHandler - log *logger.Logger - clients map[string]*client - closer *run.Closer - mu sync.RWMutex + metadata metadata.Repo + handler schema.EventHandler + log *logger.Logger + registered map[string]struct{} + active map[string]*client + evictable map[string]evictNode + closer *run.Closer + mu sync.RWMutex } func (p *pub) Register(handler schema.EventHandler) { @@ -60,13 +64,18 @@ func (p 
*pub) Register(handler schema.EventHandler) { } func (p *pub) GracefulStop() { - p.closer.Done() - p.closer.CloseThenWait() p.mu.Lock() defer p.mu.Unlock() - for _, c := range p.clients { + for i := range p.evictable { + close(p.evictable[i].c) + } + p.evictable = nil + p.closer.Done() + p.closer.CloseThenWait() + for _, c := range p.active { _ = c.conn.Close() } + p.active = nil } // Serve implements run.Service. @@ -74,10 +83,10 @@ func (p *pub) Serve() run.StopNotify { return p.closer.CloseNotify() } -func (p *pub) Broadcast(topic bus.Topic, messages bus.Message) ([]bus.Future, error) { +func (p *pub) Broadcast(timeout time.Duration, topic bus.Topic, messages bus.Message) ([]bus.Future, error) { var names []string p.mu.RLock() - for k := range p.clients { + for k := range p.active { names = append(names, k) } p.mu.RUnlock() @@ -85,10 +94,9 @@ func (p *pub) Broadcast(topic bus.Topic, messages bus.Message) ([]bus.Future, er var wg sync.WaitGroup for _, n := range names { wg.Add(1) - // Send a value into sem. If sem is full, this will block until there's room. 
go func(n string) { defer wg.Done() - f, err := p.Publish(topic, bus.NewMessageWithNode(messages.ID(), n, messages.Data())) + f, err := p.publish(timeout, topic, bus.NewMessageWithNode(messages.ID(), n, messages.Data())) futureCh <- publishResult{n: n, f: f, e: err} }(n) } @@ -101,6 +109,14 @@ func (p *pub) Broadcast(topic bus.Topic, messages bus.Message) ([]bus.Future, er for f := range futureCh { if f.e != nil { errs = multierr.Append(errs, errors.Wrapf(f.e, "failed to publish message to %s", f.n)) + if isFailoverError(f.e) { + if p.closer.AddRunning() { + go func() { + defer p.closer.Done() + p.failover(f.n) + }() + } + } continue } futures = append(futures, f.f) @@ -118,22 +134,22 @@ type publishResult struct { n string } -func (p *pub) Publish(topic bus.Topic, messages ...bus.Message) (bus.Future, error) { +func (p *pub) publish(timeout time.Duration, topic bus.Topic, messages ...bus.Message) (bus.Future, error) { var err error f := &future{} handleMessage := func(m bus.Message, err error) error { r, errSend := messageToRequest(topic, m) if errSend != nil { - return multierr.Append(err, fmt.Errorf("failed to marshal message %T: %w", m, errSend)) + return multierr.Append(err, fmt.Errorf("failed to marshal message[%d]: %w", m.ID(), errSend)) } node := m.Node() p.mu.RLock() - client, ok := p.clients[node] + client, ok := p.active[node] p.mu.RUnlock() if !ok { return multierr.Append(err, fmt.Errorf("failed to get client for node %s", node)) } - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), timeout) f.cancelFn = append(f.cancelFn, cancel) stream, errCreateStream := client.client.Send(ctx) if errCreateStream != nil { @@ -153,17 +169,28 @@ func (p *pub) Publish(topic bus.Topic, messages ...bus.Message) (bus.Future, err return f, err } +func (p *pub) Publish(topic bus.Topic, messages ...bus.Message) (bus.Future, error) { + return p.publish(5*time.Second, topic, messages...) 
+} + // NewBatchPublisher returns a new batch publisher. -func (p *pub) NewBatchPublisher() queue.BatchPublisher { - return &batchPublisher{pub: p, streams: make(map[string]writeStream)} +func (p *pub) NewBatchPublisher(timeout time.Duration) queue.BatchPublisher { + return &batchPublisher{ + pub: p, + streams: make(map[string]writeStream), + timeout: timeout, + f: batchFuture{errNodes: make(map[string]struct{}), l: p.log}, + } } // New returns a new queue client. func New(metadata metadata.Repo) queue.Client { return &pub{ - metadata: metadata, - clients: make(map[string]*client), - closer: run.NewCloser(1), + metadata: metadata, + active: make(map[string]*client), + evictable: make(map[string]evictNode), + registered: make(map[string]struct{}), + closer: run.NewCloser(1), } } @@ -185,6 +212,8 @@ type writeStream struct { type batchPublisher struct { pub *pub streams map[string]writeStream + f batchFuture + timeout time.Duration } func (bp *batchPublisher) Close() (err error) { @@ -194,14 +223,23 @@ func (bp *batchPublisher) Close() (err error) { for i := range bp.streams { <-bp.streams[i].ctxDoneCh } + if bp.pub.closer.AddRunning() { + go func(f *batchFuture) { + defer bp.pub.closer.Done() + for _, n := range f.get() { + bp.pub.failover(n) + } + }(&bp.f) + } return err } func (bp *batchPublisher) Publish(topic bus.Topic, messages ...bus.Message) (bus.Future, error) { + var err error for _, m := range messages { - r, err := messageToRequest(topic, m) - if err != nil { - err = multierr.Append(err, fmt.Errorf("failed to marshal message %T: %w", m, err)) + r, errM2R := messageToRequest(topic, m) + if errM2R != nil { + err = multierr.Append(err, fmt.Errorf("failed to marshal message %T: %w", m, errM2R)) continue } node := m.Node() @@ -231,17 +269,17 @@ func (bp *batchPublisher) Publish(topic bus.Topic, messages ...bus.Message) (bus } bp.pub.mu.RLock() - client, ok := bp.pub.clients[node] + client, ok := bp.pub.active[node] bp.pub.mu.RUnlock() if !ok { err = 
multierr.Append(err, fmt.Errorf("failed to get client for node %s", node)) continue } - ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), bp.timeout) // this assignment is for getting around the go vet lint deferFn := cancel stream, errCreateStream := client.client.Send(ctx) - if err != nil { + if errCreateStream != nil { err = multierr.Append(err, fmt.Errorf("failed to get stream for node %s: %w", node, errCreateStream)) continue } @@ -249,9 +287,13 @@ func (bp *batchPublisher) Publish(topic bus.Topic, messages ...bus.Message) (bus client: stream, ctxDoneCh: ctx.Done(), } + bp.f.events = append(bp.f.events, make(chan batchEvent)) _ = sendData() - go func(s clusterv1.Service_SendClient, deferFn func()) { - defer deferFn() + go func(s clusterv1.Service_SendClient, deferFn func(), bc chan batchEvent) { + defer func() { + close(bc) + deferFn() + }() for { _, errRecv := s.Recv() if errRecv == nil { @@ -260,11 +302,12 @@ func (bp *batchPublisher) Publish(topic bus.Topic, messages ...bus.Message) (bus if errors.Is(errRecv, io.EOF) { return } - bp.pub.log.Err(errRecv).Msg("failed to receive message") + bc <- batchEvent{n: node, e: errRecv} + return } - }(stream, deferFn) + }(stream, deferFn, bp.f.events[len(bp.f.events)-1]) } - return nil, nil + return nil, err } func messageToRequest(topic bus.Topic, m bus.Message) (*clusterv1.SendRequest, error) { @@ -275,7 +318,7 @@ func messageToRequest(topic bus.Topic, m bus.Message) (*clusterv1.SendRequest, e } message, ok := m.Data().(proto.Message) if !ok { - return nil, fmt.Errorf("invalid message type %T", m) + return nil, fmt.Errorf("invalid message type %T", m.Data()) } anyMessage, err := anypb.New(message) if err != nil { @@ -342,3 +385,54 @@ func (l *future) GetAll() ([]bus.Message, error) { ret = append(ret, m) } } + +type batchEvent struct { + e error + n string +} + +type batchFuture struct { + errNodes map[string]struct{} + l *logger.Logger 
+ events []chan batchEvent +} + +func (b *batchFuture) get() []string { + var wg sync.WaitGroup + var mux sync.Mutex + wg.Add(len(b.events)) + for _, e := range b.events { + go func(e chan batchEvent, mux *sync.Mutex) { + defer wg.Done() + for evt := range e { + func() { + mux.Lock() + defer mux.Unlock() + // only log the error once for each node + if _, ok := b.errNodes[evt.n]; !ok && isFailoverError(evt.e) { + b.l.Error().Err(evt.e).Msgf("failed to send message to node %s", evt.n) + b.errNodes[evt.n] = struct{}{} + return + } + b.l.Error().Err(evt.e).Msgf("failed to send message to node %s", evt.n) + }() + } + }(e, &mux) + } + wg.Wait() + mux.Lock() + defer mux.Unlock() + var result []string + for n := range b.errNodes { + result = append(result, n) + } + return result +} + +func isFailoverError(err error) bool { + s, ok := status.FromError(err) + if !ok { + return false + } + return s.Code() == codes.Unavailable || s.Code() == codes.DeadlineExceeded +} diff --git a/banyand/queue/pub/pub_suite_test.go b/banyand/queue/pub/pub_suite_test.go new file mode 100644 index 00000000..79080d9a --- /dev/null +++ b/banyand/queue/pub/pub_suite_test.go @@ -0,0 +1,183 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +package pub + +import ( + "io" + "net" + "strconv" + "testing" + "time" + + "github.com/onsi/ginkgo/v2" + "github.com/onsi/gomega" + "github.com/pkg/errors" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/health" + "google.golang.org/grpc/health/grpc_health_v1" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/types/known/anypb" + + "github.com/apache/skywalking-banyandb/api/data" + clusterv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/cluster/v1" + commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" + databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" + "github.com/apache/skywalking-banyandb/banyand/metadata/schema" + "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/test" + "github.com/apache/skywalking-banyandb/pkg/test/flags" +) + +func TestPub(t *testing.T) { + gomega.RegisterFailHandler(ginkgo.Fail) + ginkgo.RunSpecs(t, "Publish Suite") +} + +var _ = ginkgo.BeforeSuite(func() { + gomega.Expect(logger.Init(logger.Logging{ + Env: "dev", + Level: flags.LogLevel, + })).To(gomega.Succeed()) +}) + +type mockServer struct { + clusterv1.UnimplementedServiceServer + healthServer *health.Server + latency time.Duration + code codes.Code +} + +func (s *mockServer) Send(stream clusterv1.Service_SendServer) error { + for { + req, err := stream.Recv() + if err != nil { + if errors.Is(err, io.EOF) { + return nil + } + return err + } + + if s.code != codes.OK { + s.healthServer.SetServingStatus("", grpc_health_v1.HealthCheckResponse_NOT_SERVING) + return status.Error(s.code, "mock error") + } + + if s.latency > 0 { + time.Sleep(s.latency) + } + + topic, ok := data.TopicMap[req.Topic] + + if !ok { + panic("invalid topic") + } + f := data.TopicResponseMap[topic] + var body *anypb.Any + if f == nil { + 
body = req.Body + } else { + var errAny error + body, errAny = anypb.New(f()) + if errAny != nil { + panic(errAny) + } + } + + res := &clusterv1.SendResponse{ + MessageId: req.MessageId, + Error: "", + Body: body, + } + + if err := stream.Send(res); err != nil { + return err + } + } +} + +func setup(address string, code codes.Code, latency time.Duration) func() { + s := grpc.NewServer() + hs := health.NewServer() + clusterv1.RegisterServiceServer(s, &mockServer{ + code: code, + latency: latency, + healthServer: hs, + }) + grpc_health_v1.RegisterHealthServer(s, hs) + go func() { + lis, err := net.Listen("tcp", address) + if err != nil { + logger.Panicf("failed to listen: %v", err) + return + } + if err := s.Serve(lis); err != nil { + logger.Panicf("Server exited with error: %v", err) + } + }() + return s.GracefulStop +} + +func getAddress() string { + ports, err := test.AllocateFreePorts(1) + if err != nil { + logger.Panicf("failed to allocate free ports: %v", err) + return "" + } + return net.JoinHostPort("localhost", strconv.Itoa(ports[0])) +} + +type mockHandler struct { + addOrUpdateCount int + deleteCount int +} + +func (m *mockHandler) OnInit(_ []schema.Kind) (bool, []int64) { + panic("no implemented") +} + +func (m *mockHandler) OnAddOrUpdate(_ schema.Metadata) { + m.addOrUpdateCount++ +} + +func (m *mockHandler) OnDelete(_ schema.Metadata) { + m.deleteCount++ +} + +func newPub() *pub { + p := New(nil).(*pub) + p.log = logger.GetLogger("pub") + p.handler = &mockHandler{} + return p +} + +func getDataNode(name string, address string) schema.Metadata { + return schema.Metadata{ + TypeMeta: schema.TypeMeta{ + Name: name, + Kind: schema.KindNode, + }, + Spec: &databasev1.Node{ + Metadata: &commonv1.Metadata{ + Name: name, + }, + Roles: []databasev1.Role{databasev1.Role_ROLE_DATA}, + GrpcAddress: address, + }, + } +} diff --git a/banyand/queue/pub/pub_test.go b/banyand/queue/pub/pub_test.go new file mode 100644 index 00000000..0f1bb250 --- /dev/null +++ 
b/banyand/queue/pub/pub_test.go @@ -0,0 +1,239 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package pub + +import ( + "io" + "time" + + "github.com/onsi/ginkgo/v2" + "github.com/onsi/gomega" + "github.com/onsi/gomega/gleak" + "github.com/pkg/errors" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + + "github.com/apache/skywalking-banyandb/api/data" + streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1" + "github.com/apache/skywalking-banyandb/pkg/bus" + "github.com/apache/skywalking-banyandb/pkg/test/flags" +) + +var _ = ginkgo.Describe("Publish and Broadcast", func() { + var goods []gleak.Goroutine + ginkgo.BeforeEach(func() { + goods = gleak.Goroutines() + }) + ginkgo.AfterEach(func() { + gomega.Eventually(gleak.Goroutines, flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods)) + }) + + ginkgo.Context("Publisher and batch publisher", func() { + ginkgo.It("should publish messages", func() { + addr1 := getAddress() + addr2 := getAddress() + closeFn1 := setup(addr1, codes.OK, 200*time.Millisecond) + closeFn2 := setup(addr2, codes.OK, 10*time.Millisecond) + p := newPub() + defer func() { + p.GracefulStop() + closeFn1() + closeFn2() + }() + 
node1 := getDataNode("node1", addr1) + p.OnAddOrUpdate(node1) + node2 := getDataNode("node2", addr2) + p.OnAddOrUpdate(node2) + + bp := p.NewBatchPublisher(3 * time.Second) + defer bp.Close() + for i := 0; i < 10; i++ { + _, err := bp.Publish(data.TopicStreamWrite, + bus.NewBatchMessageWithNode(bus.MessageID(i), "node1", &streamv1.InternalWriteRequest{}), + bus.NewBatchMessageWithNode(bus.MessageID(i), "node2", &streamv1.InternalWriteRequest{}), + ) + gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) + } + }) + + ginkgo.It("should go to evict queue when node is unavailable", func() { + addr1 := getAddress() + addr2 := getAddress() + closeFn1 := setup(addr1, codes.OK, 200*time.Millisecond) + closeFn2 := setup(addr2, codes.Unavailable, 0) + p := newPub() + defer func() { + p.GracefulStop() + closeFn1() + closeFn2() + }() + node1 := getDataNode("node1", addr1) + p.OnAddOrUpdate(node1) + node2 := getDataNode("node2", addr2) + p.OnAddOrUpdate(node2) + + bp := p.NewBatchPublisher(3 * time.Second) + for i := 0; i < 10; i++ { + _, err := bp.Publish(data.TopicStreamWrite, + bus.NewBatchMessageWithNode(bus.MessageID(i), "node1", &streamv1.InternalWriteRequest{}), + bus.NewBatchMessageWithNode(bus.MessageID(i), "node2", &streamv1.InternalWriteRequest{}), + ) + if err != nil { + // The mock server will return io.EOF when the node is unavailable + // It will close the stream and return io.EOF to the send + gomega.Expect(errors.Is(err, io.EOF)) + } + } + gomega.Expect(bp.Close()).ShouldNot(gomega.HaveOccurred()) + gomega.Eventually(func() int { + p.mu.RLock() + defer p.mu.RUnlock() + return len(p.active) + }, flags.EventuallyTimeout).Should(gomega.Equal(1)) + func() { + p.mu.RLock() + defer p.mu.RUnlock() + gomega.Expect(p.evictable).Should(gomega.HaveLen(1)) + gomega.Expect(p.evictable).Should(gomega.HaveKey("node2")) + gomega.Expect(p.active).Should(gomega.HaveKey("node1")) + }() + }) + + ginkgo.It("should stay in active queue when operation takes a long time", func() { + 
addr1 := getAddress() + addr2 := getAddress() + closeFn1 := setup(addr1, codes.OK, 0) + closeFn2 := setup(addr2, codes.OK, 5*time.Second) + p := newPub() + defer func() { + p.GracefulStop() + closeFn1() + closeFn2() + }() + node1 := getDataNode("node1", addr1) + p.OnAddOrUpdate(node1) + node2 := getDataNode("node2", addr2) + p.OnAddOrUpdate(node2) + + bp := p.NewBatchPublisher(3 * time.Second) + for i := 0; i < 10; i++ { + _, err := bp.Publish(data.TopicStreamWrite, + bus.NewBatchMessageWithNode(bus.MessageID(i), "node1", &streamv1.InternalWriteRequest{}), + bus.NewBatchMessageWithNode(bus.MessageID(i), "node2", &streamv1.InternalWriteRequest{}), + ) + gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) + } + gomega.Expect(bp.Close()).ShouldNot(gomega.HaveOccurred()) + gomega.Consistently(func() int { + p.mu.RLock() + defer p.mu.RUnlock() + return len(p.active) + }, "1s").Should(gomega.Equal(2)) + }) + }) + + ginkgo.Context("Broadcast", func() { + ginkgo.It("should broadcast messages", func() { + addr1 := getAddress() + addr2 := getAddress() + closeFn1 := setup(addr1, codes.OK, 200*time.Millisecond) + closeFn2 := setup(addr2, codes.OK, 10*time.Millisecond) + p := newPub() + defer func() { + p.GracefulStop() + closeFn1() + closeFn2() + }() + node1 := getDataNode("node1", addr1) + p.OnAddOrUpdate(node1) + node2 := getDataNode("node2", addr2) + p.OnAddOrUpdate(node2) + + ff, err := p.Broadcast(3*time.Second, data.TopicStreamQuery, bus.NewMessage(bus.MessageID(1), &streamv1.QueryRequest{})) + gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) + gomega.Expect(ff).Should(gomega.HaveLen(2)) + messages, err := ff[0].GetAll() + gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) + gomega.Expect(messages).Should(gomega.HaveLen(1)) + }) + + ginkgo.It("should broadcast messages to failed nodes", func() { + addr1 := getAddress() + addr2 := getAddress() + closeFn1 := setup(addr1, codes.OK, 200*time.Millisecond) + closeFn2 := setup(addr2, codes.Unavailable, 0) + p := newPub() + 
defer func() { + p.GracefulStop() + closeFn1() + closeFn2() + }() + node1 := getDataNode("node1", addr1) + p.OnAddOrUpdate(node1) + node2 := getDataNode("node2", addr2) + p.OnAddOrUpdate(node2) + + ff, err := p.Broadcast(3*time.Second, data.TopicStreamQuery, bus.NewMessage(bus.MessageID(1), &streamv1.QueryRequest{})) + gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) + gomega.Expect(ff).Should(gomega.HaveLen(2)) + for i := range ff { + _, err := ff[i].GetAll() + ginkgo.GinkgoWriter.Printf("error: %v \n", err) + if err != nil { + s, ok := status.FromError(err) + gomega.Expect(ok).Should(gomega.BeTrue()) + gomega.Expect(s.Code()).Should(gomega.Equal(codes.Unavailable)) + return + } + } + ginkgo.Fail("should not reach here") + }) + + ginkgo.It("should broadcast messages to slow nodes", func() { + addr1 := getAddress() + addr2 := getAddress() + closeFn1 := setup(addr1, codes.OK, 200*time.Millisecond) + closeFn2 := setup(addr2, codes.OK, 5*time.Second) + p := newPub() + defer func() { + p.GracefulStop() + closeFn1() + closeFn2() + }() + node1 := getDataNode("node1", addr1) + p.OnAddOrUpdate(node1) + node2 := getDataNode("node2", addr2) + p.OnAddOrUpdate(node2) + + ff, err := p.Broadcast(3*time.Second, data.TopicStreamQuery, bus.NewMessage(bus.MessageID(1), &streamv1.QueryRequest{})) + gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) + gomega.Expect(ff).Should(gomega.HaveLen(2)) + for i := range ff { + _, err := ff[i].GetAll() + ginkgo.GinkgoWriter.Printf("error: %v \n", err) + if err != nil { + s, ok := status.FromError(err) + gomega.Expect(ok).Should(gomega.BeTrue()) + gomega.Expect(s.Code()).Should(gomega.Equal(codes.DeadlineExceeded)) + return + } + } + ginkgo.Fail("should not reach here") + }) + }) +}) diff --git a/banyand/queue/queue.go b/banyand/queue/queue.go index 1f4be29c..aad9141a 100644 --- a/banyand/queue/queue.go +++ b/banyand/queue/queue.go @@ -19,6 +19,7 @@ package queue import ( "io" + "time" 
"github.com/apache/skywalking-banyandb/banyand/metadata/schema" "github.com/apache/skywalking-banyandb/pkg/bus" @@ -41,7 +42,7 @@ type Client interface { run.Unit bus.Publisher bus.Broadcaster - NewBatchPublisher() BatchPublisher + NewBatchPublisher(timeout time.Duration) BatchPublisher Register(schema.EventHandler) } diff --git a/docs/concept/clustering.md b/docs/concept/clustering.md index 624bc828..3f49cda1 100644 --- a/docs/concept/clustering.md +++ b/docs/concept/clustering.md @@ -45,6 +45,8 @@ All nodes within a BanyanDB cluster communicate with other nodes according to th All nodes in the cluster are discovered by the Meta Nodes. When a node starts up, it registers itself with the Meta Nodes. The Meta Nodes then share this information with the Liaison Nodes which use it to route requests to the appropriate nodes. +If data nodes are unable to connect to the meta nodes due to a network partition or other issues, they will be removed from the meta nodes. However, the liaison nodes will not remove the data nodes from their routing list until the data nodes are also unreachable from the liaison nodes' perspective. This approach ensures that the system can continue to function even if some data nodes are temporarily unreachable from the meta nodes. + ## 3. **Data Organization** Different nodes in BanyanDB are responsible for different parts of the database, while Query and Liaison Nodes manage the routing and processing of queries. @@ -177,3 +179,15 @@ User 4. The results from each shard are then returned to the Liaison Node, which consolidates them into a single response to the user. This architecture allows BanyanDB to execute queries efficiently across a distributed system, leveraging the distributed query capabilities of the Liaison Node and the parallel processing of Data Nodes. + +## 7. Failover + +BanyanDB is designed to be highly available and fault-tolerant. + +In case of a Data Node failure, the system can automatically recover and continue to operate.
+ +Liaison nodes have a built-in mechanism to detect the failure of a Data Node. When a Data Node fails, the Liaison Node will automatically route requests to other available Data Nodes with the same shard. This ensures that the system remains operational even in the face of node failures. Thanks to the query mode, which allows Liaison Nodes to access all Data Nodes, the system can continue to function even if some Data Nodes are unavailable. When the failed data nodes are restored, the sys [...] + +In the case of a Liaison Node failure, the system can be configured to have multiple Liaison Nodes for redundancy. If one Liaison Node fails, the other Liaison Nodes can take over its responsibilities, ensuring that the system remains available. + +> Please note that any write request that triggers the failover process will be rejected, and the client should re-send the request. diff --git a/pkg/bus/bus.go b/pkg/bus/bus.go index 26c5be17..425bc60f 100644 --- a/pkg/bus/bus.go +++ b/pkg/bus/bus.go @@ -23,6 +23,7 @@ import ( "errors" "io" "sync" + "time" "go.uber.org/multierr" @@ -102,7 +103,7 @@ type Publisher interface { // Broadcaster allow sending Messages to a Topic and receiving the responses.
type Broadcaster interface { - Broadcast(topic Topic, message Message) ([]Future, error) + Broadcast(timeout time.Duration, topic Topic, message Message) ([]Future, error) } type channel chan event diff --git a/pkg/grpchelper/client.go b/pkg/grpchelper/client.go index 4e578774..3f815022 100644 --- a/pkg/grpchelper/client.go +++ b/pkg/grpchelper/client.go @@ -70,11 +70,11 @@ func Request(ctx context.Context, rpcTimeout time.Duration, fn func(rpcCtx conte err := fn(rpcCtx) if err != nil { if stat, ok := status.FromError(err); ok && stat.Code() == codes.Unimplemented { - l.Warn().Str("stat", stat.Message()).Msg("error: this server does not implement the service") + l.Info().Str("stat", stat.Message()).Msg("error: this server does not implement the service") } else if stat, ok := status.FromError(err); ok && stat.Code() == codes.DeadlineExceeded { - l.Warn().Dur("rpcTimeout", rpcTimeout).Msg("timeout: rpc did not complete within") + l.Info().Dur("rpcTimeout", rpcTimeout).Msg("timeout: rpc did not complete within") } else { - l.Error().Err(err).Msg("error: rpc failed:") + l.Info().Err(err).Msg("error: rpc failed:") } return err } diff --git a/pkg/query/logical/measure/measure_plan_distributed.go b/pkg/query/logical/measure/measure_plan_distributed.go index 8f35f4b4..d0aee004 100644 --- a/pkg/query/logical/measure/measure_plan_distributed.go +++ b/pkg/query/logical/measure/measure_plan_distributed.go @@ -20,6 +20,7 @@ package measure import ( "context" "fmt" + "time" "go.uber.org/multierr" "google.golang.org/protobuf/proto" @@ -35,6 +36,8 @@ import ( "github.com/apache/skywalking-banyandb/pkg/query/logical" ) +const defaultQueryTimeout = 10 * time.Second + var _ logical.UnresolvedPlan = (*unresolvedDistributed)(nil) type unresolvedDistributed struct { @@ -138,11 +141,11 @@ func (t *distributedPlan) Execute(ctx context.Context) (executor.MIterator, erro if t.maxDataPointsSize > 0 { query.Limit = t.maxDataPointsSize } - ff, err := dctx.Broadcast(data.TopicMeasureQuery, 
bus.NewMessage(bus.MessageID(dctx.TimeRange().Begin.Nanos), query)) + var allErr error + ff, err := dctx.Broadcast(defaultQueryTimeout, data.TopicMeasureQuery, bus.NewMessage(bus.MessageID(dctx.TimeRange().Begin.Nanos), query)) if err != nil { return nil, err } - var allErr error var see []sort.Iterator[*comparableDataPoint] for _, f := range ff { if m, getErr := f.Get(); getErr != nil { diff --git a/pkg/query/logical/stream/stream_plan_distributed.go b/pkg/query/logical/stream/stream_plan_distributed.go index 6f524737..8c3ed930 100644 --- a/pkg/query/logical/stream/stream_plan_distributed.go +++ b/pkg/query/logical/stream/stream_plan_distributed.go @@ -20,6 +20,7 @@ package stream import ( "context" "fmt" + "time" "go.uber.org/multierr" "google.golang.org/protobuf/proto" @@ -35,6 +36,8 @@ import ( "github.com/apache/skywalking-banyandb/pkg/query/logical" ) +const defaultQueryTimeout = 10 * time.Second + var _ logical.UnresolvedPlan = (*unresolvedDistributed)(nil) type unresolvedDistributed struct { @@ -128,7 +131,7 @@ func (t *distributedPlan) Execute(ctx context.Context) ([]*streamv1.Element, err if t.maxElementSize > 0 { query.Limit = t.maxElementSize } - ff, err := dctx.Broadcast(data.TopicStreamQuery, bus.NewMessage(bus.MessageID(dctx.TimeRange().Begin.Nanos), query)) + ff, err := dctx.Broadcast(defaultQueryTimeout, data.TopicStreamQuery, bus.NewMessage(bus.MessageID(dctx.TimeRange().Begin.Nanos), query)) if err != nil { return nil, err } @@ -152,7 +155,7 @@ func (t *distributedPlan) Execute(ctx context.Context) ([]*streamv1.Element, err for iter.Next() { result = append(result, iter.Val().Element) } - return result, nil + return result, allErr } func (t *distributedPlan) String() string { diff --git a/test/stress/istio/istio_suite_test.go b/test/stress/istio/istio_suite_test.go index 5377b72e..a1945da0 100644 --- a/test/stress/istio/istio_suite_test.go +++ b/test/stress/istio/istio_suite_test.go @@ -66,10 +66,6 @@ var _ = g.Describe("Istio", func() { path, 
deferFn, err := test.NewSpace() gomega.Expect(err).NotTo(gomega.HaveOccurred()) measurePath := filepath.Join(path, "measure") - g.DeferCleanup(func() { - printDiskUsage(measurePath, 5, 0) - deferFn() - }) var ports []int ports, err = test.AllocateFreePorts(4) gomega.Expect(err).NotTo(gomega.HaveOccurred()) @@ -80,6 +76,8 @@ var _ = g.Describe("Istio", func() { g.DeferCleanup(func() { time.Sleep(time.Minute) closerServerFunc() + printDiskUsage(measurePath, 5, 0) + deferFn() }) gomega.Eventually(helpers.HealthCheck(addr, 10*time.Second, 10*time.Second, grpc.WithTransportCredentials(insecure.NewCredentials())), flags.EventuallyTimeout).Should(gomega.Succeed())