This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch publish
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 257e06586302fc7022077dfac897657d759a940b
Author: Gao Hongtao <[email protected]>
AuthorDate: Mon Apr 15 03:26:09 2024 +0000

    Add test cases

    Signed-off-by: Gao Hongtao <[email protected]>
---
 banyand/queue/pub/client.go           | 140 ++++++++++++++++++++++++++++--
 banyand/queue/pub/client_test.go      |  90 ++++++++++++++++++++
 banyand/queue/pub/pub.go              | 112 +++++++++++++++++++-----
 banyand/queue/pub/pub_suite_test.go   | 156 ++++++++++++++++++++++++++++++++++
 banyand/queue/pub/pub_test.go         | 111 ++++++++++++++++++++++++
 pkg/grpchelper/client.go              |   6 +-
 test/stress/istio/istio_suite_test.go |   6 +-
 7 files changed, 586 insertions(+), 35 deletions(-)
diff --git a/banyand/queue/pub/client.go b/banyand/queue/pub/client.go
index 72014607..7bdc77da 100644
--- a/banyand/queue/pub/client.go
+++ b/banyand/queue/pub/client.go
@@ -18,17 +18,29 @@
 package pub
 
 import (
+	"context"
 	"fmt"
+	"time"
 
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/credentials/insecure"
+	"google.golang.org/grpc/health/grpc_health_v1"
 
 	clusterv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/cluster/v1"
 	databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
 	"github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+	"github.com/apache/skywalking-banyandb/pkg/grpchelper"
+	"github.com/apache/skywalking-banyandb/pkg/logger"
 )
 
+const rpcTimeout = 2 * time.Second
+
 var (
+	// Retry policy for health check.
+	initBackoff       = time.Second
+	maxBackoff        = 20 * time.Second
+	backoffMultiplier = 2.0
+
 	serviceName = clusterv1.Service_ServiceDesc.ServiceName
 
 	// The timeout is set by each RPC.
@@ -49,6 +61,7 @@ var (
 type client struct {
 	client clusterv1.ServiceClient
 	conn   *grpc.ClientConn
+	md     schema.Metadata
 }
 
 func (p *pub) OnAddOrUpdate(md schema.Metadata) {
@@ -88,16 +101,25 @@ func (p *pub) OnAddOrUpdate(md schema.Metadata) {
 	if _, ok := p.clients[name]; ok {
 		return
 	}
+	if _, ok := p.evictClients[name]; ok {
+		return
+	}
 	conn, err := grpc.Dial(address, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(retryPolicy))
 	if err != nil {
 		p.log.Error().Err(err).Msg("failed to connect to grpc server")
 		return
 	}
+
+	if !p.checkClient(conn, md) {
+		return
+	}
+
 	c := clusterv1.NewServiceClient(conn)
-	p.clients[name] = &client{conn: conn, client: c}
+	p.clients[name] = &client{conn: conn, client: c, md: md}
 	if p.handler != nil {
 		p.handler.OnAddOrUpdate(md)
 	}
+	p.log.Info().Stringer("node", node).Msg("new node is healthy, add it to active queue")
 }
 
 func (p *pub) OnDelete(md schema.Metadata) {
@@ -116,14 +138,122 @@ func (p *pub) OnDelete(md schema.Metadata) {
 	}
 	p.mu.Lock()
 	defer p.mu.Unlock()
+	if en, ok := p.evictClients[name]; ok {
+		close(en.c)
+		delete(p.evictClients, name)
+		p.log.Info().Stringer("node", node).Msg("node is removed from evict queue by delete event")
+		return
+	}
 
-	if client, ok := p.clients[name]; ok {
-		if client.conn != nil {
-			client.conn.Close() // Close the client connection
-		}
+	if client, ok := p.clients[name]; ok && !p.healthCheck(node, client.conn) {
+		_ = client.conn.Close()
 		delete(p.clients, name)
 		if p.handler != nil {
 			p.handler.OnDelete(md)
 		}
+		p.log.Info().Stringer("node", node).Msg("node is removed from active queue by delete event")
+	}
+}
+
+func (p *pub) checkClient(conn *grpc.ClientConn, md schema.Metadata) bool {
+	node, ok := md.Spec.(*databasev1.Node)
+	if !ok {
+		logger.Panicf("failed to cast node spec")
+		return false
+	}
+	if p.healthCheck(node, conn) {
+		return true
+	}
+	_ = conn.Close()
+	if !p.closer.AddRunning() {
+		return false
+	}
+	p.log.Info().Stringer("node", node).Msg("node is unhealthy, move it to evict queue")
+	name := node.Metadata.Name
+	p.evictClients[name] = evictNode{n: node, c: make(chan struct{})}
+	if p.handler != nil {
+		p.handler.OnDelete(md)
+	}
+	go func(p *pub, name string, en evictNode, md schema.Metadata) {
+		defer p.closer.Done()
+		backoff := initBackoff
+		for {
+			select {
+			case <-time.After(backoff):
+				connEvict, errEvict := grpc.Dial(node.GrpcAddress, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(retryPolicy))
+				if errEvict == nil && p.healthCheck(en.n, connEvict) {
+					p.mu.Lock()
+					defer p.mu.Unlock()
+					if _, ok := p.evictClients[name]; !ok {
+						// The client has been removed from evict clients map, just return
+						return
+					}
+					c := clusterv1.NewServiceClient(connEvict)
+					p.clients[name] = &client{conn: connEvict, client: c, md: md}
+					if p.handler != nil {
+						p.handler.OnAddOrUpdate(md)
+					}
+					delete(p.evictClients, name)
+					p.log.Info().Stringer("node", en.n).Msg("node is healthy, move it back to active queue")
+					return
+				}
+				if errEvict != nil {
+					_ = connEvict.Close()
+				}
+				p.log.Error().Err(errEvict).Msgf("failed to re-connect to grpc server after waiting for %s", backoff)
+			case <-en.c:
+				return
+			}
+			if backoff < maxBackoff {
+				backoff *= time.Duration(backoffMultiplier)
+			} else {
+				backoff = maxBackoff
+			}
+		}
+	}(p, name, p.evictClients[name], md)
+	return false
+}
+
+func (p *pub) healthCheck(node fmt.Stringer, conn *grpc.ClientConn) bool {
+	var resp *grpc_health_v1.HealthCheckResponse
+	if err := grpchelper.Request(context.Background(), rpcTimeout, func(rpcCtx context.Context) (err error) {
+		resp, err = grpc_health_v1.NewHealthClient(conn).Check(rpcCtx,
+			&grpc_health_v1.HealthCheckRequest{
+				Service: "",
+			})
+		return err
+	}); err != nil {
+		if e := p.log.Debug(); e.Enabled() {
+			e.Err(err).Stringer("node", node).Msg("service unhealthy")
+		}
+		return false
+	}
+	if resp.GetStatus() == grpc_health_v1.HealthCheckResponse_SERVING {
+		return true
 	}
+	return false
+}
+
+func (p *pub) failover(node string) {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+	if en, ok := p.evictClients[node]; ok {
+		close(en.c)
+		delete(p.evictClients, node)
+		p.log.Info().Str("node", node).Msg("node is removed from evict queue by wire event")
+		return
+	}
+
+	if client, ok := p.clients[node]; ok && !p.checkClient(client.conn, client.md) {
+		_ = client.conn.Close()
+		delete(p.clients, node)
+		if p.handler != nil {
+			p.handler.OnDelete(client.md)
+		}
+	}
+}
+
+type evictNode struct {
+	n *databasev1.Node
+	c chan struct{}
 }
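A note on the health check above: healthCheck wraps a call to the standard gRPC health-checking protocol (grpc.health.v1.Health/Check) in the project's grpchelper.Request timeout helper. Stripped of those helpers, the probe reduces to the following self-contained sketch; the address, timeout, and function name here are illustrative, not part of the commit:

    package main

    import (
        "context"
        "fmt"
        "time"

        "google.golang.org/grpc"
        "google.golang.org/grpc/credentials/insecure"
        "google.golang.org/grpc/health/grpc_health_v1"
    )

    // probe dials addr and asks the server-wide health service
    // (Service: "") whether it is SERVING.
    func probe(addr string) bool {
        conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
        if err != nil {
            return false
        }
        defer conn.Close()
        ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
        defer cancel()
        resp, err := grpc_health_v1.NewHealthClient(conn).Check(ctx, &grpc_health_v1.HealthCheckRequest{Service: ""})
        return err == nil && resp.GetStatus() == grpc_health_v1.HealthCheckResponse_SERVING
    }

    func main() {
        // Prints false unless a healthy server is listening on this port.
        fmt.Println(probe("localhost:17912"))
    }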
diff --git a/banyand/queue/pub/client_test.go b/banyand/queue/pub/client_test.go
new file mode 100644
index 00000000..83200aa9
--- /dev/null
+++ b/banyand/queue/pub/client_test.go
@@ -0,0 +1,90 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pub
+
+import (
+	"time"
+
+	"github.com/onsi/ginkgo/v2"
+	"github.com/onsi/gomega"
+	"github.com/onsi/gomega/gleak"
+	"google.golang.org/grpc/codes"
+
+	"github.com/apache/skywalking-banyandb/pkg/test/flags"
+)
+
+var _ = ginkgo.Describe("publish clients register/unregister", func() {
+	var goods []gleak.Goroutine
+	ginkgo.BeforeEach(func() {
+		goods = gleak.Goroutines()
+	})
+	ginkgo.AfterEach(func() {
+		gomega.Eventually(gleak.Goroutines, flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods))
+	})
+	ginkgo.It("should register and unregister clients", func() {
+		addr1 := getAddress()
+		closeFn := setup(addr1, codes.OK, 200*time.Millisecond)
+		p := newPub()
+		defer func() {
+			p.GracefulStop()
+			closeFn()
+		}()
+		node1 := getDataNode("node1", addr1)
+		p.OnAddOrUpdate(node1)
+		verifyClients(p, 1, 0, 1, 0)
+		addr2 := getAddress()
+		node2 := getDataNode("node2", addr2)
+		p.OnAddOrUpdate(node2)
+		verifyClients(p, 1, 1, 1, 1)
+
+		p.OnDelete(node1)
+		verifyClients(p, 1, 1, 1, 1)
+		p.OnDelete(node2)
+		verifyClients(p, 1, 0, 1, 1)
+		closeFn()
+		p.OnDelete(node1)
+		verifyClients(p, 0, 0, 1, 2)
+	})
+
+	ginkgo.It("should move back to active queue", func() {
+		addr1 := getAddress()
+		node1 := getDataNode("node1", addr1)
+		p := newPub()
+		defer p.GracefulStop()
+		p.OnAddOrUpdate(node1)
+		verifyClients(p, 0, 1, 0, 1)
+		closeFn := setup(addr1, codes.OK, 200*time.Millisecond)
+		defer closeFn()
+		gomega.Eventually(func() int {
+			p.mu.RLock()
+			defer p.mu.RUnlock()
+			return len(p.clients)
+		}, flags.EventuallyTimeout).Should(gomega.Equal(1))
+		verifyClients(p, 1, 0, 1, 1)
+	})
+})
+
+func verifyClients(p *pub, active, evict, onAdd, onDelete int) {
+	p.mu.RLock()
+	defer p.mu.RUnlock()
+	gomega.Expect(p.clients).Should(gomega.HaveLen(active))
+	gomega.Expect(p.evictClients).Should(gomega.HaveLen(evict))
+	h := p.handler.(*mockHandler)
+	gomega.Expect(h.addOrUpdateCount).Should(gomega.Equal(onAdd))
+	gomega.Expect(h.deleteCount).Should(gomega.Equal(onDelete))
+}
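The BeforeEach/AfterEach pair in this test file snapshots the running goroutines and fails the spec if new ones are still alive at the end, which is what would catch a leaked reconnect loop. The same guard in a bare go test, as a minimal sketch (the test name and elided body are illustrative):

    package pub_test

    import (
        "testing"

        "github.com/onsi/gomega"
        "github.com/onsi/gomega/gleak"
    )

    func TestNoGoroutineLeak(t *testing.T) {
        g := gomega.NewWithT(t)
        before := gleak.Goroutines() // snapshot before exercising the code
        // ... start and stop the component under test here ...
        g.Eventually(gleak.Goroutines).ShouldNot(gleak.HaveLeaked(before))
    }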
diff --git a/banyand/queue/pub/pub.go b/banyand/queue/pub/pub.go
index d2576e51..cbe1cd50 100644
--- a/banyand/queue/pub/pub.go
+++ b/banyand/queue/pub/pub.go
@@ -47,12 +47,13 @@ var (
 
 type pub struct {
 	schema.UnimplementedOnInitHandler
-	metadata metadata.Repo
-	handler  schema.EventHandler
-	log      *logger.Logger
-	clients  map[string]*client
-	closer   *run.Closer
-	mu       sync.RWMutex
+	metadata     metadata.Repo
+	handler      schema.EventHandler
+	log          *logger.Logger
+	clients      map[string]*client
+	evictClients map[string]evictNode
+	closer       *run.Closer
+	mu           sync.RWMutex
 }
 
 func (p *pub) Register(handler schema.EventHandler) {
@@ -60,13 +61,18 @@ func (p *pub) Register(handler schema.EventHandler) {
 }
 
 func (p *pub) GracefulStop() {
-	p.closer.Done()
-	p.closer.CloseThenWait()
 	p.mu.Lock()
 	defer p.mu.Unlock()
+	for i := range p.evictClients {
+		close(p.evictClients[i].c)
+	}
+	p.evictClients = nil
+	p.closer.Done()
+	p.closer.CloseThenWait()
 	for _, c := range p.clients {
 		_ = c.conn.Close()
 	}
+	p.clients = nil
 }
 
 // Serve implements run.Service.
@@ -85,7 +91,6 @@ func (p *pub) Broadcast(topic bus.Topic, messages bus.Message) ([]bus.Future, er
 	var wg sync.WaitGroup
 	for _, n := range names {
 		wg.Add(1)
-		// Send a value into sem. If sem is full, this will block until there's room.
 		go func(n string) {
 			defer wg.Done()
 			f, err := p.Publish(topic, bus.NewMessageWithNode(messages.ID(), n, messages.Data()))
@@ -124,7 +129,7 @@ func (p *pub) Publish(topic bus.Topic, messages ...bus.Message) (bus.Future, err
 	handleMessage := func(m bus.Message, err error) error {
 		r, errSend := messageToRequest(topic, m)
 		if errSend != nil {
-			return multierr.Append(err, fmt.Errorf("failed to marshal message %T: %w", m, errSend))
+			return multierr.Append(err, fmt.Errorf("failed to marshal message[%d]: %w", m.ID(), errSend))
 		}
 		node := m.Node()
 		p.mu.RLock()
@@ -155,15 +160,20 @@ func (p *pub) Publish(topic bus.Topic, messages ...bus.Message) (bus.Future, err
 
 // NewBatchPublisher returns a new batch publisher.
 func (p *pub) NewBatchPublisher() queue.BatchPublisher {
-	return &batchPublisher{pub: p, streams: make(map[string]writeStream)}
+	return &batchPublisher{
+		pub:     p,
+		streams: make(map[string]writeStream),
+		f:       batchFuture{errNodes: make(map[string]struct{}), l: p.log},
+	}
 }
 
 // New returns a new queue client.
 func New(metadata metadata.Repo) queue.Client {
 	return &pub{
-		metadata: metadata,
-		clients:  make(map[string]*client),
-		closer:   run.NewCloser(1),
+		metadata:     metadata,
+		clients:      make(map[string]*client),
+		evictClients: make(map[string]evictNode),
+		closer:       run.NewCloser(1),
 	}
 }
 
@@ -185,6 +195,7 @@ type writeStream struct {
 type batchPublisher struct {
 	pub     *pub
 	streams map[string]writeStream
+	f       batchFuture
 }
 
 func (bp *batchPublisher) Close() (err error) {
@@ -194,14 +205,23 @@ func (bp *batchPublisher) Close() (err error) {
 	for i := range bp.streams {
 		<-bp.streams[i].ctxDoneCh
 	}
+	if bp.pub.closer.AddRunning() {
+		go func(f *batchFuture) {
+			defer bp.pub.closer.Done()
+			for _, n := range f.get() {
+				bp.pub.failover(n)
+			}
+		}(&bp.f)
+	}
 	return err
 }
 
 func (bp *batchPublisher) Publish(topic bus.Topic, messages ...bus.Message) (bus.Future, error) {
+	var err error
 	for _, m := range messages {
-		r, err := messageToRequest(topic, m)
-		if err != nil {
-			err = multierr.Append(err, fmt.Errorf("failed to marshal message %T: %w", m, err))
+		r, errM2R := messageToRequest(topic, m)
+		if errM2R != nil {
+			err = multierr.Append(err, fmt.Errorf("failed to marshal message %T: %w", m, errM2R))
 			continue
 		}
 		node := m.Node()
@@ -249,9 +269,13 @@ func (bp *batchPublisher) Publish(topic bus.Topic, messages ...bus.Message) (bus
 			client:    stream,
 			ctxDoneCh: ctx.Done(),
 		}
+		bp.f.events = append(bp.f.events, make(chan batchEvent))
 		_ = sendData()
-		go func(s clusterv1.Service_SendClient, deferFn func()) {
-			defer deferFn()
+		go func(s clusterv1.Service_SendClient, deferFn func(), bc chan batchEvent) {
+			defer func() {
+				close(bc)
+				deferFn()
+			}()
 			for {
 				_, errRecv := s.Recv()
 				if errRecv == nil {
@@ -260,11 +284,12 @@ func (bp *batchPublisher) Publish(topic bus.Topic, messages ...bus.Message) (bus
 				if errors.Is(errRecv, io.EOF) {
 					return
 				}
-				bp.pub.log.Err(errRecv).Msg("failed to receive message")
+				bc <- batchEvent{n: node, e: errRecv}
+				return
 			}
-		}(stream, deferFn)
+		}(stream, deferFn, bp.f.events[len(bp.f.events)-1])
 	}
-	return nil, nil
+	return nil, err
 }
 
 func messageToRequest(topic bus.Topic, m bus.Message) (*clusterv1.SendRequest, error) {
@@ -275,7 +300,7 @@ func messageToRequest(topic bus.Topic, m bus.Message) (*clusterv1.SendRequest, e
 	}
 	message, ok := m.Data().(proto.Message)
 	if !ok {
-		return nil, fmt.Errorf("invalid message type %T", m)
+		return nil, fmt.Errorf("invalid message type %T", m.Data())
 	}
 	anyMessage, err := anypb.New(message)
 	if err != nil {
@@ -342,3 +367,44 @@ func (l *future) GetAll() ([]bus.Message, error) {
 		ret = append(ret, m)
 	}
 }
+
+type batchEvent struct {
+	e error
+	n string
+}
+
+type batchFuture struct {
+	errNodes map[string]struct{}
+	l        *logger.Logger
+	events   []chan batchEvent
+}
+
+func (b *batchFuture) get() []string {
+	var wg sync.WaitGroup
+	var mux sync.Mutex
+	wg.Add(len(b.events))
+	for _, e := range b.events {
+		go func(e chan batchEvent, mux *sync.Mutex) {
+			defer wg.Done()
+			for evt := range e {
+				func() {
+					mux.Lock()
+					defer mux.Unlock()
+					// only log the error once for each node
+					if _, ok := b.errNodes[evt.n]; !ok {
+						b.l.Error().Err(evt.e).Msgf("failed to send message to node %s", evt.n)
+					}
+					b.errNodes[evt.n] = struct{}{}
+				}()
+			}
+		}(e, &mux)
+	}
+	wg.Wait()
+	mux.Lock()
+	defer mux.Unlock()
+	var result []string
+	for n := range b.errNodes {
+		result = append(result, n)
+	}
+	return result
+}
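batchFuture.get above fans in one error channel per stream and records each failing node once; Close then hands the failed names to failover. The same fan-in pattern in isolation, as a hedged sketch (collect and the channel layout are illustrative names, not the committed API):

    package main

    import (
        "fmt"
        "sync"
    )

    // collect drains one channel per node and returns the nodes that
    // reported at least one send error.
    func collect(events map[string]<-chan error) []string {
        var (
            wg       sync.WaitGroup
            mu       sync.Mutex
            errNodes = make(map[string]struct{})
        )
        for node, ch := range events {
            wg.Add(1)
            go func(node string, ch <-chan error) {
                defer wg.Done()
                for range ch { // one receive per failed send
                    mu.Lock()
                    errNodes[node] = struct{}{}
                    mu.Unlock()
                }
            }(node, ch)
        }
        wg.Wait()
        out := make([]string, 0, len(errNodes))
        for n := range errNodes {
            out = append(out, n)
        }
        return out
    }

    func main() {
        bad := make(chan error, 1)
        bad <- fmt.Errorf("stream reset")
        close(bad)
        good := make(chan error)
        close(good)
        fmt.Println(collect(map[string]<-chan error{"node1": good, "node2": bad})) // [node2]
    }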
diff --git a/banyand/queue/pub/pub_suite_test.go b/banyand/queue/pub/pub_suite_test.go
new file mode 100644
index 00000000..96bee0ac
--- /dev/null
+++ b/banyand/queue/pub/pub_suite_test.go
@@ -0,0 +1,156 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pub
+
+import (
+	"io"
+	"net"
+	"strconv"
+	"testing"
+	"time"
+
+	"github.com/onsi/ginkgo/v2"
+	"github.com/onsi/gomega"
+	"github.com/pkg/errors"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/health"
+	"google.golang.org/grpc/health/grpc_health_v1"
+	"google.golang.org/grpc/status"
+
+	clusterv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/cluster/v1"
+	commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+	databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+	"github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+	"github.com/apache/skywalking-banyandb/pkg/logger"
+	"github.com/apache/skywalking-banyandb/pkg/test"
+)
+
+func TestPub(t *testing.T) {
+	gomega.RegisterFailHandler(ginkgo.Fail)
+	ginkgo.RunSpecs(t, "Publish Suite")
+}
+
+type mockServer struct {
+	clusterv1.UnimplementedServiceServer
+	healthServer *health.Server
+	latency      time.Duration
+	code         codes.Code
+}
+
+func (s *mockServer) Send(stream clusterv1.Service_SendServer) error {
+	for {
+		req, err := stream.Recv()
+		if err != nil {
+			if errors.Is(err, io.EOF) {
+				return nil
+			}
+			return err
+		}
+
+		if s.code != codes.OK {
+			s.healthServer.SetServingStatus("", grpc_health_v1.HealthCheckResponse_NOT_SERVING)
+			return status.Error(s.code, "mock error")
+		}
+
+		if s.latency > 0 {
+			time.Sleep(s.latency)
+		}
+
+		res := &clusterv1.SendResponse{
+			MessageId: req.MessageId,
+			Error:     "",
+			Body:      req.Body,
+		}
+
+		if err := stream.Send(res); err != nil {
+			panic(err)
+		}
+	}
+}
+
+func setup(address string, code codes.Code, latency time.Duration) func() {
+	s := grpc.NewServer()
+	hs := health.NewServer()
+	clusterv1.RegisterServiceServer(s, &mockServer{
+		code:         code,
+		latency:      latency,
+		healthServer: hs,
+	})
+	grpc_health_v1.RegisterHealthServer(s, hs)
+	go func() {
+		lis, err := net.Listen("tcp", address)
+		if err != nil {
+			logger.Panicf("failed to listen: %v", err)
+			return
+		}
+		if err := s.Serve(lis); err != nil {
+			logger.Panicf("Server exited with error: %v", err)
+		}
+	}()
+	return s.GracefulStop
+}
+
+func getAddress() string {
+	ports, err := test.AllocateFreePorts(1)
+	if err != nil {
+		logger.Panicf("failed to allocate free ports: %v", err)
+		return ""
+	}
+	return net.JoinHostPort("localhost", strconv.Itoa(ports[0]))
+}
+
+type mockHandler struct {
+	addOrUpdateCount int
+	deleteCount      int
+}
+
+func (m *mockHandler) OnInit(_ []schema.Kind) (bool, []int64) {
+	panic("no implemented")
+}
+
+func (m *mockHandler) OnAddOrUpdate(_ schema.Metadata) {
+	m.addOrUpdateCount++
+}
+
+func (m *mockHandler) OnDelete(_ schema.Metadata) {
+	m.deleteCount++
+}
+
+func newPub() *pub {
+	p := New(nil).(*pub)
+	p.log = logger.GetLogger("pub")
+	p.handler = &mockHandler{}
+	return p
+}
+
+func getDataNode(name string, address string) schema.Metadata {
+	return schema.Metadata{
+		TypeMeta: schema.TypeMeta{
+			Name: name,
+			Kind: schema.KindNode,
+		},
+		Spec: &databasev1.Node{
+			Metadata: &commonv1.Metadata{
+				Name: name,
+			},
+			Roles:       []databasev1.Role{databasev1.Role_ROLE_DATA},
+			GrpcAddress: address,
+		},
+	}
+}
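The mock above flips its registered health service to NOT_SERVING on the first failed Send, which is what drives the publisher's probe to evict the node. The server half of that handshake, reduced to a sketch (the listen address is illustrative):

    package main

    import (
        "fmt"
        "net"

        "google.golang.org/grpc"
        "google.golang.org/grpc/health"
        "google.golang.org/grpc/health/grpc_health_v1"
    )

    func main() {
        s := grpc.NewServer()
        hs := health.NewServer()
        grpc_health_v1.RegisterHealthServer(s, hs)
        // "" targets the server-wide status; a per-service name works too.
        hs.SetServingStatus("", grpc_health_v1.HealthCheckResponse_NOT_SERVING)
        lis, err := net.Listen("tcp", "localhost:0")
        if err != nil {
            panic(err)
        }
        fmt.Println("health server on", lis.Addr())
        _ = s.Serve(lis) // probes now see NOT_SERVING until the status is flipped back
    }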
diff --git a/banyand/queue/pub/pub_test.go b/banyand/queue/pub/pub_test.go
new file mode 100644
index 00000000..b68d19cc
--- /dev/null
+++ b/banyand/queue/pub/pub_test.go
@@ -0,0 +1,111 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pub
+
+import (
+	"time"
+
+	"github.com/onsi/ginkgo/v2"
+	"github.com/onsi/gomega"
+	"github.com/onsi/gomega/gleak"
+	"google.golang.org/grpc/codes"
+
+	streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
+	"github.com/apache/skywalking-banyandb/pkg/bus"
+	"github.com/apache/skywalking-banyandb/pkg/test/flags"
+)
+
+var _ = ginkgo.Describe("publish clients register/unregister", func() {
+	var goods []gleak.Goroutine
+	ginkgo.BeforeEach(func() {
+		goods = gleak.Goroutines()
+	})
+	ginkgo.AfterEach(func() {
+		gomega.Eventually(gleak.Goroutines, flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods))
+	})
+
+	ginkgo.Context("publisher and batch publisher", func() {
+		ginkgo.It("should publish messages", func() {
+			addr1 := getAddress()
+			addr2 := getAddress()
+			closeFn1 := setup(addr1, codes.OK, 200*time.Millisecond)
+			closeFn2 := setup(addr2, codes.OK, 10*time.Millisecond)
+			p := newPub()
+			defer func() {
+				p.GracefulStop()
+				closeFn1()
+				closeFn2()
+			}()
+			node1 := getDataNode("node1", addr1)
+			p.OnAddOrUpdate(node1)
+			node2 := getDataNode("node2", addr2)
+			p.OnAddOrUpdate(node2)
+
+			bp := p.NewBatchPublisher()
+			defer bp.Close()
+			t := bus.UniTopic("test")
+			for i := 0; i < 10; i++ {
+				_, err := bp.Publish(t,
+					bus.NewBatchMessageWithNode(bus.MessageID(i), "node1", &streamv1.InternalWriteRequest{}),
+					bus.NewBatchMessageWithNode(bus.MessageID(i), "node2", &streamv1.InternalWriteRequest{}),
+				)
+				gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
+			}
+		})
+
+		ginkgo.FIt("should go to evict queue when node is unavailable", func() {
+			addr1 := getAddress()
+			addr2 := getAddress()
+			closeFn1 := setup(addr1, codes.OK, 200*time.Millisecond)
+			closeFn2 := setup(addr2, codes.Unavailable, 0)
+			p := newPub()
+			defer func() {
+				p.GracefulStop()
+				closeFn1()
+				closeFn2()
+			}()
+			node1 := getDataNode("node1", addr1)
+			p.OnAddOrUpdate(node1)
+			node2 := getDataNode("node2", addr2)
+			p.OnAddOrUpdate(node2)
+
+			bp := p.NewBatchPublisher()
+			t := bus.UniTopic("test")
+			for i := 0; i < 10; i++ {
+				_, err := bp.Publish(t,
+					bus.NewBatchMessageWithNode(bus.MessageID(i), "node1", &streamv1.InternalWriteRequest{}),
+					bus.NewBatchMessageWithNode(bus.MessageID(i), "node2", &streamv1.InternalWriteRequest{}),
+				)
+				gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
+			}
+			gomega.Expect(bp.Close()).ShouldNot(gomega.HaveOccurred())
+			gomega.Eventually(func() int {
+				p.mu.RLock()
+				defer p.mu.RUnlock()
+				return len(p.clients)
+			}, flags.EventuallyTimeout).Should(gomega.Equal(1))
+			func() {
+				p.mu.RLock()
+				defer p.mu.RUnlock()
+				gomega.Expect(p.evictClients).Should(gomega.HaveLen(1))
+				gomega.Expect(p.evictClients).Should(gomega.HaveKey("node2"))
+				gomega.Expect(p.clients).Should(gomega.HaveKey("node1"))
+			}()
+		})
+	})
+})
diff --git a/pkg/grpchelper/client.go b/pkg/grpchelper/client.go
index 4e578774..3f815022 100644
--- a/pkg/grpchelper/client.go
+++ b/pkg/grpchelper/client.go
@@ -70,11 +70,11 @@ func Request(ctx context.Context, rpcTimeout time.Duration, fn func(rpcCtx conte
 	err := fn(rpcCtx)
 	if err != nil {
 		if stat, ok := status.FromError(err); ok && stat.Code() == codes.Unimplemented {
-			l.Warn().Str("stat", stat.Message()).Msg("error: this server does not implement the service")
+			l.Info().Str("stat", stat.Message()).Msg("error: this server does not implement the service")
 		} else if stat, ok := status.FromError(err); ok && stat.Code() == codes.DeadlineExceeded {
-			l.Warn().Dur("rpcTimeout", rpcTimeout).Msg("timeout: rpc did not complete within")
+			l.Info().Dur("rpcTimeout", rpcTimeout).Msg("timeout: rpc did not complete within")
 		} else {
-			l.Error().Err(err).Msg("error: rpc failed:")
+			l.Info().Err(err).Msg("error: rpc failed:")
 		}
 		return err
 	}
diff --git a/test/stress/istio/istio_suite_test.go b/test/stress/istio/istio_suite_test.go
index 5377b72e..a1945da0 100644
--- a/test/stress/istio/istio_suite_test.go
+++ b/test/stress/istio/istio_suite_test.go
@@ -66,10 +66,6 @@ var _ = g.Describe("Istio", func() {
 		path, deferFn, err := test.NewSpace()
 		gomega.Expect(err).NotTo(gomega.HaveOccurred())
 		measurePath := filepath.Join(path, "measure")
-		g.DeferCleanup(func() {
-			printDiskUsage(measurePath, 5, 0)
-			deferFn()
-		})
 		var ports []int
 		ports, err = test.AllocateFreePorts(4)
 		gomega.Expect(err).NotTo(gomega.HaveOccurred())
@@ -80,6 +76,8 @@ var _ = g.Describe("Istio", func() {
 		g.DeferCleanup(func() {
 			time.Sleep(time.Minute)
 			closerServerFunc()
+			printDiskUsage(measurePath, 5, 0)
+			deferFn()
 		})
 		gomega.Eventually(helpers.HealthCheck(addr, 10*time.Second, 10*time.Second, grpc.WithTransportCredentials(insecure.NewCredentials())), flags.EventuallyTimeout).Should(gomega.Succeed())
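A closing note on the reconnect strategy introduced in client.go: the evict loop waits initBackoff, doubles the wait after every failed probe, and caps it at maxBackoff, so a dead node costs at worst one probe every 20 seconds. The same schedule as a free-standing sketch (retryUntil and the probe closure are illustrative, not the committed API):

    package main

    import (
        "fmt"
        "time"
    )

    // retryUntil probes until healthy() succeeds or cancel is closed,
    // doubling the wait between attempts up to a fixed cap.
    func retryUntil(healthy func() bool, cancel <-chan struct{}) bool {
        const (
            initBackoff = time.Second
            maxBackoff  = 20 * time.Second
        )
        backoff := initBackoff
        for {
            select {
            case <-time.After(backoff):
                if healthy() {
                    return true
                }
            case <-cancel:
                return false
            }
            if backoff < maxBackoff {
                backoff *= 2
            } else {
                backoff = maxBackoff
            }
        }
    }

    func main() {
        start := time.Now()
        ok := retryUntil(func() bool { return time.Since(start) > 3*time.Second }, make(chan struct{}))
        fmt.Println("recovered:", ok) // true after a few doubled waits
    }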
