This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
     new a85b51c0  Fix issues found by distributed test (#432)
a85b51c0 is described below

commit a85b51c00dc53fe2cb179b4f280a4fb4848c47f8
Author: Gao Hongtao <hanahm...@gmail.com>
AuthorDate: Fri Apr 12 10:38:57 2024 +0800

    Fix issues found by distributed test (#432)
---
 banyand/internal/storage/index.go                  |  2 +-
 banyand/internal/storage/index_test.go             |  6 +-
 banyand/metadata/client.go                         | 10 ++-
 banyand/metadata/schema/error.go                   |  3 +-
 banyand/metadata/schema/etcd.go                    | 26 ++--
 banyand/metadata/schema/register_test.go           |  2 +-
 banyand/metadata/schema/watcher.go                 | 52 +++++++-----
 banyand/metadata/schema/watcher_test.go            |  4 +-
 banyand/queue/pub/client.go                        | 23 ++--
 banyand/queue/pub/pub.go                           | 96 ++++++++++++++------
 .../logical/stream/stream_plan_distributed.go      |  4 -
 pkg/run/run.go                                     |  8 +-
 pkg/schema/init.go                                 |  8 +-
 test/stress/trace/Makefile                         | 10 +--
 ...er-compose.yaml => docker-compose-cluster.yaml} | 77 +++++++----------
 ...ker-compose.yaml => docker-compose-single.yaml} |  0
 test/stress/trace/trace_suite_test.go              | 10 ++-
 17 files changed, 210 insertions(+), 131 deletions(-)

diff --git a/banyand/internal/storage/index.go b/banyand/internal/storage/index.go
index 7bc91fea..af7eaed4 100644
--- a/banyand/internal/storage/index.go
+++ b/banyand/internal/storage/index.go
@@ -365,7 +365,7 @@ func (sic *seriesIndexController[T, O]) run(deadline time.Time) (err error) {
 	}
 	liveTime := sic.hot.startTime.Sub(deadline)
-	if liveTime > 0 && liveTime < sic.standbyLiveTime {
+	if liveTime > 0 && liveTime <= sic.standbyLiveTime {
 		sic.l.Info().Time("deadline", deadline).Msg("start to create standby series index")
 		standby, err = sic.newIdx(ctx)
 		if err != nil {
diff --git a/banyand/internal/storage/index_test.go b/banyand/internal/storage/index_test.go
index a7adc370..253a64d3 100644
--- a/banyand/internal/storage/index_test.go
+++ b/banyand/internal/storage/index_test.go
@@ -208,8 +208,8 @@ func TestSeriesIndexController(t *testing.T) {
 		sic, err := newSeriesIndexController(ctx, opts)
 		require.NoError(t, err)
 		defer sic.Close()
-		require.NoError(t, sic.run(time.Now().Add(-time.Hour*23)))
-		assert.NotNil(t, sic.standby)
+		require.NoError(t, sic.run(time.Now().Add(-time.Hour*23+10*time.Minute)))
+		require.NotNil(t, sic.standby)
 		idxNames := make([]string, 0)
 		walkDir(tmpDir, "idx-", func(suffix string) error {
 			idxNames = append(idxNames, suffix)
@@ -218,7 +218,7 @@ func TestSeriesIndexController(t *testing.T) {
 		assert.Equal(t, 2, len(idxNames))
 		nextTime := sic.standby.startTime
 		require.NoError(t, sic.run(time.Now().Add(time.Hour)))
-		assert.Nil(t, sic.standby)
+		require.Nil(t, sic.standby)
 		assert.Equal(t, nextTime, sic.hot.startTime)
 	})
 }
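The one-character fix above (`<` to `<=`) is easiest to see at the exact boundary: a hot index whose remaining live time equals standbyLiveTime previously never got a standby index prepared. A runnable sketch of just the condition, with a standbyLiveTime of one hour assumed purely for illustration:

package main

import (
	"fmt"
	"time"
)

func main() {
	standbyLiveTime := time.Hour // assumed value, for illustration only
	// The hot index starts exactly standbyLiveTime after the deadline.
	liveTime := time.Hour

	// Old check: the boundary case skipped standby creation.
	fmt.Println(liveTime > 0 && liveTime < standbyLiveTime) // false
	// New check: the boundary case now creates the standby as well.
	fmt.Println(liveTime > 0 && liveTime <= standbyLiveTime) // true
}

The adjusted test above exercises this by nudging the deadline ten minutes inside the window.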
diff --git a/banyand/metadata/client.go b/banyand/metadata/client.go
index a687a3e2..ea147d88 100644
--- a/banyand/metadata/client.go
+++ b/banyand/metadata/client.go
@@ -19,9 +19,9 @@ package metadata
 
 import (
 	"context"
-	"errors"
 	"time"
 
+	"github.com/pkg/errors"
 	"go.uber.org/multierr"
 	"google.golang.org/protobuf/types/known/timestamppb"
 
@@ -129,7 +129,9 @@ func (s *clientService) PreRun(ctx context.Context) error {
 		ctxRegister, cancel := context.WithTimeout(ctx, time.Second*10)
 		err = s.schemaRegistry.RegisterNode(ctxRegister, nodeInfo, s.forceRegisterNode)
 		cancel()
-		if errors.Is(err, context.DeadlineExceeded) {
+		if errors.Is(err, schema.ErrGRPCAlreadyExists) {
+			return errors.Wrapf(err, "node[%s] already exists in etcd", node.NodeID)
+		} else if errors.Is(err, context.DeadlineExceeded) {
 			l.Warn().Strs("etcd-endpoints", s.endpoints).Msg("register node timeout, retrying...")
 			continue
 		}
@@ -147,7 +149,9 @@ func (s *clientService) Serve() run.StopNotify {
 func (s *clientService) GracefulStop() {
 	s.closer.Done()
 	s.closer.CloseThenWait()
-	_ = s.schemaRegistry.Close()
+	if err := s.schemaRegistry.Close(); err != nil {
+		logger.GetLogger(s.Name()).Error().Err(err).Msg("failed to close schema registry")
+	}
 }
 
 func (s *clientService) RegisterHandler(name string, kind schema.Kind, handler schema.EventHandler) {
diff --git a/banyand/metadata/schema/error.go b/banyand/metadata/schema/error.go
index a5301de8..9fe56dbe 100644
--- a/banyand/metadata/schema/error.go
+++ b/banyand/metadata/schema/error.go
@@ -29,11 +29,12 @@ var (
 	ErrGRPCResourceNotFound = statusGRPCResourceNotFound.Err()
 	// ErrClosed indicates the registry is closed.
 	ErrClosed = errors.New("metadata registry is closed")
+	// ErrGRPCAlreadyExists indicates the resource already exists.
+	ErrGRPCAlreadyExists = statusGRPCAlreadyExists.Err()
 
 	statusGRPCInvalidArgument  = status.New(codes.InvalidArgument, "banyandb: input is invalid")
 	statusGRPCResourceNotFound = status.New(codes.NotFound, "banyandb: resource not found")
 	statusGRPCAlreadyExists    = status.New(codes.AlreadyExists, "banyandb: resource already exists")
-	errGRPCAlreadyExists       = statusGRPCAlreadyExists.Err()
 	statusDataLoss             = status.New(codes.DataLoss, "banyandb: resource corrupts.")
 	errGRPCDataLoss            = statusDataLoss.Err()
 )
diff --git a/banyand/metadata/schema/etcd.go b/banyand/metadata/schema/etcd.go
index 7de1f39c..3e33bf37 100644
--- a/banyand/metadata/schema/etcd.go
+++ b/banyand/metadata/schema/etcd.go
@@ -26,9 +26,12 @@ import (
 	"time"
 
 	"github.com/pkg/errors"
+	pb "go.etcd.io/etcd/api/v3/etcdserverpb"
 	"go.etcd.io/etcd/client/pkg/v3/transport"
 	clientv3 "go.etcd.io/etcd/client/v3"
 	"go.uber.org/zap"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
 	"google.golang.org/protobuf/proto"
 
 	commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
@@ -159,9 +162,8 @@ func (e *etcdSchemaRegistry) RegisterHandler(name string, kind Kind, handler Eve
 func (e *etcdSchemaRegistry) Close() error {
 	e.closer.Done()
 	e.closer.CloseThenWait()
-	for i, w := range e.watchers {
-		_ = w.Close()
-		e.watchers[i] = nil
+	for i := range e.watchers {
+		e.watchers[i].Close()
 	}
 	return e.client.Close()
 }
@@ -323,10 +325,14 @@ func (e *etcdSchemaRegistry) create(ctx context.Context, metadata Metadata) (int
 	}
 	replace := getResp.Count > 0
 	if replace {
-		return 0, errGRPCAlreadyExists
+		return 0, ErrGRPCAlreadyExists
 	}
 	putResp, err := e.client.Put(ctx, key, string(val))
 	if err != nil {
+		s, ok := status.FromError(err)
+		if ok && s.Code() == codes.AlreadyExists {
+			return 0, ErrGRPCAlreadyExists
+		}
 		return 0, err
 	}
 
@@ -407,8 +413,10 @@ func (e *etcdSchemaRegistry) register(ctx context.Context, metadata Metadata, fo
 		if errCommit != nil {
 			return errCommit
 		}
+
 		if !response.Succeeded {
-			return errGRPCAlreadyExists
+			tr := pb.TxnResponse(*response)
+			return errors.Wrapf(ErrGRPCAlreadyExists, "response: %s", tr.String())
 		}
 	}
 
@@ -423,7 +431,13 @@ func (e *etcdSchemaRegistry) register(ctx context.Context, metadata Metadata, fo
 			return
 		}
 		defer func() {
-			_, _ = e.client.Lease.Revoke(context.Background(), lease.ID)
+			e.l.Info().Msgf("revoking lease %d", lease.ID)
+			ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+			_, err = e.client.Lease.Revoke(ctx, lease.ID)
+			cancel()
+			if err != nil {
+				e.l.Error().Err(err).Msgf("failed to revoke lease %d", lease.ID)
+			}
			e.closer.Done()
 		}()
 		for {
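The new deferred block above bounds the lease revocation, so shutdown can no longer hang indefinitely on an unreachable etcd cluster; it is capped at the revoke timeout instead. The same pattern in isolation (a sketch; client construction and error logging are assumed):

import (
	"context"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
)

// revokeLease gives the revoke RPC its own five-second deadline, so a dead
// etcd endpoint delays shutdown by at most that long instead of forever.
func revokeLease(cli *clientv3.Client, id clientv3.LeaseID) error {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	_, err := cli.Lease.Revoke(ctx, id)
	return err
}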
diff --git a/banyand/metadata/schema/register_test.go b/banyand/metadata/schema/register_test.go
index 45e096d9..e740e7f4 100644
--- a/banyand/metadata/schema/register_test.go
+++ b/banyand/metadata/schema/register_test.go
@@ -87,6 +87,6 @@ var _ = ginkgo.Describe("etcd_register", func() {
 
 	ginkgo.It("should register only once", func() {
 		gomega.Expect(r.register(context.Background(), md, false)).ShouldNot(gomega.HaveOccurred())
-		gomega.Expect(r.register(context.Background(), md, false)).Should(gomega.MatchError(errGRPCAlreadyExists))
+		gomega.Expect(r.register(context.Background(), md, false)).Should(gomega.MatchError(ErrGRPCAlreadyExists))
 	})
 })
diff --git a/banyand/metadata/schema/watcher.go b/banyand/metadata/schema/watcher.go
index aa203485..b95000d2 100644
--- a/banyand/metadata/schema/watcher.go
+++ b/banyand/metadata/schema/watcher.go
@@ -68,10 +68,9 @@ func newWatcher(cli *clientv3.Client, wc watcherConfig, l *logger.Logger) *watch
 	return w
 }
 
-func (w *watcher) Close() error {
+func (w *watcher) Close() {
 	w.closer.Done()
 	w.closer.CloseThenWait()
-	return nil
 }
 
 func (w *watcher) allEvents() int64 {
@@ -106,6 +105,7 @@ func (w *watcher) watch(revision int64) {
 	}
 	defer w.closer.Done()
 	cli := w.cli
+OUTER:
 	for {
 		if revision < 0 {
 			revision = w.allEvents()
@@ -124,25 +124,39 @@ func (w *watcher) watch(revision int64) {
 		if wch == nil {
 			continue
 		}
-		for watchResp := range wch {
-			if err := watchResp.Err(); err != nil {
-				select {
-				case <-w.closer.CloseNotify():
-					return
-				default:
-					if errors.Is(err, v3rpc.ErrCompacted) {
-						revision = -1
-						break
-					}
-					continue
-				}
-			}
-			for _, event := range watchResp.Events {
-				select {
-				case <-w.closer.CloseNotify():
-					return
-				default:
-					w.handle(event)
-				}
-			}
-		}
+		for {
+			select {
+			case <-w.closer.CloseNotify():
+				w.l.Warn().Msgf("watcher closed")
+				return
+			case watchResp, ok := <-wch:
+				if !ok {
+					select {
+					case <-w.closer.CloseNotify():
+						return
+					default:
+						break OUTER
+					}
+				}
+				if err := watchResp.Err(); err != nil {
+					select {
+					case <-w.closer.CloseNotify():
+						return
+					default:
+						if errors.Is(err, v3rpc.ErrCompacted) {
+							revision = -1
+							break
+						}
+						continue
+					}
+				}
+				for _, event := range watchResp.Events {
+					select {
+					case <-w.closer.CloseNotify():
+						return
+					default:
+						w.handle(event)
+					}
+				}
+			}
+		}
 	}
diff --git a/banyand/metadata/schema/watcher_test.go b/banyand/metadata/schema/watcher_test.go
index cc5b2f45..64ed142c 100644
--- a/banyand/metadata/schema/watcher_test.go
+++ b/banyand/metadata/schema/watcher_test.go
@@ -133,7 +133,7 @@ var _ = ginkgo.Describe("Watcher", func() {
 		// Start the watcher
 		watcher = registry.newWatcher("test", KindMeasure, mockedObj)
 		ginkgo.DeferCleanup(func() {
-			gomega.Expect(watcher.Close()).ShouldNot(gomega.HaveOccurred())
+			watcher.Close()
 		})
 		gomega.Eventually(func() bool {
 			_, ok := mockedObj.Data()["testkey1"]
@@ -150,7 +150,7 @@ var _ = ginkgo.Describe("Watcher", func() {
 	ginkgo.It("should handle watch events", func() {
 		watcher = registry.newWatcher("test", KindStream, mockedObj)
 		ginkgo.DeferCleanup(func() {
-			gomega.Expect(watcher.Close()).ShouldNot(gomega.HaveOccurred())
+			watcher.Close()
 		})
 		err := registry.CreateGroup(context.Background(), &commonv1.Group{
 			Metadata: &commonv1.Metadata{
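The OUTER label in the watcher rewrite exists because, in Go, a bare break inside a select terminates the select statement only, never the surrounding loop. A minimal runnable sketch of the same escape pattern (drain and its channels are illustrative names, not code from this commit):

package main

import "fmt"

func drain(ch <-chan int, done <-chan struct{}) {
OUTER:
	for {
		for {
			select {
			case <-done:
				return
			case v, ok := <-ch:
				if !ok {
					// A bare break here would only exit the select.
					break OUTER
				}
				fmt.Println(v)
			}
		}
	}
}

func main() {
	ch := make(chan int, 2)
	ch <- 1
	ch <- 2
	close(ch)
	drain(ch, make(chan struct{}))
	fmt.Println("channel closed, both loops exited")
}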
"github.com/apache/skywalking-banyandb/banyand/metadata/schema" ) -var retryPolicy = `{ +var ( + serviceName = clusterv1.Service_ServiceDesc.ServiceName + + // The timeout is set by each RPC. + retryPolicy = fmt.Sprintf(`{ "methodConfig": [{ - "name": [{"service": "grpc.examples.echo.Echo"}], + "name": [{"service": "%s"}], "waitForReady": true, "retryPolicy": { - "MaxAttempts": 4, - "InitialBackoff": ".5s", - "MaxBackoff": "10s", - "BackoffMultiplier": 1.0, - "RetryableStatusCodes": [ "UNAVAILABLE" ] + "MaxAttempts": 4, + "InitialBackoff": ".5s", + "MaxBackoff": "10s", + "BackoffMultiplier": 1.0, + "RetryableStatusCodes": [ "UNAVAILABLE" ] } - }]}` + }]}`, serviceName) +) type client struct { client clusterv1.ServiceClient diff --git a/banyand/queue/pub/pub.go b/banyand/queue/pub/pub.go index 18a9b214..d2576e51 100644 --- a/banyand/queue/pub/pub.go +++ b/banyand/queue/pub/pub.go @@ -20,12 +20,12 @@ package pub import ( "context" - "errors" "fmt" "io" "sync" "time" + "github.com/pkg/errors" "go.uber.org/multierr" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/anypb" @@ -81,17 +81,43 @@ func (p *pub) Broadcast(topic bus.Topic, messages bus.Message) ([]bus.Future, er names = append(names, k) } p.mu.RUnlock() - var futures []bus.Future + futureCh := make(chan publishResult, len(names)) + var wg sync.WaitGroup for _, n := range names { - f, err := p.Publish(topic, bus.NewMessageWithNode(messages.ID(), n, messages.Data())) - if err != nil { - return nil, err + wg.Add(1) + // Send a value into sem. If sem is full, this will block until there's room. + go func(n string) { + defer wg.Done() + f, err := p.Publish(topic, bus.NewMessageWithNode(messages.ID(), n, messages.Data())) + futureCh <- publishResult{n: n, f: f, e: err} + }(n) + } + go func() { + wg.Wait() + close(futureCh) + }() + var futures []bus.Future + var errs error + for f := range futureCh { + if f.e != nil { + errs = multierr.Append(errs, errors.Wrapf(f.e, "failed to publish message to %s", f.n)) + continue } - futures = append(futures, f) + futures = append(futures, f.f) + } + + if errs != nil { + return futures, fmt.Errorf("broadcast errors: %w", errs) } return futures, nil } +type publishResult struct { + f bus.Future + e error + n string +} + func (p *pub) Publish(topic bus.Topic, messages ...bus.Message) (bus.Future, error) { var err error f := &future{} @@ -101,13 +127,15 @@ func (p *pub) Publish(topic bus.Topic, messages ...bus.Message) (bus.Future, err return multierr.Append(err, fmt.Errorf("failed to marshal message %T: %w", m, errSend)) } node := m.Node() - p.mu.Lock() + p.mu.RLock() client, ok := p.clients[node] - p.mu.Unlock() + p.mu.RUnlock() if !ok { return multierr.Append(err, fmt.Errorf("failed to get client for node %s", node)) } - stream, errCreateStream := client.client.Send(context.Background()) + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + f.cancelFn = append(f.cancelFn, cancel) + stream, errCreateStream := client.client.Send(ctx) if errCreateStream != nil { return multierr.Append(err, fmt.Errorf("failed to get stream for node %s: %w", node, errCreateStream)) } @@ -150,8 +178,8 @@ func (p *pub) PreRun(context.Context) error { } type writeStream struct { - client clusterv1.Service_SendClient - cancel func() + client clusterv1.Service_SendClient + ctxDoneCh <-chan struct{} } type batchPublisher struct { @@ -160,9 +188,11 @@ type batchPublisher struct { } func (bp *batchPublisher) Close() (err error) { - for _, ws := range bp.streams { - err = 
diff --git a/banyand/queue/pub/pub.go b/banyand/queue/pub/pub.go
index 18a9b214..d2576e51 100644
--- a/banyand/queue/pub/pub.go
+++ b/banyand/queue/pub/pub.go
@@ -20,12 +20,12 @@ package pub
 
 import (
 	"context"
-	"errors"
 	"fmt"
 	"io"
 	"sync"
 	"time"
 
+	"github.com/pkg/errors"
 	"go.uber.org/multierr"
 	"google.golang.org/protobuf/proto"
 	"google.golang.org/protobuf/types/known/anypb"
@@ -81,17 +81,43 @@ func (p *pub) Broadcast(topic bus.Topic, messages bus.Message) ([]bus.Future, er
 		names = append(names, k)
 	}
 	p.mu.RUnlock()
-	var futures []bus.Future
+	futureCh := make(chan publishResult, len(names))
+	var wg sync.WaitGroup
 	for _, n := range names {
-		f, err := p.Publish(topic, bus.NewMessageWithNode(messages.ID(), n, messages.Data()))
-		if err != nil {
-			return nil, err
+		wg.Add(1)
+		// Fan out to every node concurrently; results are gathered below.
+		go func(n string) {
+			defer wg.Done()
+			f, err := p.Publish(topic, bus.NewMessageWithNode(messages.ID(), n, messages.Data()))
+			futureCh <- publishResult{n: n, f: f, e: err}
+		}(n)
+	}
+	go func() {
+		wg.Wait()
+		close(futureCh)
+	}()
+	var futures []bus.Future
+	var errs error
+	for f := range futureCh {
+		if f.e != nil {
+			errs = multierr.Append(errs, errors.Wrapf(f.e, "failed to publish message to %s", f.n))
+			continue
 		}
-		futures = append(futures, f)
+		futures = append(futures, f.f)
+	}
+
+	if errs != nil {
+		return futures, fmt.Errorf("broadcast errors: %w", errs)
 	}
 	return futures, nil
 }
 
+type publishResult struct {
+	f bus.Future
+	e error
+	n string
+}
+
 func (p *pub) Publish(topic bus.Topic, messages ...bus.Message) (bus.Future, error) {
 	var err error
 	f := &future{}
@@ -101,13 +127,15 @@ func (p *pub) Publish(topic bus.Topic, messages ...bus.Message) (bus.Future, err
 			return multierr.Append(err, fmt.Errorf("failed to marshal message %T: %w", m, errSend))
 		}
 		node := m.Node()
-		p.mu.Lock()
+		p.mu.RLock()
 		client, ok := p.clients[node]
-		p.mu.Unlock()
+		p.mu.RUnlock()
 		if !ok {
 			return multierr.Append(err, fmt.Errorf("failed to get client for node %s", node))
 		}
-		stream, errCreateStream := client.client.Send(context.Background())
+		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+		f.cancelFn = append(f.cancelFn, cancel)
+		stream, errCreateStream := client.client.Send(ctx)
 		if errCreateStream != nil {
 			return multierr.Append(err, fmt.Errorf("failed to get stream for node %s: %w", node, errCreateStream))
 		}
@@ -150,8 +178,8 @@ func (p *pub) PreRun(context.Context) error {
 }
 
 type writeStream struct {
-	client clusterv1.Service_SendClient
-	cancel func()
+	client    clusterv1.Service_SendClient
+	ctxDoneCh <-chan struct{}
 }
 
 type batchPublisher struct {
@@ -160,9 +188,11 @@ type batchPublisher struct {
 }
 
 func (bp *batchPublisher) Close() (err error) {
-	for _, ws := range bp.streams {
-		err = multierr.Append(err, ws.client.CloseSend())
-		ws.cancel()
+	for i := range bp.streams {
+		err = multierr.Append(err, bp.streams[i].client.CloseSend())
+	}
+	for i := range bp.streams {
+		<-bp.streams[i].ctxDoneCh
 	}
 	return err
 }
@@ -180,7 +210,6 @@ func (bp *batchPublisher) Publish(topic bus.Topic, messages ...bus.Message) (bus
 			defer func() {
 				if !success {
 					delete(bp.streams, node)
-					stream.cancel()
 				}
 			}()
 			select {
@@ -193,8 +222,7 @@ func (bp *batchPublisher) Publish(topic bus.Topic, messages ...bus.Message) (bus
 					err = multierr.Append(err, fmt.Errorf("failed to send message to node %s: %w", node, errSend))
 					return false
 				}
-				_, errRecv := stream.client.Recv()
-				return errRecv == nil
+				return errSend == nil
 			}
 			return false
 		}
@@ -202,27 +230,40 @@ func (bp *batchPublisher) Publish(topic bus.Topic, messages ...bus.Message) (bus
 			continue
 		}
 
-		bp.pub.mu.Lock()
+		bp.pub.mu.RLock()
 		client, ok := bp.pub.clients[node]
-		bp.pub.mu.Unlock()
+		bp.pub.mu.RUnlock()
 		if !ok {
 			err = multierr.Append(err, fmt.Errorf("failed to get client for node %s", node))
 			continue
 		}
-		//nolint: govet
-		ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+		ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
+		// this assignment is for getting around the go vet lint
+		deferFn := cancel
 		stream, errCreateStream := client.client.Send(ctx)
 		if err != nil {
 			err = multierr.Append(err, fmt.Errorf("failed to get stream for node %s: %w", node, errCreateStream))
 			continue
 		}
 		bp.streams[node] = writeStream{
-			client: stream,
-			cancel: cancel,
+			client:    stream,
+			ctxDoneCh: ctx.Done(),
 		}
 		_ = sendData()
+		go func(s clusterv1.Service_SendClient, deferFn func()) {
+			defer deferFn()
+			for {
+				_, errRecv := s.Recv()
+				if errRecv == nil {
+					continue
+				}
+				if errors.Is(errRecv, io.EOF) {
+					return
+				}
+				bp.pub.log.Err(errRecv).Msg("failed to receive message")
+			}
+		}(stream, deferFn)
 	}
-	//nolint: govet
 	return nil, nil
 }
@@ -245,8 +286,9 @@ func messageToRequest(topic bus.Topic, m bus.Message) (*clusterv1.SendRequest, e
 }
 
 type future struct {
-	clients []clusterv1.Service_SendClient
-	topics  []bus.Topic
+	clients  []clusterv1.Service_SendClient
+	cancelFn []func()
+	topics   []bus.Topic
 }
 
 func (l *future) Get() (bus.Message, error) {
@@ -258,6 +300,8 @@ func (l *future) Get() (bus.Message, error) {
 	defer func() {
 		l.clients = l.clients[1:]
 		l.topics = l.topics[1:]
+		l.cancelFn[0]()
+		l.cancelFn = l.cancelFn[1:]
 	}()
 	resp, err := c.Recv()
 	if err != nil {
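The rewritten Broadcast replaces sequential, fail-fast publishing with a fan-out/fan-in: every node is published to concurrently, and per-node failures are aggregated rather than aborting the whole broadcast on the first error. The skeleton of that pattern in isolation (a sketch; publish stands in for pub.Publish and drops the futures for brevity):

import (
	"sync"

	"go.uber.org/multierr"
)

// fanOut runs publish against every node concurrently and gathers the
// per-node errors over a buffered channel, so one slow or dead node can
// neither serialize nor abort the broadcast.
func fanOut(nodes []string, publish func(node string) error) error {
	results := make(chan error, len(nodes))
	var wg sync.WaitGroup
	for _, n := range nodes {
		wg.Add(1)
		go func(n string) {
			defer wg.Done()
			results <- publish(n)
		}(n)
	}
	go func() {
		wg.Wait()
		close(results)
	}()
	var errs error
	for err := range results {
		errs = multierr.Append(errs, err)
	}
	return errs
}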
diff --git a/pkg/query/logical/stream/stream_plan_distributed.go b/pkg/query/logical/stream/stream_plan_distributed.go
index 4e111ad8..6f524737 100644
--- a/pkg/query/logical/stream/stream_plan_distributed.go
+++ b/pkg/query/logical/stream/stream_plan_distributed.go
@@ -143,10 +143,6 @@ func (t *distributedPlan) Execute(ctx context.Context) ([]*streamv1.Element, err
 			continue
 		}
 		resp := d.(*streamv1.QueryResponse)
-		if err != nil {
-			allErr = multierr.Append(allErr, err)
-			continue
-		}
 		see = append(see, newSortableElements(resp.Elements, t.sortByTime, t.sortTagSpec))
 	}
diff --git a/pkg/run/run.go b/pkg/run/run.go
index 4da2c23c..63531a88 100644
--- a/pkg/run/run.go
+++ b/pkg/run/run.go
@@ -25,6 +25,7 @@ import (
 	"path"
 	"sort"
 	"sync"
+	"time"
 
 	"github.com/oklog/run"
 	"github.com/pkg/errors"
@@ -393,10 +394,11 @@ func (g *Group) Run(ctx context.Context) (err error) {
 		if g.p[idx] == nil {
 			continue
 		}
-		g.log.Debug().Uint32("ran", uint32(idx+1)).Uint32("total", uint32(len(g.p))).Str("name", g.p[idx].Name()).Msg("pre-run")
+		startTime := time.Now()
 		if err := g.p[idx].PreRun(context.WithValue(ctx, common.ContextNodeRolesKey, rr)); err != nil {
 			return errors.WithMessage(err, fmt.Sprintf("pre-run module[%s]", g.p[idx].Name()))
 		}
+		g.log.Info().Dur("elapsed", time.Since(startTime)).Str("name", g.p[idx].Name()).Msg("pre-run completed")
 	}
 
 	swg := &sync.WaitGroup{}
@@ -420,8 +422,10 @@ func (g *Group) Run(ctx context.Context) (err error) {
 				<-notify
 				return nil
 			}, func(_ error) {
-				g.log.Debug().Uint32("total", uint32(len(g.s))).Uint32("ran", uint32(idx+1)).Str("name", s.Name()).Msg("stop")
+				g.log.Debug().Uint32("total", uint32(len(g.s))).Uint32("ran", uint32(idx+1)).Str("name", s.Name()).Msg("stopping")
+				startTime := time.Now()
 				s.GracefulStop()
+				g.log.Info().Dur("elapsed", time.Since(startTime)).Str("name", s.Name()).Msg("stopped")
 			})
 	}
diff --git a/pkg/schema/init.go b/pkg/schema/init.go
index 4813be53..dbc0a5af 100644
--- a/pkg/schema/init.go
+++ b/pkg/schema/init.go
@@ -304,7 +304,13 @@ func createOrUpdateTopNMeasure(ctx context.Context, measureSchemaRegistry schema
 	}
 	if oldTopNSchema == nil {
 		if _, innerErr := measureSchemaRegistry.CreateMeasure(ctx, newTopNMeasure); innerErr != nil {
-			return nil, innerErr
+			if !errors.Is(innerErr, schema.ErrGRPCAlreadyExists) {
+				return nil, innerErr
+			}
+			newTopNMeasure, err = measureSchemaRegistry.GetMeasure(ctx, topNSchema.GetMetadata())
+			if err != nil {
+				return nil, err
+			}
 		}
 		return newTopNMeasure, nil
 	}
diff --git a/test/stress/trace/Makefile b/test/stress/trace/Makefile
index 27c9cbe6..7c1d9ef4 100644
--- a/test/stress/trace/Makefile
+++ b/test/stress/trace/Makefile
@@ -41,13 +41,11 @@ QPS ?= 10
 clean:
 	rm -rf /tmp/banyandb-stress-trace
 
-.PHONY: up
-up: clean
-	$(cli_env) docker compose $(CLI_ARGS) --env-file ./env up --build
+up-%: clean
+	$(cli_env) docker compose -f docker-compose-$*.yaml $(CLI_ARGS) --env-file ./env up --build
 
-.PHONY: down
-down:
-	docker compose down
+down-%:
+	docker compose -f docker-compose-$*.yaml down
 
 .PHONY: test_traffic
 test_traffic:
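With the pattern rules above, the compose file is selected by the target stem: make up-cluster starts docker-compose-cluster.yaml, make up-single starts docker-compose-single.yaml, and make down-cluster / make down-single tear the corresponding stack down ($* expands to whatever the % matched).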
diff --git a/test/stress/trace/docker-compose.yaml b/test/stress/trace/docker-compose-cluster.yaml
similarity index 55%
copy from test/stress/trace/docker-compose.yaml
copy to test/stress/trace/docker-compose-cluster.yaml
index 05cfff66..8ca7e293 100644
--- a/test/stress/trace/docker-compose.yaml
+++ b/test/stress/trace/docker-compose-cluster.yaml
@@ -13,41 +13,45 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+version: '2.1'
+
 services:
-  change-vol-ownership:
-    image: ubuntu
-    user: "root"
-    group_add:
-      - '${GROUP_ID}'
-    volumes:
-      - /tmp/banyandb-stress-trace:/tmp/change-ownership
-    command: chown -R ${USER_ID}:${GROUP_ID} /tmp/change-ownership
+  etcd:
+    extends:
+      file: ../../docker/base-compose.yml
+      service: etcd
+    networks:
+      - cluster-test
 
-  banyandb:
-    user: "${USER_ID}:${GROUP_ID}"
+  data:
     extends:
       file: ../../docker/base-compose.yml
-      service: banyandb
+      service: data
     build:
       dockerfile: ./docker/Dockerfile
       context: ../../..
-    volumes:
-      - /tmp/banyandb-stress-trace:/tmp:rw,delgated
-    ports:
-      - 17913:17913
-      - 6060:6060
-      - 2121:2121
     deploy:
       resources:
         limits:
          cpus: "4"
           memory: 4G
     networks:
-      - test
-      - monitoring
-    depends_on:
-      change-vol-ownership:
-        condition: service_completed_successfully
+      - cluster-test
+
+  liaison:
+    extends:
+      file: ../../docker/base-compose.yml
+      service: liaison
+    build:
+      dockerfile: ./docker/Dockerfile
+      context: ../../..
+    deploy:
+      resources:
+        limits:
+          cpus: "2"
+          memory: 2G
+    networks:
+      - cluster-test
 
   oap:
     extends:
@@ -57,38 +61,19 @@ services:
     image: "docker.io/hanahmily/data-generator:${SW_OAP_COMMIT}"
     environment:
       SW_STORAGE: banyandb
+      SW_STORAGE_BANYANDB_TARGETS: "liaison:17912"
     ports:
       - 12800:12800
     volumes:
       - ./log4j2.xml:/skywalking/config/log4j2.xml
     networks:
-      - test
+      - cluster-test
     depends_on:
-      banyandb:
+      liaison:
         condition: service_healthy
 
-  prometheus:
-    image: prom/prometheus:latest
-    container_name: prometheus
-    restart: unless-stopped
-    profiles:
-      - "monitoring"
-    volumes:
-      - ./prometheus.yml:/etc/prometheus/prometheus.yml
-      - prometheus_data:/prometheus
-    command:
-      - '--config.file=/etc/prometheus/prometheus.yml'
-      - '--storage.tsdb.path=/prometheus'
-      - '--web.console.libraries=/etc/prometheus/console_libraries'
-      - '--web.console.templates=/etc/prometheus/consoles'
-      - '--web.enable-lifecycle'
-    ports:
-      - 9090:9090
-    networks:
-      - monitoring
-
 networks:
-  test:
-  monitoring:
+  cluster-test:
 
 volumes:
-  prometheus_data: {}
+  sw_agent:
diff --git a/test/stress/trace/docker-compose.yaml b/test/stress/trace/docker-compose-single.yaml
similarity index 100%
rename from test/stress/trace/docker-compose.yaml
rename to test/stress/trace/docker-compose-single.yaml
diff --git a/test/stress/trace/trace_suite_test.go b/test/stress/trace/trace_suite_test.go
index 75d7ddf8..bdac956e 100644
--- a/test/stress/trace/trace_suite_test.go
+++ b/test/stress/trace/trace_suite_test.go
@@ -19,6 +19,7 @@ package trace_test
 
 import (
 	"flag"
+	"net/http"
 	"path"
 	"runtime"
 	"testing"
@@ -31,9 +32,8 @@ import (
 )
 
 func TestIntegrationLoad(t *testing.T) {
-	t.Skip("Skip the stress test")
 	RegisterFailHandler(Fail)
-	RunSpecs(t, "Stress Trace Suite")
+	RunSpecs(t, "Stress Trace Suite", Label("slow"))
 }
 
 var _ = Describe("Query", func() {
@@ -44,6 +44,12 @@ var _ = Describe("Query", func() {
 	)
 
 	BeforeEach(func() {
+		// Check if the URL is reachable
+		resp, err := http.Get("http://localhost:12800/graphql")
+		if err != nil || resp.StatusCode != 200 {
+			// If the request fails or the status code is not 200, skip the test
+			Skip("http://localhost:12800/graphql is not reachable")
+		}
 		fs = flag.NewFlagSet("", flag.PanicOnError)
 		fs.String("base-url", "http://localhost:12800/graphql", "")
 		fs.String("service-id", "c2VydmljZV8x.1", "")
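The BeforeEach probe above turns the suite into an opt-in stress test: it runs only when the generator endpoint is live, replacing the unconditional t.Skip. One detail worth noting is that the committed check never closes the response body on a non-200 answer; a slightly more careful variant of the same probe might look like this (skipIfUnreachable is an illustrative helper, not part of the commit):

import (
	"net/http"

	"github.com/onsi/ginkgo/v2"
)

// skipIfUnreachable skips the current spec unless url answers with 200 OK.
func skipIfUnreachable(url string) {
	resp, err := http.Get(url)
	if err != nil {
		ginkgo.Skip(url + " is not reachable: " + err.Error())
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		ginkgo.Skip(url + " returned " + resp.Status)
	}
}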