This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch test in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit c07ce008029af9b720d471171f38d3a74415503e Author: Gao Hongtao <[email protected]> AuthorDate: Mon Sep 18 09:00:03 2023 +0000 Use CMD to setup testing service Signed-off-by: Gao Hongtao <[email protected]> --- banyand/measure/tstable.go | 5 +- banyand/metadata/client.go | 8 +- banyand/stream/tstable.go | 6 +- bydbctl/internal/cmd/group_test.go | 6 +- bydbctl/internal/cmd/index_rule_binding_test.go | 3 +- bydbctl/internal/cmd/index_rule_test.go | 3 +- bydbctl/internal/cmd/measure_test.go | 6 +- bydbctl/internal/cmd/property_test.go | 3 +- bydbctl/internal/cmd/stream_test.go | 6 +- pkg/run/run.go | 3 + pkg/test/setup/setup.go | 140 +++++++++++---------- test/integration/load/load_suite_test.go | 2 +- .../standalone/cold_query/query_suite_test.go | 5 +- test/integration/standalone/other/measure_test.go | 4 +- test/integration/standalone/other/property_test.go | 7 +- test/integration/standalone/other/tls_test.go | 3 +- .../standalone/query/query_suite_test.go | 4 +- test/stress/cases/istio/istio_suite_test.go | 2 +- test/stress/cases/istio/repo.go | 12 +- 19 files changed, 117 insertions(+), 111 deletions(-) diff --git a/banyand/measure/tstable.go b/banyand/measure/tstable.go index 462d5066..a52b2a78 100644 --- a/banyand/measure/tstable.go +++ b/banyand/measure/tstable.go @@ -60,7 +60,7 @@ type tsTable struct { path string bufferSize int64 encoderBufferSize int64 - lock sync.Mutex + lock sync.RWMutex } func (t *tsTable) SizeOnDisk() int64 { @@ -154,9 +154,12 @@ func (t *tsTable) Get(key []byte, ts time.Time) ([]byte, error) { } func (t *tsTable) Put(key []byte, val []byte, ts time.Time) error { + t.lock.RLock() if t.encoderBuffer != nil { + defer t.lock.RUnlock() return t.writeToBuffer(key, val, ts) } + t.lock.RUnlock() if err := t.openBuffer(); err != nil { return err } diff --git a/banyand/metadata/client.go b/banyand/metadata/client.go index d1271f20..e584a123 100644 --- a/banyand/metadata/client.go +++ b/banyand/metadata/client.go @@ -33,7 +33,11 @@ import ( "github.com/apache/skywalking-banyandb/pkg/run" ) -const flagEtcdEndpointsName = "etcd-endpoints" +const ( + // DefaultNamespace is the default namespace of the metadata stored in etcd. + DefaultNamespace = "banyandb" + flagEtcdEndpointsName = "etcd-endpoints" +) // NewClient returns a new metadata client. func NewClient(_ context.Context) (Service, error) { @@ -54,7 +58,7 @@ func (s *clientService) SchemaRegistry() schema.Registry { func (s *clientService) FlagSet() *run.FlagSet { fs := run.NewFlagSet("metadata") - fs.StringVar(&s.namespace, "namespace", "banyandb", "The namespace of the metadata stored in etcd") + fs.StringVar(&s.namespace, "namespace", DefaultNamespace, "The namespace of the metadata stored in etcd") fs.StringArrayVar(&s.endpoints, flagEtcdEndpointsName, []string{"http://localhost:2379"}, "A comma-delimited list of etcd endpoints") return fs } diff --git a/banyand/stream/tstable.go b/banyand/stream/tstable.go index 6af8bf2f..6dca480c 100644 --- a/banyand/stream/tstable.go +++ b/banyand/stream/tstable.go @@ -50,7 +50,7 @@ type tsTable struct { closeBufferTimer *time.Timer position common.Position bufferSize int64 - lock sync.Mutex + lock sync.RWMutex } func (t *tsTable) SizeOnDisk() int64 { @@ -112,10 +112,12 @@ func (t *tsTable) Get(key []byte, ts time.Time) ([]byte, error) { } func (t *tsTable) Put(key []byte, val []byte, ts time.Time) error { + t.lock.RLock() if t.buffer != nil { + defer t.lock.RUnlock() return t.buffer.Write(key, val, ts) } - + t.lock.RUnlock() if err := t.openBuffer(); err != nil { return err } diff --git a/bydbctl/internal/cmd/group_test.go b/bydbctl/internal/cmd/group_test.go index 6c18a8fd..c600a165 100644 --- a/bydbctl/internal/cmd/group_test.go +++ b/bydbctl/internal/cmd/group_test.go @@ -27,7 +27,6 @@ import ( databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" "github.com/apache/skywalking-banyandb/bydbctl/internal/cmd" - "github.com/apache/skywalking-banyandb/pkg/test/flags" "github.com/apache/skywalking-banyandb/pkg/test/helpers" "github.com/apache/skywalking-banyandb/pkg/test/setup" ) @@ -37,8 +36,7 @@ var _ = Describe("Group", func() { var deferFunc func() var rootCmd *cobra.Command BeforeEach(func() { - _, addr, deferFunc = setup.Common() - Eventually(helpers.HTTPHealthCheck(addr), flags.EventuallyTimeout).Should(Succeed()) + _, addr, deferFunc = setup.EmptyStandalone() addr = "http://" + addr // extracting the operation of creating group rootCmd = &cobra.Command{Use: "root"} @@ -141,7 +139,7 @@ resource_opts: }) resp := new(databasev1.GroupRegistryServiceListResponse) helpers.UnmarshalYAML([]byte(out), resp) - Expect(resp.Group).To(HaveLen(4)) + Expect(resp.Group).To(HaveLen(2)) }) AfterEach(func() { diff --git a/bydbctl/internal/cmd/index_rule_binding_test.go b/bydbctl/internal/cmd/index_rule_binding_test.go index 1b8e4509..8d681475 100644 --- a/bydbctl/internal/cmd/index_rule_binding_test.go +++ b/bydbctl/internal/cmd/index_rule_binding_test.go @@ -37,8 +37,7 @@ var _ = Describe("IndexRuleBindingSchema Operation", func() { var deferFunc func() var rootCmd *cobra.Command BeforeEach(func() { - _, addr, deferFunc = setup.Common() - Eventually(helpers.HTTPHealthCheck(addr), flags.EventuallyTimeout).Should(Succeed()) + _, addr, deferFunc = setup.EmptyStandalone() addr = "http://" + addr // extracting the operation of creating indexRuleBinding schema rootCmd = &cobra.Command{Use: "root"} diff --git a/bydbctl/internal/cmd/index_rule_test.go b/bydbctl/internal/cmd/index_rule_test.go index 166f46fd..2aae1aef 100644 --- a/bydbctl/internal/cmd/index_rule_test.go +++ b/bydbctl/internal/cmd/index_rule_test.go @@ -37,8 +37,7 @@ var _ = Describe("IndexRuleSchema Operation", func() { var deferFunc func() var rootCmd *cobra.Command BeforeEach(func() { - _, addr, deferFunc = setup.Common() - Eventually(helpers.HTTPHealthCheck(addr), flags.EventuallyTimeout).Should(Succeed()) + _, addr, deferFunc = setup.EmptyStandalone() addr = "http://" + addr // extracting the operation of creating indexRule schema rootCmd = &cobra.Command{Use: "root"} diff --git a/bydbctl/internal/cmd/measure_test.go b/bydbctl/internal/cmd/measure_test.go index d7fa44c9..8df93f2b 100644 --- a/bydbctl/internal/cmd/measure_test.go +++ b/bydbctl/internal/cmd/measure_test.go @@ -44,8 +44,7 @@ var _ = Describe("Measure Schema Operation", func() { var deferFunc func() var rootCmd *cobra.Command BeforeEach(func() { - _, addr, deferFunc = setup.Common() - Eventually(helpers.HTTPHealthCheck(addr), flags.EventuallyTimeout).Should(Succeed()) + _, addr, deferFunc = setup.EmptyStandalone() addr = "http://" + addr // extracting the operation of creating measure schema rootCmd = &cobra.Command{Use: "root"} @@ -201,8 +200,7 @@ var _ = Describe("Measure Data Query", func() { startStr = now.Add(-20 * time.Minute).Format(time.RFC3339) interval = 1 * time.Millisecond endStr = now.Add(5 * time.Minute).Format(time.RFC3339) - grpcAddr, addr, deferFunc = setup.Common() - Eventually(helpers.HTTPHealthCheck(addr), flags.EventuallyTimeout).Should(Succeed()) + grpcAddr, addr, deferFunc = setup.Standalone() addr = "http://" + addr rootCmd = &cobra.Command{Use: "root"} cmd.RootCmdFlags(rootCmd) diff --git a/bydbctl/internal/cmd/property_test.go b/bydbctl/internal/cmd/property_test.go index e16a2750..ee2eb81d 100644 --- a/bydbctl/internal/cmd/property_test.go +++ b/bydbctl/internal/cmd/property_test.go @@ -100,8 +100,7 @@ ttl: 30m p2Proto := new(propertyv1.Property) helpers.UnmarshalYAML([]byte(p2YAML), p2Proto) BeforeEach(func() { - _, addr, deferFunc = setup.Common() - Eventually(helpers.HTTPHealthCheck(addr), flags.EventuallyTimeout).Should(Succeed()) + _, addr, deferFunc = setup.EmptyStandalone() addr = "http://" + addr // extracting the operation of creating property schema rootCmd = &cobra.Command{Use: "root"} diff --git a/bydbctl/internal/cmd/stream_test.go b/bydbctl/internal/cmd/stream_test.go index 36f8330f..3618b9fb 100644 --- a/bydbctl/internal/cmd/stream_test.go +++ b/bydbctl/internal/cmd/stream_test.go @@ -44,8 +44,7 @@ var _ = Describe("Stream Schema Operation", func() { var deferFunc func() var rootCmd *cobra.Command BeforeEach(func() { - _, addr, deferFunc = setup.Common() - Eventually(helpers.HTTPHealthCheck(addr), flags.EventuallyTimeout).Should(Succeed()) + _, addr, deferFunc = setup.EmptyStandalone() addr = "http://" + addr // extracting the operation of creating stream schema rootCmd = &cobra.Command{Use: "root"} @@ -202,8 +201,7 @@ var _ = Describe("Stream Data Query", func() { nowStr = now.Format(time.RFC3339) interval = 500 * time.Millisecond endStr = now.Add(1 * time.Hour).Format(time.RFC3339) - grpcAddr, addr, deferFunc = setup.Common() - Eventually(helpers.HTTPHealthCheck(addr), flags.EventuallyTimeout).Should(Succeed()) + grpcAddr, addr, deferFunc = setup.Standalone() addr = "http://" + addr rootCmd = &cobra.Command{Use: "root"} cmd.RootCmdFlags(rootCmd) diff --git a/pkg/run/run.go b/pkg/run/run.go index 8e9a14c2..cd4d4f6d 100644 --- a/pkg/run/run.go +++ b/pkg/run/run.go @@ -268,6 +268,9 @@ func (g *Group) RegisterFlags() *FlagSet { func (g *Group) RunConfig() (interrupted bool, err error) { g.log = logger.GetLogger(g.name) g.configured = true + if g.f == nil { + return false, nil + } if g.name == "" { // use the binary name if custom name has not been provided diff --git a/pkg/test/setup/setup.go b/pkg/test/setup/setup.go index 622f897b..f5af458e 100644 --- a/pkg/test/setup/setup.go +++ b/pkg/test/setup/setup.go @@ -22,43 +22,60 @@ import ( "context" "fmt" "sync" + "time" "github.com/onsi/gomega" + grpclib "google.golang.org/grpc" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/credentials/insecure" - "github.com/apache/skywalking-banyandb/banyand/liaison/grpc" - "github.com/apache/skywalking-banyandb/banyand/liaison/http" - "github.com/apache/skywalking-banyandb/banyand/measure" "github.com/apache/skywalking-banyandb/banyand/metadata" - "github.com/apache/skywalking-banyandb/banyand/query" - "github.com/apache/skywalking-banyandb/banyand/queue" - "github.com/apache/skywalking-banyandb/banyand/stream" + "github.com/apache/skywalking-banyandb/banyand/metadata/schema" "github.com/apache/skywalking-banyandb/pkg/cmdsetup" "github.com/apache/skywalking-banyandb/pkg/run" "github.com/apache/skywalking-banyandb/pkg/test" + testflags "github.com/apache/skywalking-banyandb/pkg/test/flags" + "github.com/apache/skywalking-banyandb/pkg/test/helpers" test_measure "github.com/apache/skywalking-banyandb/pkg/test/measure" test_stream "github.com/apache/skywalking-banyandb/pkg/test/stream" ) -const host = "127.0.0.1" +const host = "localhost" -// Common wires common modules to build a testing ready runtime. -func Common(flags ...string) (string, string, func()) { - return CommonWithSchemaLoaders([]SchemaLoader{ +// Standalone wires standalone modules to build a testing ready runtime. +func Standalone(flags ...string) (string, string, func()) { + return StandaloneWithSchemaLoaders([]SchemaLoader{ &preloadService{name: "stream"}, &preloadService{name: "measure"}, - }, flags...) + }, "", "", flags...) } -// CommonWithSchemaLoaders wires common modules to build a testing ready runtime. It also allows to preload schema. -func CommonWithSchemaLoaders(schemaLoaders []SchemaLoader, flags ...string) (string, string, func()) { - path, _, err := test.NewSpace() +// StandaloneWithTLS wires standalone modules to build a testing ready runtime with TLS enabled. +func StandaloneWithTLS(certFile, keyFile string, flags ...string) (string, string, func()) { + return StandaloneWithSchemaLoaders([]SchemaLoader{ + &preloadService{name: "stream"}, + &preloadService{name: "measure"}, + }, certFile, keyFile, flags...) +} + +// EmptyStandalone wires standalone modules to build a testing ready runtime. +func EmptyStandalone(flags ...string) (string, string, func()) { + return StandaloneWithSchemaLoaders(nil, "", "", flags...) +} + +// StandaloneWithSchemaLoaders wires standalone modules to build a testing ready runtime. It also allows to preload schema. +func StandaloneWithSchemaLoaders(schemaLoaders []SchemaLoader, certFile, keyFile string, flags ...string) (string, string, func()) { + path, deferFn, err := test.NewSpace() gomega.Expect(err).NotTo(gomega.HaveOccurred()) var ports []int ports, err = test.AllocateFreePorts(4) gomega.Expect(err).NotTo(gomega.HaveOccurred()) addr := fmt.Sprintf("%s:%d", host, ports[0]) httpAddr := fmt.Sprintf("%s:%d", host, ports[1]) + endpoint := fmt.Sprintf("http://%s:%d", host, ports[2]) ff := []string{ + "--logging-env=dev", + "--logging-level=error", "--grpc-host=" + host, fmt.Sprintf("--grpc-port=%d", ports[0]), "--http-host=" + host, @@ -67,66 +84,63 @@ func CommonWithSchemaLoaders(schemaLoaders []SchemaLoader, flags ...string) (str "--stream-root-path=" + path, "--measure-root-path=" + path, "--metadata-root-path=" + path, - fmt.Sprintf("--etcd-listen-client-url=http://%s:%d", host, ports[2]), fmt.Sprintf("--etcd-listen-peer-url=http://%s:%d", host, ports[3]), + fmt.Sprintf("--etcd-listen-client-url=%s", endpoint), fmt.Sprintf("--etcd-listen-peer-url=http://%s:%d", host, ports[3]), + } + tlsEnabled := false + if certFile != "" && keyFile != "" { + ff = append(ff, "--tls=true", "--cert-file="+certFile, "--key-file="+keyFile, "--http-grpc-cert-file="+certFile) + tlsEnabled = true } if len(flags) > 0 { ff = append(ff, flags...) } - gracefulStop := modules(schemaLoaders, ff) - return addr, httpAddr, func() { - gracefulStop() - // deferFn() + cmdFlags := []string{"standalone"} + cmdFlags = append(cmdFlags, ff...) + closeFn := CMD(cmdFlags...) + if tlsEnabled { + creds, err := credentials.NewClientTLSFromFile(certFile, "localhost") + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + gomega.Eventually(helpers.HealthCheck(addr, 10*time.Second, 10*time.Second, grpclib.WithTransportCredentials(creds)), testflags.EventuallyTimeout). + Should(gomega.Succeed()) + } else { + gomega.Eventually( + helpers.HealthCheck(addr, 10*time.Second, 10*time.Second, grpclib.WithTransportCredentials(insecure.NewCredentials())), + testflags.EventuallyTimeout).Should(gomega.Succeed()) } -} - -func modules(schemaLoaders []SchemaLoader, flags []string) func() { - // Init `Queue` module - pipeline := queue.Local() - // Init `Metadata` module - metaSvc, err := metadata.NewService(context.TODO()) - gomega.Expect(err).NotTo(gomega.HaveOccurred()) - // Init `Stream` module - streamSvc, err := stream.NewService(context.TODO(), metaSvc, pipeline) - gomega.Expect(err).NotTo(gomega.HaveOccurred()) - // Init `Measure` module - measureSvc, err := measure.NewService(context.TODO(), metaSvc, pipeline) - gomega.Expect(err).NotTo(gomega.HaveOccurred()) - // Init `Query` module - q, err := query.NewService(context.TODO(), streamSvc, measureSvc, metaSvc, pipeline) - gomega.Expect(err).NotTo(gomega.HaveOccurred()) - tcp := grpc.NewServer(context.TODO(), pipeline, metaSvc) - httpServer := http.NewServer() - - units := []run.Unit{ - pipeline, - metaSvc, + gomega.Eventually(helpers.HTTPHealthCheck(httpAddr), testflags.EventuallyTimeout).Should(gomega.Succeed()) + + if schemaLoaders != nil { + schemaRegistry, err := schema.NewEtcdSchemaRegistry( + schema.Namespace(metadata.DefaultNamespace), + schema.ConfigureServerEndpoints([]string{endpoint}), + ) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + defer schemaRegistry.Close() + var units []run.Unit + for _, sl := range schemaLoaders { + sl.SetRegistry(schemaRegistry) + units = append(units, sl) + } + preloadGroup := run.NewGroup("preload") + preloadGroup.Register(units...) + err = preloadGroup.Run(context.Background()) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) } - for _, sl := range schemaLoaders { - sl.SetMeta(metaSvc) - units = append(units, sl) + return addr, httpAddr, func() { + closeFn() + deferFn() } - units = append(units, - streamSvc, - measureSvc, - q, - tcp, - httpServer) - - return test.SetupModules( - flags, - units..., - ) } // SchemaLoader is a service that can preload schema. type SchemaLoader interface { run.Unit - SetMeta(meta metadata.Service) + SetRegistry(registry schema.Registry) } type preloadService struct { - metaSvc metadata.Service - name string + registry schema.Registry + name string } func (p *preloadService) Name() string { @@ -135,13 +149,13 @@ func (p *preloadService) Name() string { func (p *preloadService) PreRun(ctx context.Context) error { if p.name == "stream" { - return test_stream.PreloadSchema(ctx, p.metaSvc.SchemaRegistry()) + return test_stream.PreloadSchema(ctx, p.registry) } - return test_measure.PreloadSchema(ctx, p.metaSvc.SchemaRegistry()) + return test_measure.PreloadSchema(ctx, p.registry) } -func (p *preloadService) SetMeta(meta metadata.Service) { - p.metaSvc = meta +func (p *preloadService) SetRegistry(registry schema.Registry) { + p.registry = registry } // CMD runs the command with given flags. diff --git a/test/integration/load/load_suite_test.go b/test/integration/load/load_suite_test.go index f2dde1a0..7071bc5e 100644 --- a/test/integration/load/load_suite_test.go +++ b/test/integration/load/load_suite_test.go @@ -55,7 +55,7 @@ var _ = SynchronizedBeforeSuite(func() []byte { Level: flags.LogLevel, })).To(Succeed()) var addr string - addr, _, deferFunc = setup.Common() + addr, _, deferFunc = setup.Standalone() Eventually( helpers.HealthCheck(addr, 10*time.Second, 10*time.Second, grpc.WithTransportCredentials(insecure.NewCredentials())), flags.EventuallyTimeout).Should(Succeed()) diff --git a/test/integration/standalone/cold_query/query_suite_test.go b/test/integration/standalone/cold_query/query_suite_test.go index e595e1a5..7c86e67e 100644 --- a/test/integration/standalone/cold_query/query_suite_test.go +++ b/test/integration/standalone/cold_query/query_suite_test.go @@ -60,10 +60,7 @@ var _ = SynchronizedBeforeSuite(func() []byte { Level: flags.LogLevel, })).To(Succeed()) var addr string - addr, _, deferFunc = setup.Common() - Eventually( - helpers.HealthCheck(addr, 10*time.Second, 10*time.Second, grpc.WithTransportCredentials(insecure.NewCredentials())), - flags.EventuallyTimeout).Should(Succeed()) + addr, _, deferFunc = setup.Standalone() conn, err := grpchelper.Conn(addr, 10*time.Second, grpc.WithTransportCredentials(insecure.NewCredentials())) Expect(err).NotTo(HaveOccurred()) ns := timestamp.NowMilli().UnixNano() diff --git a/test/integration/standalone/other/measure_test.go b/test/integration/standalone/other/measure_test.go index 24800b19..a2ab6e81 100644 --- a/test/integration/standalone/other/measure_test.go +++ b/test/integration/standalone/other/measure_test.go @@ -43,9 +43,7 @@ var _ = g.Describe("Query service_cpm_minute", func() { g.BeforeEach(func() { var addr string - addr, _, deferFn = setup.Common() - gm.Eventually(helpers.HealthCheck(addr, 10*time.Second, 10*time.Second, grpc.WithTransportCredentials(insecure.NewCredentials())), - flags.EventuallyTimeout).Should(gm.Succeed()) + addr, _, deferFn = setup.Standalone() var err error conn, err = grpchelper.Conn(addr, 10*time.Second, grpc.WithTransportCredentials(insecure.NewCredentials())) gm.Expect(err).NotTo(gm.HaveOccurred()) diff --git a/test/integration/standalone/other/property_test.go b/test/integration/standalone/other/property_test.go index 9b5ad68c..6ef21a4a 100644 --- a/test/integration/standalone/other/property_test.go +++ b/test/integration/standalone/other/property_test.go @@ -33,7 +33,6 @@ import ( propertyv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1" "github.com/apache/skywalking-banyandb/pkg/grpchelper" "github.com/apache/skywalking-banyandb/pkg/test/flags" - "github.com/apache/skywalking-banyandb/pkg/test/helpers" "github.com/apache/skywalking-banyandb/pkg/test/setup" ) @@ -45,9 +44,7 @@ var _ = Describe("Property application", func() { BeforeEach(func() { var addr string - addr, _, deferFn = setup.Common() - Eventually(helpers.HealthCheck(addr, 10*time.Second, 10*time.Second, grpc.WithTransportCredentials(insecure.NewCredentials())), - flags.EventuallyTimeout).Should(Succeed()) + addr, _, deferFn = setup.Standalone() var err error conn, err = grpchelper.Conn(addr, 10*time.Second, grpc.WithTransportCredentials(insecure.NewCredentials())) Expect(err).NotTo(HaveOccurred()) @@ -179,7 +176,7 @@ var _ = Describe("Property application", func() { BeforeEach(func() { var addr string - addr, _, deferFn = setup.Common() + addr, _, deferFn = setup.Standalone() var err error conn, err = grpchelper.Conn(addr, 10*time.Second, grpc.WithTransportCredentials(insecure.NewCredentials())) Expect(err).NotTo(HaveOccurred()) diff --git a/test/integration/standalone/other/tls_test.go b/test/integration/standalone/other/tls_test.go index acda8f30..8d4d5df1 100644 --- a/test/integration/standalone/other/tls_test.go +++ b/test/integration/standalone/other/tls_test.go @@ -49,11 +49,10 @@ var _ = g.Describe("Query service_cpm_minute", func() { certFile := filepath.Join(basePath, "testdata/server_cert.pem") keyFile := filepath.Join(basePath, "testdata/server_key.pem") var addr string - addr, _, deferFn = setup.Common("--tls=true", "--cert-file="+certFile, "--key-file="+keyFile) + addr, _, deferFn = setup.StandaloneWithTLS(certFile, keyFile) var err error creds, err := credentials.NewClientTLSFromFile(certFile, "localhost") gm.Expect(err).NotTo(gm.HaveOccurred()) - gm.Eventually(helpers.HealthCheck(addr, 10*time.Second, 10*time.Second, grpclib.WithTransportCredentials(creds)), flags.EventuallyTimeout).Should(gm.Succeed()) conn, err = grpchelper.Conn(addr, 10*time.Second, grpclib.WithTransportCredentials(creds)) gm.Expect(err).NotTo(gm.HaveOccurred()) ns := timestamp.NowMilli().UnixNano() diff --git a/test/integration/standalone/query/query_suite_test.go b/test/integration/standalone/query/query_suite_test.go index b9852cbb..91b47189 100644 --- a/test/integration/standalone/query/query_suite_test.go +++ b/test/integration/standalone/query/query_suite_test.go @@ -60,9 +60,7 @@ var _ = SynchronizedBeforeSuite(func() []byte { Level: flags.LogLevel, })).To(Succeed()) var addr string - addr, _, deferFunc = setup.Common() - Eventually(helpers.HealthCheck(addr, 10*time.Second, 10*time.Second, grpc.WithTransportCredentials(insecure.NewCredentials())), - flags.EventuallyTimeout).Should(Succeed()) + addr, _, deferFunc = setup.Standalone() conn, err := grpchelper.Conn(addr, 10*time.Second, grpc.WithTransportCredentials(insecure.NewCredentials())) Expect(err).NotTo(HaveOccurred()) ns := timestamp.NowMilli().UnixNano() diff --git a/test/stress/cases/istio/istio_suite_test.go b/test/stress/cases/istio/istio_suite_test.go index a2a2a5b6..bab730bc 100644 --- a/test/stress/cases/istio/istio_suite_test.go +++ b/test/stress/cases/istio/istio_suite_test.go @@ -57,7 +57,7 @@ var _ = Describe("Istio", func() { })).To(Succeed()) }) It("should pass", func() { - addr, _, deferFunc := setup.CommonWithSchemaLoaders([]setup.SchemaLoader{&preloadService{name: "oap"}}) + addr, _, deferFunc := setup.StandaloneWithSchemaLoaders([]setup.SchemaLoader{&preloadService{name: "oap"}}, "", "") DeferCleanup(deferFunc) Eventually(helpers.HealthCheck(addr, 10*time.Second, 10*time.Second, grpc.WithTransportCredentials(insecure.NewCredentials())), flags.EventuallyTimeout).Should(Succeed()) diff --git a/test/stress/cases/istio/repo.go b/test/stress/cases/istio/repo.go index 6865ba60..89ac54aa 100644 --- a/test/stress/cases/istio/repo.go +++ b/test/stress/cases/istio/repo.go @@ -39,7 +39,7 @@ import ( commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" - "github.com/apache/skywalking-banyandb/banyand/metadata" + "github.com/apache/skywalking-banyandb/banyand/metadata/schema" ) //go:embed testdata/* @@ -131,8 +131,8 @@ func extractTarGz(src []byte, dest string) (string, error) { } type preloadService struct { - metaSvc metadata.Service - name string + registry schema.Registry + name string } func (p *preloadService) Name() string { @@ -140,7 +140,7 @@ func (p *preloadService) Name() string { } func (p *preloadService) PreRun(_ context.Context) error { - e := p.metaSvc.SchemaRegistry() + e := p.registry if err := loadSchema(groupDir, &commonv1.Group{}, func(group *commonv1.Group) error { return e.CreateGroup(context.TODO(), group) }); err != nil { @@ -165,8 +165,8 @@ func (p *preloadService) PreRun(_ context.Context) error { return nil } -func (p *preloadService) SetMeta(meta metadata.Service) { - p.metaSvc = meta +func (p *preloadService) SetRegistry(registry schema.Registry) { + p.registry = registry } const (
