This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new a85b51c0 Fix issues found by distributed test (#432)
a85b51c0 is described below

commit a85b51c00dc53fe2cb179b4f280a4fb4848c47f8
Author: Gao Hongtao <hanahm...@gmail.com>
AuthorDate: Fri Apr 12 10:38:57 2024 +0800

    Fix issues found by distributed test (#432)
---
 banyand/internal/storage/index.go                  |  2 +-
 banyand/internal/storage/index_test.go             |  6 +-
 banyand/metadata/client.go                         | 10 ++-
 banyand/metadata/schema/error.go                   |  3 +-
 banyand/metadata/schema/etcd.go                    | 26 ++++--
 banyand/metadata/schema/register_test.go           |  2 +-
 banyand/metadata/schema/watcher.go                 | 52 +++++++-----
 banyand/metadata/schema/watcher_test.go            |  4 +-
 banyand/queue/pub/client.go                        | 23 ++++--
 banyand/queue/pub/pub.go                           | 96 ++++++++++++++++------
 .../logical/stream/stream_plan_distributed.go      |  4 -
 pkg/run/run.go                                     |  8 +-
 pkg/schema/init.go                                 |  8 +-
 test/stress/trace/Makefile                         | 10 +--
 ...er-compose.yaml => docker-compose-cluster.yaml} | 77 +++++++----------
 ...ker-compose.yaml => docker-compose-single.yaml} |  0
 test/stress/trace/trace_suite_test.go              | 10 ++-
 17 files changed, 210 insertions(+), 131 deletions(-)

diff --git a/banyand/internal/storage/index.go b/banyand/internal/storage/index.go
index 7bc91fea..af7eaed4 100644
--- a/banyand/internal/storage/index.go
+++ b/banyand/internal/storage/index.go
@@ -365,7 +365,7 @@ func (sic *seriesIndexController[T, O]) run(deadline time.Time) (err error) {
        }
 
        liveTime := sic.hot.startTime.Sub(deadline)
-       if liveTime > 0 && liveTime < sic.standbyLiveTime {
+       if liveTime > 0 && liveTime <= sic.standbyLiveTime {
                sic.l.Info().Time("deadline", deadline).Msg("start to create standby series index")
                standby, err = sic.newIdx(ctx)
                if err != nil {
diff --git a/banyand/internal/storage/index_test.go b/banyand/internal/storage/index_test.go
index a7adc370..253a64d3 100644
--- a/banyand/internal/storage/index_test.go
+++ b/banyand/internal/storage/index_test.go
@@ -208,8 +208,8 @@ func TestSeriesIndexController(t *testing.T) {
                sic, err := newSeriesIndexController(ctx, opts)
                require.NoError(t, err)
                defer sic.Close()
-               require.NoError(t, sic.run(time.Now().Add(-time.Hour*23)))
-               assert.NotNil(t, sic.standby)
+               require.NoError(t, sic.run(time.Now().Add(-time.Hour*23+10*time.Minute)))
+               require.NotNil(t, sic.standby)
                idxNames := make([]string, 0)
                walkDir(tmpDir, "idx-", func(suffix string) error {
                        idxNames = append(idxNames, suffix)
@@ -218,7 +218,7 @@ func TestSeriesIndexController(t *testing.T) {
                assert.Equal(t, 2, len(idxNames))
                nextTime := sic.standby.startTime
                require.NoError(t, sic.run(time.Now().Add(time.Hour)))
-               assert.Nil(t, sic.standby)
+               require.Nil(t, sic.standby)
                assert.Equal(t, nextTime, sic.hot.startTime)
        })
 }
diff --git a/banyand/metadata/client.go b/banyand/metadata/client.go
index a687a3e2..ea147d88 100644
--- a/banyand/metadata/client.go
+++ b/banyand/metadata/client.go
@@ -19,9 +19,9 @@ package metadata
 
 import (
        "context"
-       "errors"
        "time"
 
+       "github.com/pkg/errors"
        "go.uber.org/multierr"
        "google.golang.org/protobuf/types/known/timestamppb"
 
@@ -129,7 +129,9 @@ func (s *clientService) PreRun(ctx context.Context) error {
                ctxRegister, cancel := context.WithTimeout(ctx, time.Second*10)
                err = s.schemaRegistry.RegisterNode(ctxRegister, nodeInfo, s.forceRegisterNode)
                cancel()
-               if errors.Is(err, context.DeadlineExceeded) {
+               if errors.Is(err, schema.ErrGRPCAlreadyExists) {
+                       return errors.Wrapf(err, "node[%s] already exists in etcd", node.NodeID)
+               } else if errors.Is(err, context.DeadlineExceeded) {
                        l.Warn().Strs("etcd-endpoints", s.endpoints).Msg("register node timeout, retrying...")
                        continue
                }
@@ -147,7 +149,9 @@ func (s *clientService) Serve() run.StopNotify {
 func (s *clientService) GracefulStop() {
        s.closer.Done()
        s.closer.CloseThenWait()
-       _ = s.schemaRegistry.Close()
+       if err := s.schemaRegistry.Close(); err != nil {
+               logger.GetLogger(s.Name()).Error().Err(err).Msg("failed to close schema registry")
+       }
 }
 
 func (s *clientService) RegisterHandler(name string, kind schema.Kind, handler schema.EventHandler) {
diff --git a/banyand/metadata/schema/error.go b/banyand/metadata/schema/error.go
index a5301de8..9fe56dbe 100644
--- a/banyand/metadata/schema/error.go
+++ b/banyand/metadata/schema/error.go
@@ -29,11 +29,12 @@ var (
        ErrGRPCResourceNotFound = statusGRPCResourceNotFound.Err()
        // ErrClosed indicates the registry is closed.
        ErrClosed = errors.New("metadata registry is closed")
+       // ErrGRPCAlreadyExists indicates the resource already exists.
+       ErrGRPCAlreadyExists = statusGRPCAlreadyExists.Err()
 
        statusGRPCInvalidArgument  = status.New(codes.InvalidArgument, "banyandb: input is invalid")
        statusGRPCResourceNotFound = status.New(codes.NotFound, "banyandb: resource not found")
        statusGRPCAlreadyExists    = status.New(codes.AlreadyExists, "banyandb: resource already exists")
-       errGRPCAlreadyExists       = statusGRPCAlreadyExists.Err()
        statusDataLoss             = status.New(codes.DataLoss, "banyandb: resource corrupts.")
        errGRPCDataLoss            = statusDataLoss.Err()
 )
diff --git a/banyand/metadata/schema/etcd.go b/banyand/metadata/schema/etcd.go
index 7de1f39c..3e33bf37 100644
--- a/banyand/metadata/schema/etcd.go
+++ b/banyand/metadata/schema/etcd.go
@@ -26,9 +26,12 @@ import (
        "time"
 
        "github.com/pkg/errors"
+       pb "go.etcd.io/etcd/api/v3/etcdserverpb"
        "go.etcd.io/etcd/client/pkg/v3/transport"
        clientv3 "go.etcd.io/etcd/client/v3"
        "go.uber.org/zap"
+       "google.golang.org/grpc/codes"
+       "google.golang.org/grpc/status"
        "google.golang.org/protobuf/proto"
 
        commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
@@ -159,9 +162,8 @@ func (e *etcdSchemaRegistry) RegisterHandler(name string, kind Kind, handler Eve
 func (e *etcdSchemaRegistry) Close() error {
        e.closer.Done()
        e.closer.CloseThenWait()
-       for i, w := range e.watchers {
-               _ = w.Close()
-               e.watchers[i] = nil
+       for i := range e.watchers {
+               e.watchers[i].Close()
        }
        return e.client.Close()
 }
@@ -323,10 +325,14 @@ func (e *etcdSchemaRegistry) create(ctx context.Context, metadata Metadata) (int
        }
        replace := getResp.Count > 0
        if replace {
-               return 0, errGRPCAlreadyExists
+               return 0, ErrGRPCAlreadyExists
        }
        putResp, err := e.client.Put(ctx, key, string(val))
        if err != nil {
+               s, ok := status.FromError(err)
+               if ok && s.Code() == codes.AlreadyExists {
+                       return 0, ErrGRPCAlreadyExists
+               }
                return 0, err
        }
 
@@ -407,8 +413,10 @@ func (e *etcdSchemaRegistry) register(ctx context.Context, metadata Metadata, fo
                if errCommit != nil {
                        return errCommit
                }
+
                if !response.Succeeded {
-                       return errGRPCAlreadyExists
+                       tr := pb.TxnResponse(*response)
+                       return errors.Wrapf(ErrGRPCAlreadyExists, "response: %s", tr.String())
                }
        }
 
@@ -423,7 +431,13 @@ func (e *etcdSchemaRegistry) register(ctx context.Context, metadata Metadata, fo
                        return
                }
                defer func() {
-                       _, _ = e.client.Lease.Revoke(context.Background(), lease.ID)
+                       e.l.Info().Msgf("revoking lease %d", lease.ID)
+                       ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+                       _, err = e.client.Lease.Revoke(ctx, lease.ID)
+                       cancel()
+                       if err != nil {
+                               e.l.Error().Err(err).Msgf("failed to revoke lease %d", lease.ID)
+                       }
                        e.closer.Done()
                }()
                for {
diff --git a/banyand/metadata/schema/register_test.go b/banyand/metadata/schema/register_test.go
index 45e096d9..e740e7f4 100644
--- a/banyand/metadata/schema/register_test.go
+++ b/banyand/metadata/schema/register_test.go
@@ -87,6 +87,6 @@ var _ = ginkgo.Describe("etcd_register", func() {
 
        ginkgo.It("should register only once", func() {
                gomega.Expect(r.register(context.Background(), md, false)).ShouldNot(gomega.HaveOccurred())
-               gomega.Expect(r.register(context.Background(), md, false)).Should(gomega.MatchError(errGRPCAlreadyExists))
+               gomega.Expect(r.register(context.Background(), md, false)).Should(gomega.MatchError(ErrGRPCAlreadyExists))
        })
 })
diff --git a/banyand/metadata/schema/watcher.go b/banyand/metadata/schema/watcher.go
index aa203485..b95000d2 100644
--- a/banyand/metadata/schema/watcher.go
+++ b/banyand/metadata/schema/watcher.go
@@ -68,10 +68,9 @@ func newWatcher(cli *clientv3.Client, wc watcherConfig, l *logger.Logger) *watch
        return w
 }
 
-func (w *watcher) Close() error {
+func (w *watcher) Close() {
        w.closer.Done()
        w.closer.CloseThenWait()
-       return nil
 }
 
 func (w *watcher) allEvents() int64 {
@@ -106,6 +105,7 @@ func (w *watcher) watch(revision int64) {
        }
        defer w.closer.Done()
        cli := w.cli
+OUTER:
        for {
                if revision < 0 {
                        revision = w.allEvents()
@@ -124,25 +124,39 @@ func (w *watcher) watch(revision int64) {
                if wch == nil {
                        continue
                }
-               for watchResp := range wch {
-                       if err := watchResp.Err(); err != nil {
-                               select {
-                               case <-w.closer.CloseNotify():
-                                       return
-                               default:
-                                       if errors.Is(err, v3rpc.ErrCompacted) {
-                                               revision = -1
-                                               break
+               for {
+                       select {
+                       case <-w.closer.CloseNotify():
+                               w.l.Warn().Msgf("watcher closed")
+                               return
+                       case watchResp, ok := <-wch:
+                               if !ok {
+                                       select {
+                                       case <-w.closer.CloseNotify():
+                                               return
+                                       default:
+                                               break OUTER
                                        }
-                                       continue
                                }
-                       }
-                       for _, event := range watchResp.Events {
-                               select {
-                               case <-w.closer.CloseNotify():
-                                       return
-                               default:
-                                       w.handle(event)
+                               if err := watchResp.Err(); err != nil {
+                                       select {
+                                       case <-w.closer.CloseNotify():
+                                               return
+                                       default:
+                                               if errors.Is(err, v3rpc.ErrCompacted) {
+                                                       revision = -1
+                                                       break
+                                               }
+                                               continue
+                                       }
+                               }
+                               for _, event := range watchResp.Events {
+                                       select {
+                                       case <-w.closer.CloseNotify():
+                                               return
+                                       default:
+                                               w.handle(event)
+                                       }
                                }
                        }
                }
diff --git a/banyand/metadata/schema/watcher_test.go b/banyand/metadata/schema/watcher_test.go
index cc5b2f45..64ed142c 100644
--- a/banyand/metadata/schema/watcher_test.go
+++ b/banyand/metadata/schema/watcher_test.go
@@ -133,7 +133,7 @@ var _ = ginkgo.Describe("Watcher", func() {
                // Start the watcher
                watcher = registry.newWatcher("test", KindMeasure, mockedObj)
                ginkgo.DeferCleanup(func() {
-                       gomega.Expect(watcher.Close()).ShouldNot(gomega.HaveOccurred())
+                       watcher.Close()
                })
                gomega.Eventually(func() bool {
                        _, ok := mockedObj.Data()["testkey1"]
@@ -150,7 +150,7 @@ var _ = ginkgo.Describe("Watcher", func() {
        ginkgo.It("should handle watch events", func() {
                watcher = registry.newWatcher("test", KindStream, mockedObj)
                ginkgo.DeferCleanup(func() {
-                       gomega.Expect(watcher.Close()).ShouldNot(gomega.HaveOccurred())
+                       watcher.Close()
                })
                err := registry.CreateGroup(context.Background(), &commonv1.Group{
                        Metadata: &commonv1.Metadata{
diff --git a/banyand/queue/pub/client.go b/banyand/queue/pub/client.go
index aa9dbbfe..72014607 100644
--- a/banyand/queue/pub/client.go
+++ b/banyand/queue/pub/client.go
@@ -18,6 +18,8 @@
 package pub
 
 import (
+       "fmt"
+
        "google.golang.org/grpc"
        "google.golang.org/grpc/credentials/insecure"
 
@@ -26,18 +28,23 @@ import (
        "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
 )
 
-var retryPolicy = `{
+var (
+       serviceName = clusterv1.Service_ServiceDesc.ServiceName
+
+       // The timeout is set by each RPC.
+       retryPolicy = fmt.Sprintf(`{
        "methodConfig": [{
-         "name": [{"service": "grpc.examples.echo.Echo"}],
+         "name": [{"service": "%s"}],
          "waitForReady": true,
          "retryPolicy": {
-                 "MaxAttempts": 4,
-                 "InitialBackoff": ".5s",
-                 "MaxBackoff": "10s",
-                 "BackoffMultiplier": 1.0,
-                 "RetryableStatusCodes": [ "UNAVAILABLE" ]
+             "MaxAttempts": 4,
+             "InitialBackoff": ".5s",
+             "MaxBackoff": "10s",
+             "BackoffMultiplier": 1.0,
+             "RetryableStatusCodes": [ "UNAVAILABLE" ]
          }
-       }]}`
+       }]}`, serviceName)
+)
 
 type client struct {
        client clusterv1.ServiceClient
diff --git a/banyand/queue/pub/pub.go b/banyand/queue/pub/pub.go
index 18a9b214..d2576e51 100644
--- a/banyand/queue/pub/pub.go
+++ b/banyand/queue/pub/pub.go
@@ -20,12 +20,12 @@ package pub
 
 import (
        "context"
-       "errors"
        "fmt"
        "io"
        "sync"
        "time"
 
+       "github.com/pkg/errors"
        "go.uber.org/multierr"
        "google.golang.org/protobuf/proto"
        "google.golang.org/protobuf/types/known/anypb"
@@ -81,17 +81,43 @@ func (p *pub) Broadcast(topic bus.Topic, messages bus.Message) ([]bus.Future, er
                names = append(names, k)
        }
        p.mu.RUnlock()
-       var futures []bus.Future
+       futureCh := make(chan publishResult, len(names))
+       var wg sync.WaitGroup
        for _, n := range names {
-               f, err := p.Publish(topic, bus.NewMessageWithNode(messages.ID(), n, messages.Data()))
-               if err != nil {
-                       return nil, err
+               wg.Add(1)
+               // Send a value into sem. If sem is full, this will block until there's room.
+               go func(n string) {
+                       defer wg.Done()
+                       f, err := p.Publish(topic, bus.NewMessageWithNode(messages.ID(), n, messages.Data()))
+                       futureCh <- publishResult{n: n, f: f, e: err}
+               }(n)
+       }
+       go func() {
+               wg.Wait()
+               close(futureCh)
+       }()
+       var futures []bus.Future
+       var errs error
+       for f := range futureCh {
+               if f.e != nil {
+                       errs = multierr.Append(errs, errors.Wrapf(f.e, "failed to publish message to %s", f.n))
+                       continue
                }
-               futures = append(futures, f)
+               futures = append(futures, f.f)
+       }
+
+       if errs != nil {
+               return futures, fmt.Errorf("broadcast errors: %w", errs)
        }
        return futures, nil
 }
 
+type publishResult struct {
+       f bus.Future
+       e error
+       n string
+}
+
 func (p *pub) Publish(topic bus.Topic, messages ...bus.Message) (bus.Future, error) {
        var err error
        f := &future{}
@@ -101,13 +127,15 @@ func (p *pub) Publish(topic bus.Topic, messages ...bus.Message) (bus.Future, err
                        return multierr.Append(err, fmt.Errorf("failed to marshal message %T: %w", m, errSend))
                }
                node := m.Node()
-               p.mu.Lock()
+               p.mu.RLock()
                client, ok := p.clients[node]
-               p.mu.Unlock()
+               p.mu.RUnlock()
                if !ok {
                        return multierr.Append(err, fmt.Errorf("failed to get client for node %s", node))
                }
-               stream, errCreateStream := client.client.Send(context.Background())
+               ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+               f.cancelFn = append(f.cancelFn, cancel)
+               stream, errCreateStream := client.client.Send(ctx)
                if errCreateStream != nil {
                        return multierr.Append(err, fmt.Errorf("failed to get stream for node %s: %w", node, errCreateStream))
                }
@@ -150,8 +178,8 @@ func (p *pub) PreRun(context.Context) error {
 }
 
 type writeStream struct {
-       client clusterv1.Service_SendClient
-       cancel func()
+       client    clusterv1.Service_SendClient
+       ctxDoneCh <-chan struct{}
 }
 
 type batchPublisher struct {
@@ -160,9 +188,11 @@ type batchPublisher struct {
 }
 
 func (bp *batchPublisher) Close() (err error) {
-       for _, ws := range bp.streams {
-               err = multierr.Append(err, ws.client.CloseSend())
-               ws.cancel()
+       for i := range bp.streams {
+               err = multierr.Append(err, bp.streams[i].client.CloseSend())
+       }
+       for i := range bp.streams {
+               <-bp.streams[i].ctxDoneCh
        }
        return err
 }
@@ -180,7 +210,6 @@ func (bp *batchPublisher) Publish(topic bus.Topic, messages ...bus.Message) (bus
                                defer func() {
                                        if !success {
                                                delete(bp.streams, node)
-                                               stream.cancel()
                                        }
                                }()
                                select {
@@ -193,8 +222,7 @@ func (bp *batchPublisher) Publish(topic bus.Topic, messages ...bus.Message) (bus
                                        err = multierr.Append(err, fmt.Errorf("failed to send message to node %s: %w", node, errSend))
                                        return false
                                }
-                               _, errRecv := stream.client.Recv()
-                               return errRecv == nil
+                               return errSend == nil
                        }
                        return false
                }
@@ -202,27 +230,40 @@ func (bp *batchPublisher) Publish(topic bus.Topic, messages ...bus.Message) (bus
                        continue
                }
 
-               bp.pub.mu.Lock()
+               bp.pub.mu.RLock()
                client, ok := bp.pub.clients[node]
-               bp.pub.mu.Unlock()
+               bp.pub.mu.RUnlock()
                if !ok {
                        err = multierr.Append(err, fmt.Errorf("failed to get client for node %s", node))
                        continue
                }
-               //nolint: govet
-               ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+               ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
+               // this assignment is for getting around the go vet lint
+               deferFn := cancel
                stream, errCreateStream := client.client.Send(ctx)
                if err != nil {
                        err = multierr.Append(err, fmt.Errorf("failed to get stream for node %s: %w", node, errCreateStream))
                        continue
                }
                bp.streams[node] = writeStream{
-                       client: stream,
-                       cancel: cancel,
+                       client:    stream,
+                       ctxDoneCh: ctx.Done(),
                }
                _ = sendData()
+               go func(s clusterv1.Service_SendClient, deferFn func()) {
+                       defer deferFn()
+                       for {
+                               _, errRecv := s.Recv()
+                               if errRecv == nil {
+                                       continue
+                               }
+                               if errors.Is(errRecv, io.EOF) {
+                                       return
+                               }
+                               bp.pub.log.Err(errRecv).Msg("failed to receive message")
+                       }
+               }(stream, deferFn)
        }
-       //nolint: govet
        return nil, nil
 }
 
@@ -245,8 +286,9 @@ func messageToRequest(topic bus.Topic, m bus.Message) (*clusterv1.SendRequest, e
 }
 
 type future struct {
-       clients []clusterv1.Service_SendClient
-       topics  []bus.Topic
+       clients  []clusterv1.Service_SendClient
+       cancelFn []func()
+       topics   []bus.Topic
 }
 
 func (l *future) Get() (bus.Message, error) {
@@ -258,6 +300,8 @@ func (l *future) Get() (bus.Message, error) {
        defer func() {
                l.clients = l.clients[1:]
                l.topics = l.topics[1:]
+               l.cancelFn[0]()
+               l.cancelFn = l.cancelFn[1:]
        }()
        resp, err := c.Recv()
        if err != nil {
diff --git a/pkg/query/logical/stream/stream_plan_distributed.go b/pkg/query/logical/stream/stream_plan_distributed.go
index 4e111ad8..6f524737 100644
--- a/pkg/query/logical/stream/stream_plan_distributed.go
+++ b/pkg/query/logical/stream/stream_plan_distributed.go
@@ -143,10 +143,6 @@ func (t *distributedPlan) Execute(ctx context.Context) ([]*streamv1.Element, err
                                continue
                        }
                        resp := d.(*streamv1.QueryResponse)
-                       if err != nil {
-                               allErr = multierr.Append(allErr, err)
-                               continue
-                       }
                        see = append(see,
                                newSortableElements(resp.Elements, t.sortByTime, t.sortTagSpec))
                }
diff --git a/pkg/run/run.go b/pkg/run/run.go
index 4da2c23c..63531a88 100644
--- a/pkg/run/run.go
+++ b/pkg/run/run.go
@@ -25,6 +25,7 @@ import (
        "path"
        "sort"
        "sync"
+       "time"
 
        "github.com/oklog/run"
        "github.com/pkg/errors"
@@ -393,10 +394,11 @@ func (g *Group) Run(ctx context.Context) (err error) {
                if g.p[idx] == nil {
                        continue
                }
-               g.log.Debug().Uint32("ran", uint32(idx+1)).Uint32("total", uint32(len(g.p))).Str("name", g.p[idx].Name()).Msg("pre-run")
+               startTime := time.Now()
                if err := g.p[idx].PreRun(context.WithValue(ctx, common.ContextNodeRolesKey, rr)); err != nil {
                        return errors.WithMessage(err, fmt.Sprintf("pre-run module[%s]", g.p[idx].Name()))
                }
+               g.log.Info().Dur("elapsed", time.Since(startTime)).Str("name", g.p[idx].Name()).Msg("pre-run completed")
        }
 
        swg := &sync.WaitGroup{}
@@ -420,8 +422,10 @@ func (g *Group) Run(ctx context.Context) (err error) {
                        <-notify
                        return nil
                }, func(_ error) {
-                       g.log.Debug().Uint32("total", uint32(len(g.s))).Uint32("ran", uint32(idx+1)).Str("name", s.Name()).Msg("stop")
+                       g.log.Debug().Uint32("total", uint32(len(g.s))).Uint32("ran", uint32(idx+1)).Str("name", s.Name()).Msg("stopping")
+                       startTime := time.Now()
                        s.GracefulStop()
+                       g.log.Info().Dur("elapsed", time.Since(startTime)).Str("name", s.Name()).Msg("stopped")
                })
        }
 
diff --git a/pkg/schema/init.go b/pkg/schema/init.go
index 4813be53..dbc0a5af 100644
--- a/pkg/schema/init.go
+++ b/pkg/schema/init.go
@@ -304,7 +304,13 @@ func createOrUpdateTopNMeasure(ctx context.Context, measureSchemaRegistry schema
        }
        if oldTopNSchema == nil {
                if _, innerErr := measureSchemaRegistry.CreateMeasure(ctx, newTopNMeasure); innerErr != nil {
-                       return nil, innerErr
+                       if !errors.Is(innerErr, schema.ErrGRPCAlreadyExists) {
+                               return nil, innerErr
+                       }
+                       newTopNMeasure, err = measureSchemaRegistry.GetMeasure(ctx, topNSchema.GetMetadata())
+                       if err != nil {
+                               return nil, err
+                       }
                }
                return newTopNMeasure, nil
        }
diff --git a/test/stress/trace/Makefile b/test/stress/trace/Makefile
index 27c9cbe6..7c1d9ef4 100644
--- a/test/stress/trace/Makefile
+++ b/test/stress/trace/Makefile
@@ -41,13 +41,11 @@ QPS ?= 10
 clean:
        rm -rf /tmp/banyandb-stress-trace
 
-.PHONY: up
-up: clean
-       $(cli_env) docker compose $(CLI_ARGS) --env-file ./env up --build
+up-%: clean
+       $(cli_env) docker compose -f docker-compose-$*.yaml $(CLI_ARGS) --env-file ./env up --build
 
-.PHONY: down
-down:
-       docker compose down
+down-%:
+       docker compose -f docker-compose-$*.yaml down
 
 .PHONY: test_traffic
 test_traffic:
diff --git a/test/stress/trace/docker-compose.yaml b/test/stress/trace/docker-compose-cluster.yaml
similarity index 55%
copy from test/stress/trace/docker-compose.yaml
copy to test/stress/trace/docker-compose-cluster.yaml
index 05cfff66..8ca7e293 100644
--- a/test/stress/trace/docker-compose.yaml
+++ b/test/stress/trace/docker-compose-cluster.yaml
@@ -13,41 +13,45 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+version: '2.1'
+
 services:
-  change-vol-ownership:
-    image: ubuntu
-    user: "root"
-    group_add:
-      - '${GROUP_ID}'
-    volumes:
-      - /tmp/banyandb-stress-trace:/tmp/change-ownership
-    command: chown -R ${USER_ID}:${GROUP_ID} /tmp/change-ownership
+  etcd:
+    extends:
+      file: ../../docker/base-compose.yml
+      service: etcd
+    networks:
+      - cluster-test
 
-  banyandb:
-    user: "${USER_ID}:${GROUP_ID}"
+  data:
     extends:
       file: ../../docker/base-compose.yml
-      service: banyandb
+      service: data
     build:
       dockerfile: ./docker/Dockerfile
       context: ../../..
-    volumes:
-    - /tmp/banyandb-stress-trace:/tmp:rw,delgated
-    ports:
-    - 17913:17913
-    - 6060:6060
-    - 2121:2121
     deploy:
       resources:
         limits:
           cpus: "4"
           memory: 4G
     networks:
-      - test
-      - monitoring
-    depends_on:
-      change-vol-ownership:
-        condition: service_completed_successfully
+      - cluster-test
+
+  liaison:
+    extends:
+      file: ../../docker/base-compose.yml
+      service: liaison
+    build:
+      dockerfile: ./docker/Dockerfile
+      context: ../../..
+    deploy:
+      resources:
+        limits:
+          cpus: "2"
+          memory: 2G
+    networks:
+      - cluster-test
 
   oap:
     extends:
@@ -57,38 +61,19 @@ services:
     image: "docker.io/hanahmily/data-generator:${SW_OAP_COMMIT}"
     environment:
       SW_STORAGE: banyandb
+      SW_STORAGE_BANYANDB_TARGETS: "liaison:17912"
     ports:
       - 12800:12800
     volumes:
       - ./log4j2.xml:/skywalking/config/log4j2.xml
     networks:
-      - test
+      - cluster-test
     depends_on:
-      banyandb:
+      liaison:
         condition: service_healthy
 
-  prometheus:
-    image: prom/prometheus:latest
-    container_name: prometheus
-    restart: unless-stopped
-    profiles:
-      - "monitoring"
-    volumes:
-      - ./prometheus.yml:/etc/prometheus/prometheus.yml
-      - prometheus_data:/prometheus
-    command:
-      - '--config.file=/etc/prometheus/prometheus.yml'
-      - '--storage.tsdb.path=/prometheus'
-      - '--web.console.libraries=/etc/prometheus/console_libraries'
-      - '--web.console.templates=/etc/prometheus/consoles'
-      - '--web.enable-lifecycle'
-    ports:
-      - 9090:9090
-    networks:
-      - monitoring
 networks:
-  test:
-  monitoring:
+  cluster-test:
 
 volumes:
-  prometheus_data: {}
+  sw_agent:
diff --git a/test/stress/trace/docker-compose.yaml b/test/stress/trace/docker-compose-single.yaml
similarity index 100%
rename from test/stress/trace/docker-compose.yaml
rename to test/stress/trace/docker-compose-single.yaml
diff --git a/test/stress/trace/trace_suite_test.go b/test/stress/trace/trace_suite_test.go
index 75d7ddf8..bdac956e 100644
--- a/test/stress/trace/trace_suite_test.go
+++ b/test/stress/trace/trace_suite_test.go
@@ -19,6 +19,7 @@ package trace_test
 
 import (
        "flag"
+       "net/http"
        "path"
        "runtime"
        "testing"
@@ -31,9 +32,8 @@ import (
 )
 
 func TestIntegrationLoad(t *testing.T) {
-       t.Skip("Skip the stress test")
        RegisterFailHandler(Fail)
-       RunSpecs(t, "Stress Trace Suite")
+       RunSpecs(t, "Stress Trace Suite", Label("slow"))
 }
 
 var _ = Describe("Query", func() {
@@ -44,6 +44,12 @@ var _ = Describe("Query", func() {
        )
 
        BeforeEach(func() {
+               // Check if the URL is reachable
+               resp, err := http.Get("http://localhost:12800/graphql")
+               if err != nil || resp.StatusCode != 200 {
+                       // If the request fails or the status code is not 200, skip the test
+                       Skip("http://localhost:12800/graphql is not reachable")
+               }
                fs = flag.NewFlagSet("", flag.PanicOnError)
                fs.String("base-url", "http://localhost:12800/graphql", "")
                fs.String("service-id", "c2VydmljZV8x.1", "")
